概述
目录
优先级反转概念
如何解决优先级反转
优先级继承代码实现分析
互斥量和信号量的区别
为什么中断中不能休眠?
优先级反转概念
优先级反转是指一个低优先级的任务持有一个被高优先级任务所需要的共享资源。高优先任务由于因资源缺乏而处于受阻状态,一直等到低优先级任务释放资源为止。而低优先级获得的CPU时间少,如果此时有优先级处于两者之间的任务,并且不需要那个共享资源,则该中优先级的任务反而超过这两个任务而获得CPU时间。如果高优先级等待资源时不是阻塞等待,而是忙循环,则可能永远无法获得资源,因为此时低优先级进程无法与高优先级进程争夺CPU时间,从而无法执行,进而无法释放资源,造成的后果就是高优先级任务无法获得资源而继续推进。
假设三个任务准备执行,A,B,C,优先级依次是A>B>C;
首先:C处于运行状态,获得CPU正在执行,同时占有了某种资源;
其次:A进入就绪状态,因为优先级比C高,所以获得CPU,A转为运行状态;C进入就绪状态;
第三:执行过程中需要使用资源,而这个资源又被等待中的C占有的,于是A进入阻塞状态,C回到运行状态;
第四:此时B进入就绪状态,因为优先级比C高,B获得CPU,进入运行状态;C又回到就绪状态;
第五:如果这时又出现B2,B3等任务,他们的优先级比C高,但比A低,那么就会出现高优先级任务的A不能执行,反而低优先级的B,B2,B3等任务可以执行的奇怪现象,而这就是优先反转。
如何解决优先级反转
高优先级任务A不能执行的原因是C霸占了资源,而C如果不能获得CPU,不释放资源,那A也只好一直等在那,所以解决优先级反转的原则肯定就是让C尽快执行,尽早把资源释放了。基于这个原则产生了两个方法:
优先级继承
当发现高优先级的任务因为低优先级任务占用资源而阻塞时,就将低优先级任务的优先级提升到等待它所占有的资源的最高优先级任务的优先级。
优先级天花板
优先级天花板是指将申请某资源的任务的优先级提升到可能访问该资源的所有任务中最高优先级任务的优先级.(这个优先级称为该资源的优先级天花板)
两者的区别
优先级继承:只有一个任务访问资源时一切照旧,没有区别,只有当高优先级任务因为资源被低优先级占有而被阻塞时,才会提高占有资源任务的优先级;而优先级天花板,不论是否发生阻塞,都提升,即谁先拿到资源,就将这个任务提升到该资源的天花板优先级。
优先级继承代码实现分析
优先级反转是所有操作系统都会面临的一个问题, 相比于Linux,RT-Thread内核更加小巧,通过分析RT-Thread内核可以更加快速的理解一些概念原理。
既然是优先级反转,就势必会涉及到竞争共享资源加锁阻塞的操作,另外, 还要确保RTOS操作系统的实时性,即确保高优先级线程尽可能快的被执行,也就意味着当所有者释放锁之后,应当确保阻塞的高优先级线程被立即调度。接下来我们以RT-Thread 获取互斥锁的加锁和解锁接口rt_mutex_take rt_mutex_release来分析优先级继承的实现。
/**
* This function will take a mutex, if the mutex is unavailable, the
* thread shall wait for a specified time.
*
* @param mutex the mutex object
* @param time the waiting time
*
* @return the error code
*/
rt_err_t rt_mutex_take(rt_mutex_t mutex, rt_int32_t time)
{
register rt_base_t temp;
struct rt_thread *thread;
/* this function must not be used in interrupt even if time = 0 */
RT_DEBUG_IN_THREAD_CONTEXT;
RT_ASSERT(mutex != RT_NULL);
/* disable interrupt */
temp = rt_hw_interrupt_disable();// 关中断
/* get current thread */
thread = rt_thread_self();//获取当前线程控制块
RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(mutex->parent.parent)));
RT_DEBUG_LOG(RT_DEBUG_IPC,
("mutex_take: current thread %s, mutex value: %d, hold: %dn",
thread->name, mutex->value, mutex->hold));
/* reset thread error */
thread->error = RT_EOK;
if (mutex->owner == thread)// 判断锁的主人是否是当前线程 ,如果是当前线程,则意味着重复加锁, RT-Thread是允许重复加锁的, 只要用户确保加锁和解锁操作是配对的即可,这个和rt_hw_interrupt_disable类似,可重复调用,用户需要确保成对出现即可。
{
/* it's the same thread */
mutex->hold ++;
}
else
{
/* The value of mutex is 1 in initial status. Therefore, if the
* value is great than 0, it indicates the mutex is avaible.
*/
if (mutex->value > 0)//互斥锁可用
{
/* mutex is available */
mutex->value --;
/* set mutex owner and original priority */
mutex->owner = thread;
mutex->original_priority = thread->current_priority;
mutex->hold ++;
}
else
{//互斥锁不可用, 也只有当获取不到锁的时候,才有可能导致优先级反转
/* no waiting, return with timeout */
if (time == 0)
{//获取不到锁,立即返回
/* set error as timeout */
thread->error = -RT_ETIMEOUT;
/* enable interrupt */
rt_hw_interrupt_enable(temp);
return -RT_ETIMEOUT;
}
else
{//锁不可用, 进入阻塞逻辑
/* mutex is unavailable, push to suspend list */
RT_DEBUG_LOG(RT_DEBUG_IPC, ("mutex_take: suspend thread: %sn",
thread->name));
/* change the owner thread priority of mutex */
//接下来这行代码就是优先级继承的代码实现,
if (thread->current_priority < mutex->owner->current_priority)
{//判断当前线程的优先级和锁拥有者线程的优先级,如果当前线程优先级高,则有可能发生优先级反转,内核就需要采用优先级继承的方式来解决此问题。
/* change the owner thread priority */
//通过rt_thread_control将锁拥有者线程的优先级提升到与当前线程同样的优先级
rt_thread_control(mutex->owner,
RT_THREAD_CTRL_CHANGE_PRIORITY,
&thread->current_priority);
}
/* suspend current thread */
rt_ipc_list_suspend(&(mutex->parent.suspend_thread),
thread,
mutex->parent.parent.flag);
/* has waiting time, start thread timer */
if (time > 0)
{//如果允许的阻塞时间有效,则需要启动定时器.
RT_DEBUG_LOG(RT_DEBUG_IPC,
("mutex_take: start the timer of thread:%sn",
thread->name));
/* reset the timeout of thread timer and start it */
rt_timer_control(&(thread->thread_timer),
RT_TIMER_CTRL_SET_TIME,
&time);
rt_timer_start(&(thread->thread_timer));
}
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* do schedule */
//执行调度, 锁拥有者线程将会被调度执行。
rt_schedule();
if (thread->error != RT_EOK)
{
/* return error */
return thread->error;
}
else
{
/* the mutex is taken successfully. */
/* disable interrupt */
temp = rt_hw_interrupt_disable();
}
}
}
}
/* enable interrupt */
rt_hw_interrupt_enable(temp);
RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(mutex->parent.parent)));
return RT_EOK;
}
rt_mutex_take实现了优先级继承的相关代码,会在适当的时候,对锁拥有者的线程优先级进行提升, rt_mutex_release则需要确保锁被释放后,立即调度被阻塞的高优先级线程, 这样才能确保高优先级线程被挂起的时间尽可能短。
/**
* This function will release a mutex, if there are threads suspended on mutex,
* it will be waked up.
*
* @param mutex the mutex object
*
* @return the error code
*/
rt_err_t rt_mutex_release(rt_mutex_t mutex)
{
register rt_base_t temp;
struct rt_thread *thread;
rt_bool_t need_schedule;
need_schedule = RT_FALSE;
/* only thread could release mutex because we need test the ownership */
RT_DEBUG_IN_THREAD_CONTEXT;
/* get current thread */
thread = rt_thread_self();//获取当前线程控制块
/* disable interrupt */
temp = rt_hw_interrupt_disable();//关中断
RT_DEBUG_LOG(RT_DEBUG_IPC,
("mutex_release:current thread %s, mutex value: %d, hold: %dn",
thread->name, mutex->value, mutex->hold));
RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mutex->parent.parent)));
/* mutex only can be released by owner */
if (thread != mutex->owner)//错误操作, 一个锁只能被加锁者释放
{
thread->error = -RT_ERROR;
/* enable interrupt */
rt_hw_interrupt_enable(temp);
return -RT_ERROR;
}
/* decrease hold */
mutex->hold --;
/* if no hold */
if (mutex->hold == 0)
{//计数器为0,则锁被彻底释放,即不再有线程占用该锁。
/* change the owner thread to original priority */
if (mutex->original_priority != mutex->owner->current_priority)
{//判断锁的线程原始优先级和当前优先级是否一致,如果不一致,则意味着发生了优先级继承,需要在这里恢复。
rt_thread_control(mutex->owner,
RT_THREAD_CTRL_CHANGE_PRIORITY,
&(mutex->original_priority));
}
/* wakeup suspended thread */
if (!rt_list_isempty(&mutex->parent.suspend_thread))
{//如果有被挂起到该锁的线程,则设置标志位need_schedule
,重新调度。这里有一个问题: 如果当前线程为最高优先级,那么当锁释放后需要继续运行,这时是不能重新调度的,如果当前线程为低优先级,且阻塞了高优先级线程,那么锁释放后就需要立即进行调度, RT-Thread将以上判断逻辑放在了函数rt_schedule中实现。
/* get suspended thread */
thread = rt_list_entry(mutex->parent.suspend_thread.next,
struct rt_thread,
tlist);
RT_DEBUG_LOG(RT_DEBUG_IPC, ("mutex_release: resume thread: %sn",
thread->name));
/* set new owner and priority */
mutex->owner = thread;
mutex->original_priority = thread->current_priority;
mutex->hold ++;
/* resume thread */
rt_ipc_list_resume(&(mutex->parent.suspend_thread));
need_schedule = RT_TRUE;
}
else
{
/* increase value */
mutex->value ++;
/* clear owner */
mutex->owner = RT_NULL;
mutex->original_priority = 0xff;
}
}
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* perform a schedule */
if (need_schedule == RT_TRUE)
rt_schedule();
return RT_EOK;
}
/**
* This function will perform one schedule. It will select one thread
* with the highest priority level, then switch to it.
*/
void rt_schedule(void)
{
rt_base_t level;
struct rt_thread *to_thread;
struct rt_thread *from_thread;
/* disable interrupt */
level = rt_hw_interrupt_disable();
/* check the scheduler is enabled or not */
if (rt_scheduler_lock_nest == 0)
{
register rt_ubase_t highest_ready_priority;
#if RT_THREAD_PRIORITY_MAX <= 32
highest_ready_priority = __rt_ffs(rt_thread_ready_priority_group) - 1;
#else
register rt_ubase_t number;
number = __rt_ffs(rt_thread_ready_priority_group) - 1;
highest_ready_priority = (number << 3) + __rt_ffs(rt_thread_ready_table[number]) - 1;
#endif
/* get switch to thread */
//获取当前最高优先级线程控制块
to_thread = rt_list_entry(rt_thread_priority_table[highest_ready_priority].next,
struct rt_thread,
tlist);
/* if the destination thread is not the same as current thread */
if (to_thread != rt_current_thread)
{//如果有比当前线程更高优先级的,则调度
rt_current_priority = (rt_uint8_t)highest_ready_priority;
from_thread = rt_current_thread;
rt_current_thread = to_thread;
RT_OBJECT_HOOK_CALL(rt_scheduler_hook, (from_thread, to_thread));
/* switch to new thread */
RT_DEBUG_LOG(RT_DEBUG_SCHEDULER,
("[%d]switch to priority#%d "
"thread:%.*s(sp:0x%p), "
"from thread:%.*s(sp: 0x%p)n",
rt_interrupt_nest, highest_ready_priority,
RT_NAME_MAX, to_thread->name, to_thread->sp,
RT_NAME_MAX, from_thread->name, from_thread->sp));
#ifdef RT_USING_OVERFLOW_CHECK
_rt_scheduler_stack_check(to_thread);
#endif
if (rt_interrupt_nest == 0)
{
rt_hw_context_switch((rt_uint32_t)&from_thread->sp,
(rt_uint32_t)&to_thread->sp);
}
else
{
RT_DEBUG_LOG(RT_DEBUG_SCHEDULER, ("switch in interruptn"));
rt_hw_context_switch_interrupt((rt_uint32_t)&from_thread->sp,
(rt_uint32_t)&to_thread->sp);
}
}
}
/* enable interrupt */
rt_hw_interrupt_enable(level);
}
优先级继承的神秘面纱就这么被揭开了,是不是很简单呢?看到这里我想大家可能还会有一个疑问, 互斥锁能够解决优先级反转,而互斥锁的搭档信号量能解决优先级反转问题吗?要解决这个疑问,我们就需要从根上分析下互斥锁和信号量的区别。
互斥量和信号量的区别
网上关于两者的概念有很多, 我们摘取一种解释:
1. 互斥量用于线程的互斥,信号量用于线程的同步。
这是互斥量和信号量的根本区别,也就是互斥和同步之间的区别。
互斥:是指某一资源同时只允许一个访问者对其进行访问,具有唯一性和排它性。但互斥无法限制访问者对资源的访问顺序,即访问是无序的。
同步:是指在互斥的基础上(大多数情况),通过其它机制实现访问者对资源的有序访问。在大多数情况下,同步已经实现了互斥,特别是所有写入资源的情况必定是互斥的。少数情况是指可以允许多个访问者同时访问资源
2. 互斥量值只能为0/1,信号量值可以为非负整数。
也就是说,一个互斥量只能用于一个资源的互斥访问,它不能实现多个资源的多线程互斥问题。信号量可以实现多个同类资源的多线程互斥和同步。当信号量为单值信号量是,也可以完成一个资源的互斥访问。
3. 互斥量的加锁和解锁必须由同一线程分别对应使用,信号量可以由一个线程释放,另一个线程得到。
看了之后感觉还是没有说到根上,只是说明了结果,没有道清楚原因, 无奈,我们看下源码:
互斥锁release源码
/**
* This function will release a mutex, if there are threads suspended on mutex,
* it will be waked up.
*
* @param mutex the mutex object
*
* @return the error code
*/
rt_err_t rt_mutex_release(rt_mutex_t mutex)
{
register rt_base_t temp;
struct rt_thread *thread;
rt_bool_t need_schedule;
/* parameter check */
RT_ASSERT(mutex != RT_NULL);
RT_ASSERT(rt_object_get_type(&mutex->parent.parent) == RT_Object_Class_Mutex);
need_schedule = RT_FALSE;
/* only thread could release mutex because we need test the ownership */
RT_DEBUG_IN_THREAD_CONTEXT;
/* get current thread */
thread = rt_thread_self();
/* disable interrupt */
temp = rt_hw_interrupt_disable();
RT_DEBUG_LOG(RT_DEBUG_IPC,
("mutex_release:current thread %s, mutex value: %d, hold: %dn",
thread->name, mutex->value, mutex->hold));
RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(mutex->parent.parent)));
/* mutex only can be released by owner */
if (thread != mutex->owner)
{
thread->error = -RT_ERROR;
/* enable interrupt */
rt_hw_interrupt_enable(temp);
return -RT_ERROR;
}
/* decrease hold */
mutex->hold --;
/* if no hold */
if (mutex->hold == 0)
{
/* change the owner thread to original priority */
if (mutex->original_priority != mutex->owner->current_priority)
{
rt_thread_control(mutex->owner,
RT_THREAD_CTRL_CHANGE_PRIORITY,
&(mutex->original_priority));
}
/* wakeup suspended thread */
if (!rt_list_isempty(&mutex->parent.suspend_thread))
{
/* get suspended thread */
thread = rt_list_entry(mutex->parent.suspend_thread.next,
struct rt_thread,
tlist);
RT_DEBUG_LOG(RT_DEBUG_IPC, ("mutex_release: resume thread: %sn",
thread->name));
/* set new owner and priority */
mutex->owner = thread;
mutex->original_priority = thread->current_priority;
mutex->hold ++;
/* resume thread */
rt_ipc_list_resume(&(mutex->parent.suspend_thread));
need_schedule = RT_TRUE;
}
else
{
/* increase value */
mutex->value ++;
/* clear owner */
mutex->owner = RT_NULL;
mutex->original_priority = 0xff;
}
}
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* perform a schedule */
if (need_schedule == RT_TRUE)
rt_schedule();
return RT_EOK;
}
RTM_EXPORT(rt_mutex_release);
信号量实现源码:
/**
* This function will take a semaphore, if the semaphore is unavailable, the
* thread shall wait for a specified time.
*
* @param sem the semaphore object
* @param time the waiting time
*
* @return the error code
*/
rt_err_t rt_sem_take(rt_sem_t sem, rt_int32_t time)
{
register rt_base_t temp;
struct rt_thread *thread;
/* parameter check */
RT_ASSERT(sem != RT_NULL);
RT_ASSERT(rt_object_get_type(&sem->parent.parent) == RT_Object_Class_Semaphore);
RT_OBJECT_HOOK_CALL(rt_object_trytake_hook, (&(sem->parent.parent)));
/* disable interrupt */
temp = rt_hw_interrupt_disable();
RT_DEBUG_LOG(RT_DEBUG_IPC, ("thread %s take sem:%s, which value is: %dn",
rt_thread_self()->name,
((struct rt_object *)sem)->name,
sem->value));
if (sem->value > 0)
{
/* semaphore is available */
sem->value --;
/* enable interrupt */
rt_hw_interrupt_enable(temp);
}
else
{
/* no waiting, return with timeout */
if (time == 0)
{
rt_hw_interrupt_enable(temp);
return -RT_ETIMEOUT;
}
else
{
/* current context checking */
RT_DEBUG_IN_THREAD_CONTEXT;
/* semaphore is unavailable, push to suspend list */
/* get current thread */
thread = rt_thread_self();
/* reset thread error number */
thread->error = RT_EOK;
RT_DEBUG_LOG(RT_DEBUG_IPC, ("sem take: suspend thread - %sn",
thread->name));
/* suspend thread */
rt_ipc_list_suspend(&(sem->parent.suspend_thread),
thread,
sem->parent.parent.flag);
/* has waiting time, start thread timer */
if (time > 0)
{
RT_DEBUG_LOG(RT_DEBUG_IPC, ("set thread:%s to timer listn",
thread->name));
/* reset the timeout of thread timer and start it */
rt_timer_control(&(thread->thread_timer),
RT_TIMER_CTRL_SET_TIME,
&time);
rt_timer_start(&(thread->thread_timer));
}
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* do schedule */
rt_schedule();
if (thread->error != RT_EOK)
{
return thread->error;
}
}
}
RT_OBJECT_HOOK_CALL(rt_object_take_hook, (&(sem->parent.parent)));
return RT_EOK;
}
RTM_EXPORT(rt_sem_take);
/**
* This function will release a semaphore, if there are threads suspended on
* semaphore, it will be waked up.
*
* @param sem the semaphore object
*
* @return the error code
*/
rt_err_t rt_sem_release(rt_sem_t sem)
{
register rt_base_t temp;
register rt_bool_t need_schedule;
/* parameter check */
RT_ASSERT(sem != RT_NULL);
RT_ASSERT(rt_object_get_type(&sem->parent.parent) == RT_Object_Class_Semaphore);
RT_OBJECT_HOOK_CALL(rt_object_put_hook, (&(sem->parent.parent)));
need_schedule = RT_FALSE;
/* disable interrupt */
temp = rt_hw_interrupt_disable();
RT_DEBUG_LOG(RT_DEBUG_IPC, ("thread %s releases sem:%s, which value is: %dn",
rt_thread_self()->name,
((struct rt_object *)sem)->name,
sem->value));
if (!rt_list_isempty(&sem->parent.suspend_thread))
{
/* resume the suspended thread */
rt_ipc_list_resume(&(sem->parent.suspend_thread));
need_schedule = RT_TRUE;
}
else
sem->value ++; /* increase value */
/* enable interrupt */
rt_hw_interrupt_enable(temp);
/* resume a thread, re-schedule */
if (need_schedule == RT_TRUE)
rt_schedule();
return RT_EOK;
}
RTM_EXPORT(rt_sem_release);
从源码可以看出,互斥锁在加锁和解锁过程中需要获取当前线程, 也就意味着互斥锁只能在线程上下文中使用, 而且互斥锁在加锁和解锁期间只能被一个线程拥有,而信号量可以由一个线程释放,另一个线程得到,也正是因为这个特点,当一个低优先级线程占据了高优先级线程需要资源后, 可通过优先级继承的方式来提升低优先级线程的优先级, 从而解决优先级反转问题(还有一种解决优先级反转问题的方式是优先级天花板)互斥锁具备解决优先级反转的能力,
对于信号量, 你都不知道它将被哪个线程释放,又怎么能够通过优先级继承或优先级天花板的方式来解决优先级反转问题呢, 因此也正是这一点, 从根本上决定了信号量无法解决优先级反转问题, 但是也正是这一点, 决定了信号量可以在中断中进行释放,用于中断与线程的同步(这一点是互斥锁无法做到的), 那么为什么中断中可以使用信号量,而不可以使用互斥锁呢?中断中仅可以释放信号量(不能获取信号量), 中断中不能使用互斥锁, 是因为互斥锁是作用域线程, 只能在线程上下文中使用,而信号量可用于中断与线程间的同步, 从源码中是可以看出区别。另外,中断中不能休眠(中断会不会被线程切换中断呢,答案是不会的, 因为操作系统线程切换采用的是最低优先级的异常PendSV)
在RT-Thread官网网上说明上,也有一段话,说明了信号量不能解决优先级反转问题:
Note
注:在计算机操作系统发展历史上,人们早期使用二值信号量来保护临界区,但是在1990年,研究人员发现了使用信号量保护临界区会导致无界优先级反转的问题,因此提出了互斥量的概念。如今,我们已经不使用二值信号量来保护临界区,互斥量取而代之。
关于信号量同步,一个典型例子:
计算c = a+b
线程A输出结果a,线程B输出结果b,线程C则需要利用线程AB的结果计算c = a + b(一共三个线程)显然,第三个线程必须等第一、二个线程执行完毕它才能执行。在这个时候,我们就需要调度线程了:让第一、二个线程执行完毕后,再执行第三个线程,这种情况就非常适合实用sem.
为什么中断中不能休眠?
我们知道,线程有自己的线程上下文,也有初始,就绪,挂起(休眠),运行,结束状态,而这些状态依赖于线程调度器的调度,对应的数据结构就是线程控制块, 而中断也有自己的中断上下文, 却没有中断控制描述符来描述它, 不属于操作系统调度的范畴, 也没有休眠的概念,更没有中断调度器的概念(中断重复嵌套不属于这种概念) , 一旦在中断上下文中休眠, 首先无法切换上下文(因为没有中断描述符, 上下文状态无法保存),其次也无法再次被唤醒, 因为找不到回去的路了。
此外, 中断的发送是非常频繁的, 在一个中断期间睡眠,其他中断发生并产生睡眠, 则对中断栈也是一个严峻的考验, 容易造成中断栈溢出使系统崩溃. 假设上述条件都满足了,有了中断描述符,线程调度器也能调度中断了, 理论上是可以做到中断中休眠的, 但是会产生很多问题, 比如,时钟中断中休眠, 那么操作系统的时钟就乱了, 调度器也将乱掉, 系统性能和稳定性将大大降低。所以, 回归到中断的本质,作为一种紧急突发任务, 需要操作系统立即处理, 不是不能睡眠,是没有理由睡眠.
最后
以上就是淡然小白菜为你收集整理的RT-Thread内核源码分析-优先级反转代码实现优先级反转概念如何解决优先级反转优先级继承代码实现分析互斥量和信号量的区别为什么中断中不能休眠?的全部内容,希望文章能够帮你解决RT-Thread内核源码分析-优先级反转代码实现优先级反转概念如何解决优先级反转优先级继承代码实现分析互斥量和信号量的区别为什么中断中不能休眠?所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复