如果某种应用并不需要在多个CPU上并行执行,那么软中断其实是没有必要的。因此诞生了弥补以上两个要求的tasklet。它具有以下特性:
a)一种特定类型的tasklet只能运行在一个CPU上,不能并行,只能串行执行。
b)多个不同类型的tasklet可以并行在多个CPU上。
c)软中断是静态分配的,在内核编译好之后,就不能改变。但tasklet就灵活许多,可以在运行时改变(比如添加模块时)。
tasklet是在两种软中断类型的基础上实现的,因此如果不需要软中断的并行特性,tasklet就是最好的选择。也就是说tasklet是软中断的一种特殊用法,即延迟情况下的串行执行。
Tasklet机制是一种较为特殊的软中断。Tasklet一词的原意是“小片任务”的意思,这里是指一小段可执行的代码,且通常以函数的形式出现
因为tasklet也是一种软中断,考虑到优先级问题,分别占用了向量表(softirq_vec
)中的HI_SOFTIRQ
和TASKLET_SOFTIRQ
两类软中断。
tasklet机制是Linux内核对软中断机制的一种扩展和封装。正是由于这种历史的延伸关系,使得tasklet机制与一般意义上的软中断有所不同,而呈现出以下两个显著的特点:
tasklet_struct
对象的func函数)在某个时刻只能在一个CPU上运行,而不像一般的软中断服务函数(即softirq_action
结构中的action
函数指针)那样——在同一时刻可以被多个CPU并发地执行。驱动开发时直接使用HI_SOFTIRQ
和TASKLET_SOFTIRQ
这两类软中断太局限了,为了扩展性,内核允许我们定义自己的tasklet,tasklet由tasklet_struct
结构体类型表示,每个结构体代表一个tasklet,在中定义:
struct tasklet_struct
{
struct tasklet_struct *next;
unsigned long state;
atomic_t count;
void (*func)(unsigned long);
unsigned long data;
};
其中的成员:
next
:指向下一个tasklet的指针,说明这个结构体的成员会被加入到一个链表里。
state
:用于标识tasklet状态,这一个无符号长整数,当前只使用了bit[1]和bit[0]两个状态位。其中,bit[1]=1表示这个tasklet当前正在某个CPU上被执行,它仅对SMP系统才有意义,其作用就是为了防止多个CPU同时执行一个tasklet的情形出现;bit[0]=1表示这个tasklet已经被调度去等待执行了但还没有开始执行,其作用是阻止同一个tasklet在被运行之前被重复调度,考虑如下情况:一个tasklet已经被触发过一次,即调度过一次,但可能还没有来得及被执行。对这两个状态位的宏定义如下所示:
enum
{
TASKLET_STATE_SCHED,
TASKLET_STATE_RUN
};
可以理解为每个tasklet有一个简单的状态机,0 -> TASKLET_STATE_SCHED -> TASKLET_STATE_RUN -> 0
。
count
:引用计数,若不为0,则tasklet被禁止,只有当它为0时,tasklet才被激活,也就是说该tasklet的处理函数func
才可以被执行,只有设置为激活后,tasklet对应的软中断被raise时该tasklet才会被投入运行。
func
:是一个函数指针,也是对应这个tasklet的处理函数。
data
:函数func的参数。这是一个32位的无符号整数,其具体含义可供func函数自行解释,比如将其解释成一个指向某个用户自定义数据结构的地址值
分两种创建tasklet对象的方式:
静态创建:使用中定义的两个宏中的一个:
#define DECLARE_TASKLET(name, func, data) \
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(0), func, data }
#define DECLARE_TASKLET_DISABLED(name, func, data) \
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(1), func, data }
两个宏之间的区别在于引用计数的初始值设置不同,DECLARE_TASKLET
把创建的tasklet的引用计数设置为0,一开始处于激活状态;DECLARE_TASKLET_DISABLED
把创建的tasklet的引用计数设置为1,一开始处于禁止(非激活)状态。
动态创建:使用tasklet_init
函数:
extern void tasklet_init(struct tasklet_struct *t,
void (*func)(unsigned long), unsigned long data);
我们可以看到,无论是静态方式,还是动态方式,都需要传一个函数地址,这个函数就是每个tasklet自己需要实现的处理函数func
影响的是tasklet_struct
结构体的state域。
state成员中的bit0表示一个tasklet是否已被调度但还未被执行,正在等待被执行(run),内核实现tasklet的机制会保证处在这种状态下的同一个tasklet只会有一个被触发,也就是说如果一个tasklet已经被触发过一次,即调度过一次,但可能还没有来得及被执行(run),而期间同一个tasklet又被触发一次,则系统会简单地拒绝掉,无论当前系统是单处理器还是多处理器,参考内核提供的触发tasklet的函数tasklet_schedule
代码如下:
static inline void tasklet_schedule(struct tasklet_struct *t)
{
if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
__tasklet_schedule(t);
}
该函数中调用test_and_set_bit
的用意是Set a bit and return its old value
,并且保证在一个原子操作中完成,如果原来bit[0]
的值就是1,那么取非后为0则里面__tasklet_schedule(t);
就不会被执行。
该bit[0]的值在tasklet的状态迁移到TASKLET_STATE_RUN
后被恢复为0,发生在tasklet的执行函数tasklet_action
中。注意恢复为0的动作test_and_clear_bit(TASKLET_STATE_SCHED...
发生在实际运行t->func(t->data);
之前,这说明在SMP系统上,同一个tasklet一旦被run起来之后,其状态位TASKLET_STATE_SCHED
必然已经被归零,即使没有run结束,在另一个处理器上还是有机会利用上述的tasklet_schedule
将同一个tasklet调度(schedule)起来的。参考如下代码:
static void tasklet_action(struct softirq_action *a)
{
...
while (list) {
...
if (tasklet_trylock(t)) {
if (!atomic_read(&t->count)) {
if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))
BUG();
t->func(t->data);
tasklet_unlock(t);
continue;
}
tasklet_unlock(t);
}
...
}
现在就比较好理解LKD3的8.3.2 使用tasklet
章节的3. 调度你自己的tasklet
中的那句话了。原文如下:
在tasklet被调度后,只要有机会它就会尽可能早地运行。在它还没有得到
运行机会之前,如果有一个相同的tasklet又被调度了,那么它仍然只会运
行一次。而如果这时它已经开始运行了,比如说在另外一个处理器上,那
么这个新的tasklet会被重新调度并再次运行。
更进一步来说,如果对同一个tasklet,如果一个处理器CPU0上正在运行它,另一个处理器CPU1还是有机会调度它,注意仍然保证不会同时运行,但可以在此后的时间被CPU1运行。但与此同时如果在第三个CPU2上又有一个新的想通过的tasklet发生,那只有被丢弃的份了。
要深刻理解以上内容,可以结合下文描述的”软中断向量TASKLET_SOFTIRQ的服务程序tasklet_action()”的详细解释。
state成员中的bit1用于判断一个
tasklet是否正在某个CPU上执行。对于state变量中某位的改变必须是一个原子操作,因此可以用定义在<linux/bitops.h>
头文件中的位操作来进行。由于这一位(即TASKLET_STATE_RUN
)仅仅对于SMP系统才有意义,因此Linux在interrupt.h
头文件中显示地定义了对TASKLET_STATE_RUN
位的操作。如下所示:
#ifdef CONFIG_SMP
static inline int tasklet_trylock(struct tasklet_struct *t)
{
return !test_and_set_bit(TASKLET_STATE_RUN, &(t)->state);
}
static inline void tasklet_unlock(struct tasklet_struct *t)
{
smp_mb__before_clear_bit();
clear_bit(TASKLET_STATE_RUN, &(t)->state);
}
static inline void tasklet_unlock_wait(struct tasklet_struct *t)
{
while (test_bit(TASKLET_STATE_RUN, &(t)->state)) { barrier(); }
}
#else
#define tasklet_trylock(t) 1
#define tasklet_unlock_wait(t) do { } while (0)
#define tasklet_unlock(t) do { } while (0)
#endif
对于第1位的处理还是发生在tasklet的执行函数tasklet_action
中。
static void tasklet_action(struct softirq_action *a)
{
...
while (list) {
...
if (tasklet_trylock(t)) {
if (!atomic_read(&t->count)) {
if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))
BUG();
t->func(t->data);
tasklet_unlock(t);
continue;
}
tasklet_unlock(t);
}
...
}
显然,在SMP系统中,tasklet_trylock()宏将把一个tasklet_struct结构变量中的state成员中的bit[1]
位设置成1,同时还返回bit[1]
位的非。因此,如果bit[1]
位原有值为1(表示另外一个CPU正在执行这个tasklet代码),那么tasklet_trylock()
宏将返回值0,也就表示上锁不成功。如果bit[1]
位的原有值为0,那么tasklet_trylock()
宏将返回值1,表示加锁成功。而在单CPU系统中,tasklet_trylock()
宏总是返回为1。
任何想要执行某个tasklet代码的程序都必须首先调用宏tasklet_trylock()
来试图对这个tasklet进行上锁(即设置TASKLET_STATE_RUN
位),且只能在上锁成功的情况下才能执行这个tasklet。
宏tasklet_unlock()
用来对一个tasklet进行解锁操作,也即将TASKLET_STATE_RUN
位清零。在单CPU系统中,tasklet_unlock
被优化为一个空操作。
针对另外一个在tasklet_action
没有用到的接口tasklet_unlock_wait
,在SMP系统中,tasklet_unlock_wait()
宏将一直不停地测试TASKLET_STATE_RUN
位的值,直到该位的值变为0(即一直等待到解锁),假如:CPU0正在执行tasklet A的代码,在此期间,CPU1也想执行tasklet A的代码,但CPU1发现tasklet A的TASKLET_STATE_RUN
位为1,于是它就可以通过tasklet_unlock_wait()
宏等待tasklet A被解锁(也即TASKLET_STATE_RUN
位被清零)。在单CPU系统中,tasklet_unlock_wait
被优化为一个空操作。
使能与禁止操作往往总是成对地被调用的,tasklet_disable()函数如下:
static inline void tasklet_disable(struct tasklet_struct *t)
{
tasklet_disable_nosync(t);
tasklet_unlock_wait(t);
smp_mb();
}
static inline void tasklet_enable(struct tasklet_struct *t)
{
smp_mb__before_atomic_dec();
atomic_dec(&t->count);
}
函数tasklet_disable_nosync()也是一个静态inline函数,它简单地通过原子操作将count成员变量的值减1。如下所示(interrupt.h):
static inline void tasklet_disable_nosync(struct tasklet_struct *t)
{
atomic_inc(&t->count);
smp_mb__after_atomic_inc();
}
前面我们看到tasklet_struct
里有个next
成员说明这个结构体的成员会被加入到一个链表里,这个链表的链表头定义在kernel/softirq.c
中:
/*
* Tasklets
*/
struct tasklet_head
{
struct tasklet_struct *head;
struct tasklet_struct **tail;
};
尽管tasklet机制是特定于软中断向量HI_SOFTIRQ
和TASKLET_SOFTIRQ
的一种实现,但是tasklet机制仍然属于softirq机制的整体框架范围内的,因此,它的设计与实现仍然必须坚持“谁触发,谁执行”的思想。为此,Linux为系统中的每一个CPU都定义了一个tasklet队列头部,来表示应该由各个CPU负责执行的tasklet队列。
static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);
static DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec);
其中tasklet_vec
用来处理TASKLET_SOFTIRQ
类的tasklets。tasklet_hi_vec
用来处理HI_SOFTIRQ
类的tasklets。
队列tasklet_vec
和tasklet_hi_vec
中的各个tasklet是怎样被所CPU所执行的呢?其关键就是软中断向量TASKLET_SOFTIRQ
和HI_SOFTIRQ
的软中断服务程序——tasklet_action()
函数和tasklet_hi_action()
函数。下面我们就来分析这两个函数。
Linux为软中断向量TASKLET_SOFTIRQ
和HI_SOFTIRQ
实现了专用的触发函数和软中断服务函数。其中,tasklet_schedule()
函数和tasklet_hi_schedule()
函数分别用来在当前CPU上触发软中断向量TASKLET_SOFTIRQ
和HI_SOFTIRQ
,并把指定的tasklet加入当前CPU所对应的tasklet_vec
和tasklet_hi_vec
队列中去等待执行。而tasklet_action()
函数和tasklet_hi_action()
函数则分别是软中断向量TASKLET_SOFTIRQ
和HI_SOFTIRQ
的软中断服务函数。在初始化函数softirq_init()
中,这两个软中断向量对应action函数指针就被分别初始化成指向函数tasklet_action()
和函数tasklet_hi_action()
。参考kernel/softirq.c
的softirq_init
函数,里面有如下两个函数调用:
open_softirq(TASKLET_SOFTIRQ, tasklet_action);
open_softirq(HI_SOFTIRQ, tasklet_hi_action);
softirq_init
将会在内核启动阶段被调用,完成对这两类tasklet相关的软中断的注册。
后面我们主要针对TASKLET_SOFTIRQ
学习,HI_SOFTIRQ
原理基本上是一样的。
该函数实现在include/linux/interrupt.h
头文件中,是一个inline函数。其源码如下所示:
static inline void tasklet_schedule(struct tasklet_struct *t)
{
if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
__tasklet_schedule(t);
}
在我们所期望的上半部分,比如一个中断处理函数中,驱动可以通过调用tasklet_schedule
来触发TASKLET_SOFTIRQ
软中断,检查tasklet_schedule
函数可以知道,该函数首先检查该tasklet是否已经被触发过(TASKLET_STATE_SCHED
是否已经被设置为1),如果已经为1则立即退出,说明这个tasklet已经被调度,并在一个CPU上等待被执行但还没有执行。由于同一个tasklet在某一个时刻只能在一个CPU上等待被执行,因此tasklet_schedule()
函数什么也不做就直接返回了。避免了重复触发,否则就设置TASKLET_STATE_SCHED
对应位为1,注意以上检查和设置的操作以原子的方式进行。此后如果继续触发则调用函数__tasklet_schedule
,该函数定义在kernel/softirq.c
中,完成实际的触发动作,如下所示:
void __tasklet_schedule(struct tasklet_struct *t)
{
unsigned long flags;
local_irq_save(flags);①
t->next = NULL;②
*__get_cpu_var(tasklet_vec).tail = t;
__get_cpu_var(tasklet_vec).tail = &(t->next);
raise_softirq_irqoff(TASKLET_SOFTIRQ);③
local_irq_restore(flags);④
}
该函数中:
local_irq_save()
函数来关闭当前CPU的中断,以保证下面的步骤在当前CPU上原子地被执行。raise_softirq_irqoff()
函数在当前CPU上触发软中断请求TASKLET_SOFTIRQ
,具体地就是将软中断状态寄存器irq_stat
中对应TASKLET_SOFTIRQ
的状态位raise/设置
为pending状态。这样内核将在稍后的一个恰当的时间点对该触发的TASKLET_SOFTIRQ
软中断进行处理。local_irq_restore()
函数来开当前CPU的中断。函数tasklet_action()
是tasklet机制与软中断向量TASKLET_SOFTIRQ
的联系纽带。正是该函数将当前CPU的tasklet队列中的各个tasklet放到当前CPU上来执行的。该函数实现在kernel/softirq.c
文件中,其源代码如下:
static void tasklet_action(struct softirq_action *a)
{
struct tasklet_struct *list;
local_irq_disable();①
list = __get_cpu_var(tasklet_vec).head;
__get_cpu_var(tasklet_vec).head = NULL;
__get_cpu_var(tasklet_vec).tail = &__get_cpu_var(tasklet_vec).head;
local_irq_enable();
while (list) {②
struct tasklet_struct *t = list;③
list = list->next;④
if (tasklet_trylock(t)) {⑤
if (!atomic_read(&t->count)) {⑥
if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))⑥-①
BUG();
t->func(t->data);⑥-②
tasklet_unlock(t);⑥-③
continue;⑥-④
}
tasklet_unlock(t);⑦
}
local_irq_disable();⑧-①
t->next = NULL;⑧-②
*__get_cpu_var(tasklet_vec).tail = t;
__get_cpu_var(tasklet_vec).tail = &(t->next);
__raise_softirq_irqoff(TASKLET_SOFTIRQ);⑧-③
local_irq_enable();⑧-④
}⑨
}
注释如下:
tasklet_vec
的头部指针,将其保存到局部变量list指针中,然后将当前CPU的tasklet队列头部指针设置为NULL,tasklet_vec
为空(但最后的实际结果却并不一定如此,下面将会看到)。tasklet_trylock()
试图对当前要执行的tasklet(由指针t所指向)进行加锁,如果加锁成功(当前没有任何其他CPU正在执行这个tasklet),则用原子读函数atomic_read()
进一步判断count成员的值。TASKLET_STATE_SCHED
位;tasklet_unlock()
来清除TASKLET_STATE_RUN
位。tasklet_vec
链表中移除。tasklet_unlock()
清除前面用tasklet_trylock()
设置的TASKLET_STATE_RUN
位。tasklet_trylock()
加锁不成功,或者因为当前tasklet的count值非0而不允许执行时,我们必须将这个tasklet重新放回到当前CPU的tasklet队列中,以留待这个CPU下次服务软中断向量TASKLET_SOFTIRQ
时再执行。为此进行这样几步操作:__raise_softirq_irqoff()
函数在当前CPU上再触发一次软中断请求TASKLET_SOFTIRQ
;整个过程我们发现tasklet的实现完全基于软中断机制,但施加了一些特定的约束,导致采用tasklet实现时对于驱动开发人员来说更容易。tasklet相关于一般软中断,相同点和不同点如下:
func
中依然不允许休眠,而且是在开中断的上下文环境下运行。软中断:
tasklet:
HI_SOFTIRQ
和TASKLET_SOFTIRQ
。本质上没有什么区别,只不过HI_SOFTIRQ
的优先级更高一些,建立在HI_SOFTIRQ
上的tasklet会早于TASKLET_SOFTIRQ
执行。驱动模块利用tasklet来实现下半部分的时候需要遵循下面的步骤:
这个回调函数的形式必须是void tasklet_handler(unsigned long data);
参数data
是传递给回调函数的参数,其值在第一步创建tasklet时给定。
编写tasklet处理函数时要注意以下要求:
采用静态或者动态方式,在创建的同时将定义的tasklet处理函数的地址注册到内核中。
驱动模块在中断处理的上半部分调用tasklet_schedule()
函数触发软中断。内核会在稍后的下半部分处理过程中调用我们曾经注册的处理函数执行BH动作。
copy from http://unicornx.github.io/2016/02/12/20160212-lk-drv-tasklet/
手机扫一扫
移动阅读更方便
你可能感兴趣的文章