The workqueue mechanism in Linux exists to simplify the creation of kernel threads. By calling the workqueue API you get kernel threads created for you, and the number of threads can scale with the number of CPUs in the system so that the queued work can be processed in parallel.
A workqueue is another way of deferring work. It hands the deferred work to a kernel thread, which means this bottom half runs in process context. Most importantly, code running from a workqueue is allowed to be rescheduled and even to sleep.
In kernel code you often run into situations where a piece of processing cannot, or should not, be done right away, and you would rather hand it off to a kernel thread. There are many reasons for this, for example the current code may be running in a context that cannot sleep, or the processing may simply take too long.
To meet these needs the workqueue mechanism was developed. Workqueues are not only used inside the operating system kernel; some applications and protocol stacks implement their own workqueues as well.
Workqueue: a mechanism for deferring an operation (or callback) and executing it asynchronously. A workqueue pushes work off to a kernel thread, and because the work runs in thread (process) context, it can be rescheduled, preempted, and can sleep while it executes.
Work item: an element of a workqueue. It bundles a callback function together with its arguments, sometimes with a few extra attribute fields; in short, a single structure is enough to record and describe one work item.
- struct work_struct {
- atomic_long_t data;
- struct list_head entry;
- work_func_t func;
- #ifdef CONFIG_LOCKDEP
- struct lockdep_map lockdep_map;
- #endif
- ANDROID_KABI_RESERVE(1);
- ANDROID_KABI_RESERVE(2);
- };
The argument to func is a pointer to a work_struct, and the data it points to is the very work_struct that func was set up in.
Looking at this, two questions come to mind:
First: how do you pass your own data to func as an argument?
Second: how do you implement delayed work?
For the first question: embed the work_struct inside your own data structure, then use container_of() in the callback to get back to your data.
For the second question: the newer workqueue deliberately removed the timer from work_struct to keep it lean. Recall the older versions: the timer was only needed when a work item had to be delayed, and in the common case it was dead weight, so carrying it in every work_struct wasted resources to some degree. The new version therefore drops the timer from work_struct and defines a separate structure, delayed_work, for deferred execution.
- struct delayed_work {
-     struct work_struct work;
-     struct timer_list timer;
-
-     /* target workqueue and CPU that ->timer uses to queue ->work */
-     struct workqueue_struct *wq;
-     int cpu;
- };
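As a quick illustration, here is a minimal sketch of how a delayed_work is typically used, assuming the current two-argument DECLARE_DELAYED_WORK() API; the names my_dwork, my_delayed_fn and example_kick are made up for this example:
- #include <linux/workqueue.h>
- #include <linux/jiffies.h>
- #include <linux/printk.h>
-
- static void my_delayed_fn(struct work_struct *work)
- {
-     /* if driver data is needed, to_delayed_work(work) plus container_of()
-      * gets back to the structure that embeds the delayed_work */
-     pr_info("delayed work ran\n");
- }
-
- static DECLARE_DELAYED_WORK(my_dwork, my_delayed_fn);
-
- static void example_kick(void)
- {
-     /* run my_delayed_fn() on the system workqueue after roughly 2 seconds */
-     schedule_delayed_work(&my_dwork, msecs_to_jiffies(2000));
- }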
Not every driver has to have its own workqueue. A driver can use the default workqueue provided by the kernel. Since that queue is shared by many drivers, a task placed on it may have to wait quite a while before it starts running; to keep this manageable, delays inside work functions should be minimal or avoided altogether.
Each workqueue has its own dedicated worker thread(s), and every task taken off the workqueue runs in process context (so it may sleep). A driver can create and use its own workqueue, or use one of the kernel's workqueues.
- // create a workqueue
- struct workqueue_struct *create_workqueue(const char *name);
Work items can be set up statically at compile time or dynamically at run time.
- // set up at compile time
- DECLARE_WORK(name, void (*function)(struct work_struct *work));
- // set up at run time
- INIT_WORK(struct work_struct *work, void (*function)(struct work_struct *work));
- // queue onto a specific workqueue
- int queue_work(struct workqueue_struct *queue, struct work_struct *work);
- int queue_delayed_work(struct workqueue_struct *queue, struct delayed_work *dwork, unsigned long delay);
-
- // queue onto the kernel's default workqueue
- int schedule_work(struct work_struct *work);
- int schedule_delayed_work(struct delayed_work *dwork, unsigned long delay);
delay: guarantees that at least this minimum delay passes before the queued task can actually start executing.
- // cancel a delayed work item
- int cancel_delayed_work(struct delayed_work *dwork);
-
- // wait for all work on the queue to finish
- void flush_workqueue(struct workqueue_struct *queue);
-
- // destroy a workqueue
- void destroy_workqueue(struct workqueue_struct *queue);
- struct my_struct_t {
-     char *name;
-     struct work_struct my_work;
- };
-
- static struct workqueue_struct *my_wq;
- static struct my_struct_t my_name;
-
- static void my_func(struct work_struct *work)
- {
-     struct my_struct_t *my_name = container_of(work, struct my_struct_t, my_work);
-
-     printk(KERN_INFO "Hello world, my name is %s!\n", my_name->name);
- }
-
- static int __init my_init(void)
- {
-     my_wq = create_workqueue("my wq");
-     my_name.name = "Jack";
-     INIT_WORK(&my_name.my_work, my_func);
-     queue_work(my_wq, &my_name.my_work);
-     return 0;
- }
workqueue is a very important mechanism in the kernel, especially for drivers: small tasks (work items) generally do not get a thread of their own, they are simply thrown onto a workqueue. The workqueue's job is to process the kernel's large number of small tasks in process context.
The design therefore has two main goals: parallelism, so that multiple work items do not block one another; and economy, so that work items share resources (threads, scheduling, memory) as much as possible and the system does not waste them.
To achieve this, the workqueue design and implementation has gone through many revisions. The latest implementation is called CMWQ (Concurrency Managed Workqueue), which uses smarter algorithms to deliver both "parallelism and economy". The creation function in the new implementation is alloc_workqueue(); the old create_workqueue() is gradually being deprecated.
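For example, a driver that wants its own queue on a recent kernel would do something like the following sketch; the name my_wq and the particular flag combination are just for illustration:
- #include <linux/workqueue.h>
- #include <linux/errno.h>
-
- static struct workqueue_struct *my_wq;
-
- static int my_wq_setup(void)
- {
-     /* WQ_MEM_RECLAIM guarantees forward progress under memory pressure,
-      * WQ_HIGHPRI selects the high-priority (nice = HIGHPRI_NICE_LEVEL) worker_pool,
-      * max_active = 0 means "use the default per-CPU limit" */
-     my_wq = alloc_workqueue("my_wq", WQ_HIGHPRI | WQ_MEM_RECLAIM, 0);
-     if (!my_wq)
-         return -ENOMEM;
-     return 0;
- }
-
- static void my_wq_teardown(void)
- {
-     destroy_workqueue(my_wq);
- }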
The work-related data structures in the workqueue code are easy to confuse; roughly, they can be understood like this:
1) work: a piece of work, i.e. a work item.
2) workqueue: a collection of work. workqueue to work is a one-to-many relationship.
3) worker: a worker. In the code, each worker corresponds to a worker_thread() kernel thread.
4) worker_pool: a collection of workers. worker_pool to worker is a one-to-many relationship.
5) PWQ (pool_workqueue): the middleman that ties a workqueue to a worker_pool. workqueue to pwq is one-to-many, and pwq to worker_pool is one-to-one.
Each thread that executes work is called a worker, and a group of workers forms a worker_pool. The essence of CMWQ lies in how the workers inside a worker_pool are dynamically added and removed; see manage_workers().
CMWQ divides worker_pools into two kinds:
normal worker_pool, used by ordinary workqueues;
unbound worker_pool, used by workqueues of type WQ_UNBOUND.
By default, work is handled in a normal worker_pool. The system creates two normal worker_pools per CPU: one with normal priority (nice=0) and one with high priority (nice=HIGHPRI_NICE_LEVEL); the worker threads created for them get correspondingly different nice values.
Each worker corresponds to a worker_thread() kernel thread, and a worker_pool contains one or more workers; the number of workers in a worker_pool grows and shrinks dynamically with the load of work on that pool.
You can list all worker kernel threads with ps aux | grep kworker. The kernel threads (worker_thread()) of a normal worker_pool are named according to the following rule:
- snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
- pool->attrs->nice < 0 ? "H" : "");
-
- worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
- "kworker/%s", id_buf);
So typical normal worker_pool thread names look like this:
- shell@PRO5:/ $ ps | grep "kworker"
- root 14 2 0 0 worker_thr 0000000000 S kworker/1:0H // worker 0 of CPU 1's high-priority worker_pool
- root 17 2 0 0 worker_thr 0000000000 S kworker/2:0 // worker 0 of CPU 2's normal-priority worker_pool
- root 18 2 0 0 worker_thr 0000000000 S kworker/2:0H // worker 0 of CPU 2's high-priority worker_pool
- root 23699 2 0 0 worker_thr 0000000000 S kworker/0:1 // worker 1 of CPU 0's normal-priority worker_pool
Most work items are executed by a normal worker_pool (for example, work pushed onto the system workqueue with schedule_work() or schedule_work_on() ends up being run by workers of a normal worker_pool). These workers are bound to a CPU: once a worker has started executing a work item, that work keeps running on that one CPU and never migrates.
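If a work item really needs to run on a particular CPU's normal worker_pool, it can be queued there explicitly. A minimal sketch follows; the CPU number and the names my_percpu_work / my_percpu_fn are arbitrary choices for illustration:
- #include <linux/workqueue.h>
- #include <linux/smp.h>
- #include <linux/printk.h>
-
- static void my_percpu_fn(struct work_struct *work)
- {
-     /* executed by a kworker of the worker_pool bound to the chosen CPU */
-     pr_info("work running on CPU %d\n", raw_smp_processor_id());
- }
-
- static DECLARE_WORK(my_percpu_work, my_percpu_fn);
-
- static void example_queue_on_cpu1(void)
- {
-     /* push the work onto CPU 1's normal worker_pool via system_wq */
-     schedule_work_on(1, &my_percpu_work);
- }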
An unbound worker_pool, by contrast, means the workers can be scheduled on multiple CPUs. They are in fact still bound, only the unit is not a CPU but a node; "node" refers to NUMA (Non-Uniform Memory Access) systems, which may contain several nodes, each with one or more CPUs.
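A sketch of the unbound case, with names and flags chosen only for illustration: with WQ_UNBOUND the work is served by the node-level unbound worker_pool instead of a per-CPU one.
- #include <linux/workqueue.h>
- #include <linux/errno.h>
- #include <linux/printk.h>
-
- static struct workqueue_struct *my_unbound_wq;
- static struct work_struct my_long_work;
-
- static void my_long_fn(struct work_struct *work)
- {
-     /* long-running or sleepy work; the scheduler may place the kworker
-      * on any CPU of the local NUMA node */
-     pr_info("unbound work running\n");
- }
-
- static int my_unbound_setup(void)
- {
-     my_unbound_wq = alloc_workqueue("my_unbound", WQ_UNBOUND, 0);
-     if (!my_unbound_wq)
-         return -ENOMEM;
-
-     INIT_WORK(&my_long_work, my_long_fn);
-     queue_work(my_unbound_wq, &my_long_work);
-     return 0;
- }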
The kernel threads (worker_thread()) of an unbound worker_pool are named according to the following rule:
- snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
-
- worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
- "kworker/%s", id_buf);
So typical unbound worker_pool thread names look like this:
- shell@PRO5:/ $ ps | grep "kworker"
- root 23906 2 0 0 worker_thr 0000000000 S kworker/u20:2 /* worker 2 of unbound pool 20 */
- root 24564 2 0 0 worker_thr 0000000000 S kworker/u20:0 /* worker 0 of unbound pool 20 */
- root 24622 2 0 0 worker_thr 0000000000 S kworker/u21:1 /* worker 1 of unbound pool 21 */
Each worker corresponds to a worker_thread() kernel thread, and a worker_pool has one or more workers; the workers pull work off the shared list worker_pool->worklist and process it.
The work-processing path runs mainly through worker_thread() -> process_one_work(); let's look at how the code implements it.
kernel/workqueue.c: worker_thread() -> process_one_work()
- static int worker_thread(void *__worker)
- {
- struct worker *worker = __worker;
- struct worker_pool *pool = worker->pool;
-
- /* tell the scheduler that this is a workqueue worker */
- worker->task->flags |= PF_WQ_WORKER;
- woke_up:
- spin_lock_irq(&pool->lock);
-
- // (1) should this worker die?
- /* am I supposed to die? */
- if (unlikely(worker->flags & WORKER_DIE)) {
- spin_unlock_irq(&pool->lock);
- WARN_ON_ONCE(!list_empty(&worker->entry));
- worker->task->flags &= ~PF_WQ_WORKER;
-
- set_task_comm(worker->task, "kworker/dying");
- ida_simple_remove(&pool->worker_ida, worker->id);
- worker_detach_from_pool(worker, pool);
- kfree(worker);
- return 0;
- }
-
- // (2) leave the idle state
- // until it is woken up, a worker stays in the idle state
- worker_leave_idle(worker);
- recheck:
-
- // (3) if this worker is still needed, keep going; otherwise go idle
- // "need more worker" condition: (pool->worklist != 0) && (pool->nr_running == 0)
- // i.e. there is work on the worklist and no work is currently running
- /* no more worker necessary? */
- if (!need_more_worker(pool))
- goto sleep;
-
- // (4) if (pool->nr_idle == 0), start creating more workers
- // the idle list has no spare workers left, so create some in advance
- /* do we need to manage? */
- if (unlikely(!may_start_working(pool)) && manage_workers(worker))
- goto recheck;
-
- /*
- * ->scheduled list can only be filled while a worker is
- * preparing to process a work or actually processing it.
- * Make sure nobody diddled with it while I was sleeping.
- */
- WARN_ON_ONCE(!list_empty(&worker->scheduled));
-
- /*
- * Finish PREP stage. We're guaranteed to have at least one idle
- * worker or that someone else has already assumed the manager
- * role. This is where @worker starts participating in concurrency
- * management if applicable and concurrency management is restored
- * after being rebound. See rebind_workers() for details.
- */
- worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
- do {
- // (5) if pool->worklist is not empty, take one work item off it and process it
- struct work_struct *work =
- list_first_entry(&pool->worklist,
- struct work_struct, entry);
- if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
- /* optimization path, not strictly necessary */
- // (6) execute a normal work item
- process_one_work(worker, work);
- if (unlikely(!list_empty(&worker->scheduled)))
- process_scheduled_works(worker);
- } else {
- // (7) execute work that was deliberately scheduled onto this particular worker
- // ordinary work sits on the pool's shared list, pool->worklist
- // only a few special kinds of work are handed to a specific worker's worker->scheduled list:
- // 1. the barrier work inserted by flush_work();
- // 2. work pushed over from another worker on a collision
- move_linked_works(work, &worker->scheduled, NULL);
- process_scheduled_works(worker);
- }
- // (8) the keep_working() condition for this worker:
- // pool->worklist is not empty && (pool->nr_running <= 1)
- } while (keep_working(pool));
- worker_set_flags(worker, WORKER_PREP);
- sleep:
- // (9) the worker enters the idle state
- /*
- * pool->lock is held and there's no work to process and no need to
- * manage, sleep. Workers are woken up only while holding
- * pool->lock or from local cpu, so setting the current state
- * before releasing pool->lock is enough to prevent losing any
- * event.
- */
- worker_enter_idle(worker);
- __set_current_state(TASK_INTERRUPTIBLE);
- spin_unlock_irq(&pool->lock);
- schedule();
- goto woke_up;
- }
- | →
- static void process_one_work(struct worker *worker, struct work_struct *work)
- __releases(&pool->lock)
- __acquires(&pool->lock)
- {
- struct pool_workqueue *pwq = get_work_pwq(work);
- struct worker_pool *pool = worker->pool;
- bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE;
- int work_color;
- struct worker *collision;
- #ifdef CONFIG_LOCKDEP
- /*
- * It is permissible to free the struct work_struct from
- * inside the function that is called from it, this we need to
- * take into account for lockdep too. To avoid bogus "held
- * lock freed" warnings as well as problems when looking into
- * work->lockdep_map, make a copy and use that here.
- */
- struct lockdep_map lockdep_map;
-
- lockdep_copy_map(&lockdep_map, &work->lockdep_map);
- #endif
- /* ensure we're on the correct CPU */
- WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&
- raw_smp_processor_id() != pool->cpu);
- // (8.1) if the work is already being executed by another worker of this worker_pool,
- // move it onto that worker's scheduled list to be executed later
- /*
- * A single work shouldn't be executed concurrently by
- * multiple workers on a single cpu. Check whether anyone is
- * already processing the work. If so, defer the work to the
- * currently executing one.
- */
- collision = find_worker_executing_work(pool, work);
- if (unlikely(collision)) {
- move_linked_works(work, &collision->scheduled, NULL);
- return;
- }
-
- // (8.2) add the worker to the busy hash table pool->busy_hash
- /* claim and dequeue */
- debug_work_deactivate(work);
- hash_add(pool->busy_hash, &worker->hentry, (unsigned long)work);
- worker->current_work = work;
- worker->current_func = work->func;
- worker->current_pwq = pwq;
- work_color = get_work_color(work);
-
- list_del_init(&work->entry);
-
- // (8.3) if the work's wq is CPU intensive (WQ_CPU_INTENSIVE),
- // its execution leaves the worker_pool's concurrency management and behaves like an independent thread
- /*
- * CPU intensive works don't participate in concurrency management.
- * They're the scheduler's responsibility. This takes @worker out
- * of concurrency management and the next code block will chain
- * execution of the pending work items.
- */
- if (unlikely(cpu_intensive))
- worker_set_flags(worker, WORKER_CPU_INTENSIVE);
- // (8.4) for UNBOUND or CPU_INTENSIVE work, check whether an idle worker needs to be woken up
- // ordinary work never takes this path
- /*
- * Wake up another worker if necessary. The condition is always
- * false for normal per-cpu workers since nr_running would always
- * be >= 1 at this point. This is used to chain execution of the
- * pending work items for WORKER_NOT_RUNNING workers such as the
- * UNBOUND and CPU_INTENSIVE ones.
- */
- if (need_more_worker(pool))
- wake_up_worker(pool);
- /*
- * Record the last pool and clear PENDING which should be the last
- * update to @work. Also, do this inside @pool->lock so that
- * PENDING and queued state changes happen together while IRQ is
- * disabled.
- */
- set_work_pool_and_clear_pending(work, pool->id);
- spin_unlock_irq(&pool->lock);
- lock_map_acquire_read(&pwq->wq->lockdep_map);
- lock_map_acquire(&lockdep_map);
- trace_workqueue_execute_start(work);
- // (8.5) call the work function
- worker->current_func(work);
- /*
- * While we must be careful to not use "work" after this, the trace
- * point will only record its address.
- */
- trace_workqueue_execute_end(work);
- lock_map_release(&lockdep_map);
- lock_map_release(&pwq->wq->lockdep_map);
- if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
- pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n"
- " last function: %pf\n",
- current->comm, preempt_count(), task_pid_nr(current),
- worker->current_func);
- debug_show_held_locks(current);
- dump_stack();
- }
- /*
- * The following prevents a kworker from hogging CPU on !PREEMPT
- * kernels, where a requeueing work item waiting for something to
- * happen could deadlock with stop_machine as such work item could
- * indefinitely requeue itself while all other CPUs are trapped in
- * stop_machine. At the same time, report a quiescent RCU state so
- * the same condition doesn't freeze RCU.
- */
- cond_resched_rcu_qs();
-
- spin_lock_irq(&pool->lock);
-
- /* clear cpu intensive status */
- if (unlikely(cpu_intensive))
- worker_clr_flags(worker, WORKER_CPU_INTENSIVE);
-
- /* we're done with it, release */
- hash_del(&worker->hentry);
- worker->current_work = NULL;
- worker->current_func = NULL;
- worker->current_pwq = NULL;
- worker->desc_valid = false;
- pwq_dec_nr_in_flight(pwq, work_color);
- }
How a worker_pool dynamically grows and shrinks its set of workers is the heart of the CMWQ algorithm.
A workqueue is simply a collection of work items. Workqueues fall roughly into two categories: those created by the system and those created by users. In either case, unless WQ_UNBOUND is specified, a workqueue is bound to the normal worker_pools by default.
At initialisation the kernel creates a set of default workqueues: system_wq, system_highpri_wq, system_long_wq, system_unbound_wq, system_freezable_wq, system_power_efficient_wq and system_freezable_power_efficient_wq.
system_wq, for example, is the one schedule_work() uses by default.
kernel/workqueue.c:init_workqueues()
- static int __init init_workqueues(void)
- {
- system_wq = alloc_workqueue("events", 0, 0);
- system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
- system_long_wq = alloc_workqueue("events_long", 0, 0);
- system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
- WQ_UNBOUND_MAX_ACTIVE);
- system_freezable_wq = alloc_workqueue("events_freezable",
- WQ_FREEZABLE, 0);
- system_power_efficient_wq = alloc_workqueue("events_power_efficient",
- WQ_POWER_EFFICIENT, 0);
-
- system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient",
- WQ_FREEZABLE | WQ_POWER_EFFICIENT,
- 0);
- }
The detailed path was analysed in the code walkthrough above: alloc_workqueue() -> __alloc_workqueue_key() -> alloc_and_link_pwqs().
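A small usage sketch of these pre-created queues (the work names quick_work / lengthy_work are made up): queueing onto system_wq is exactly what schedule_work() does, while system_unbound_wq or system_highpri_wq are picked explicitly with queue_work().
- #include <linux/workqueue.h>
- #include <linux/printk.h>
-
- static void quick_fn(struct work_struct *work)
- {
-     pr_info("quick work\n");
- }
-
- static void lengthy_fn(struct work_struct *work)
- {
-     pr_info("lengthy work\n");
- }
-
- static DECLARE_WORK(quick_work, quick_fn);
- static DECLARE_WORK(lengthy_work, lengthy_fn);
-
- static void example(void)
- {
-     /* same effect as schedule_work(&quick_work) */
-     queue_work(system_wq, &quick_work);
-
-     /* work that may run for a long time or sleep a lot is better
-      * kept off the per-CPU pools */
-     queue_work(system_unbound_wq, &lengthy_work);
- }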
Queueing a work item onto a workqueue:
kernel/workqueue.c: queue_work() -> queue_work_on() -> __queue_work()
- static void __queue_work(int cpu, struct workqueue_struct *wq,
- struct work_struct *work)
- {
- struct pool_workqueue *pwq;
- struct worker_pool *last_pool;
- struct list_head *worklist;
- unsigned int work_flags;
- unsigned int req_cpu = cpu;
-
- /*
- * While a work item is PENDING && off queue, a task trying to
- * steal the PENDING will busy-loop waiting for it to either get
- * queued or lose PENDING. Grabbing PENDING and queueing should
- * happen with IRQ disabled.
- */
- WARN_ON_ONCE(!irqs_disabled());
-
- debug_work_activate(work);
-
- /* if draining, only works from the same workqueue are allowed */
- if (unlikely(wq->flags & __WQ_DRAINING) &&
- WARN_ON_ONCE(!is_chained_work(wq)))
- return;
- retry:
- // (1) if no CPU was specified, use the current CPU
- if (req_cpu == WORK_CPU_UNBOUND)
- cpu = raw_smp_processor_id();
-
- /* pwq which will be used unless @work is executing elsewhere */
- if (!(wq->flags & WQ_UNBOUND))
- // (2) for a normal wq, use the normal worker_pool of the current CPU
- pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
- else
- // (3) for an unbound wq, use the worker_pool of the node the current CPU belongs to
- pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
-
- // (4) if the work is currently being executed on another worker, queue it onto that worker
- // to avoid the work becoming re-entrant
- /*
- * If @work was previously on a different pool, it might still be
- * running there, in which case the work needs to be queued on that
- * pool to guarantee non-reentrancy.
- */
- last_pool = get_work_pool(work);
- if (last_pool && last_pool != pwq->pool) {
- struct worker *worker;
-
- spin_lock(&last_pool->lock);
-
- worker = find_worker_executing_work(last_pool, work);
-
- if (worker && worker->current_pwq->wq == wq) {
- pwq = worker->current_pwq;
- } else {
- /* meh... not running there, queue here */
- spin_unlock(&last_pool->lock);
- spin_lock(&pwq->pool->lock);
- }
- } else {
- spin_lock(&pwq->pool->lock);
- }
-
- /*
- * pwq is determined and locked. For unbound pools, we could have
- * raced with pwq release and it could already be dead. If its
- * refcnt is zero, repeat pwq selection. Note that pwqs never die
- * without another pwq replacing it in the numa_pwq_tbl or while
- * work items are executing on it, so the retrying is guaranteed to
- * make forward-progress.
- */
- if (unlikely(!pwq->refcnt)) {
- if (wq->flags & WQ_UNBOUND) {
- spin_unlock(&pwq->pool->lock);
- cpu_relax();
- goto retry;
- }
- /* oops */
- WARN_ONCE(true, "workqueue: per-cpu pwq for %s on cpu%d has 0 refcnt",
- wq->name, cpu);
- }
-
- /* pwq determined, queue */
- trace_workqueue_queue_work(req_cpu, pwq, work);
-
- if (WARN_ON(!list_empty(&work->entry))) {
- spin_unlock(&pwq->pool->lock);
- return;
- }
-
- pwq->nr_in_flight[pwq->work_color]++;
- work_flags = work_color_to_flags(pwq->work_color);
-
- // (5) if max_active has not been reached yet, put the work on pool->worklist
- if (likely(pwq->nr_active < pwq->max_active)) {
- trace_workqueue_activate_work(work);
- pwq->nr_active++;
- worklist = &pwq->pool->worklist;
- // otherwise, park the work on the temporary list pwq->delayed_works
- } else {
- work_flags |= WORK_STRUCT_DELAYED;
- worklist = &pwq->delayed_works;
- }
-
- // (6) insert the work into the chosen worklist
- insert_work(pwq, work, worklist, work_flags);
-
- spin_unlock(&pwq->pool->lock);
- }
Flushing a work item means making sure it has finished executing.
How can you tell that an asynchronous work item has completed? A trick is used here: a new work item, a wq_barrier, is inserted right behind the target work; once the wq_barrier has run, the target work must already have finished.
kernel/workqueue.c: flush_work() -> start_flush_work() -> insert_wq_barrier()
- /**
- * flush_work - wait for a work to finish executing the last queueing instance
- * @work: the work to flush
- *
- * Wait until @work has finished execution. @work is guaranteed to be idle
- * on return if it hasn't been requeued since flush started.
- *
- * Return:
- * %true if flush_work() waited for the work to finish execution,
- * %false if it was already idle.
- */
- bool flush_work(struct work_struct *work)
- {
- struct wq_barrier barr;
-
- lock_map_acquire(&work->lockdep_map);
- lock_map_release(&work->lockdep_map);
-
- if (start_flush_work(work, &barr)) {
- // wait for the signal that the barr work has completed
- wait_for_completion(&barr.done);
- destroy_work_on_stack(&barr.work);
- return true;
- } else {
- return false;
- }
- }
- | →
- static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr)
- {
- struct worker *worker = NULL;
- struct worker_pool *pool;
- struct pool_workqueue *pwq;
-
- might_sleep();
-
- // (1) if the work's worker_pool is NULL, the work has already finished
- local_irq_disable();
- pool = get_work_pool(work);
- if (!pool) {
- local_irq_enable();
- return false;
- }
-
- spin_lock(&pool->lock);
- /* see the comment in try_to_grab_pending() with the same code */
- pwq = get_work_pwq(work);
- if (pwq) {
- // (2) if the worker_pool that the work's pwq points to differs from the pool found in the previous step, the work has already finished
- if (unlikely(pwq->pool != pool))
- goto already_gone;
- } else {
- // (3) if the work's pwq is NULL and no worker is currently executing it, the work has already finished
- worker = find_worker_executing_work(pool, work);
- if (!worker)
- goto already_gone;
- pwq = worker->current_pwq;
- }
-
- // (4) if the work has not finished yet, insert the barr work right behind it
- insert_wq_barrier(pwq, barr, work, worker);
- spin_unlock_irq(&pool->lock);
-
- /*
- * If @max_active is 1 or rescuer is in use, flushing another work
- * item on the same workqueue may lead to deadlock. Make sure the
- * flusher is not running on the same workqueue by verifying write
- * access.
- */
- if (pwq->wq->saved_max_active == 1 || pwq->wq->rescuer)
- lock_map_acquire(&pwq->wq->lockdep_map);
- else
- lock_map_acquire_read(&pwq->wq->lockdep_map);
- lock_map_release(&pwq->wq->lockdep_map);
-
- return true;
- already_gone:
- spin_unlock_irq(&pool->lock);
- return false;
- }
- || →
- static void insert_wq_barrier(struct pool_workqueue *pwq,
- struct wq_barrier *barr,
- struct work_struct *target, struct worker *worker)
- {
- struct list_head *head;
- unsigned int linked = 0;
-
- /*
- * debugobject calls are safe here even with pool->lock locked
- * as we know for sure that this will not trigger any of the
- * checks and call back into the fixup functions where we
- * might deadlock.
- */
- // (4.1) the barr work's callback is wq_barrier_func()
- INIT_WORK_ONSTACK(&barr->work, wq_barrier_func);
- __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
- init_completion(&barr->done);
-
- /*
- * If @target is currently being executed, schedule the
- * barrier to the worker; otherwise, put it after @target.
- */
- // (4.2) if the work is currently being executed by a worker, insert the barr work into that worker's scheduled list
- if (worker)
- head = worker->scheduled.next;
- // otherwise, insert the barr work into the normal worklist right behind the target work,
- // and set the WORK_STRUCT_LINKED flag
- else {
- unsigned long *bits = work_data_bits(target);
-
- head = target->entry.next;
- /* there can already be other linked works, inherit and set */
- linked = *bits & WORK_STRUCT_LINKED;
- __set_bit(WORK_STRUCT_LINKED_BIT, bits);
- }
-
- debug_work_activate(&barr->work);
- insert_work(pwq, &barr->work, head,
- work_color_to_flags(WORK_NO_COLOR) | linked);
- }
- ||| →
- static void wq_barrier_func(struct work_struct *work)
- {
- struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
- // (4.1.1) the barr work has finished: signal completion
- complete(&barr->done);
- }
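From a driver's point of view the barrier machinery above is hidden behind flush_work(); a minimal teardown sketch follows, where my_work is a hypothetical work item owned by the driver:
- #include <linux/workqueue.h>
-
- static struct work_struct my_work;
-
- static void my_teardown(void)
- {
-     /* wait until the last queued instance of my_work has finished;
-      * only after this is it safe to free the data the work uses */
-     flush_work(&my_work);
-
-     /* alternatively, if the work must not run again at all,
-      * cancel_work_sync() cancels a pending instance and waits for
-      * a running one to finish */
-     cancel_work_sync(&my_work);
- }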
The workqueue mechanism implemented by CMWQ is wrapped into a set of public interface functions.
schedule_work() pushes a work item onto the default system workqueue system_wq; WORK_CPU_UNBOUND means it will be served by a worker of the normal worker_pool bound to the current CPU.
kernel/workqueue.c: schedule_work() -> queue_work() -> queue_work_on() -> __queue_work()
- static inline bool schedule_work(struct work_struct *work)
- {
- return queue_work(system_wq, work);
- }
- | →
- static inline bool queue_work(struct workqueue_struct *wq,
- struct work_struct *work)
- {
- return queue_work_on(WORK_CPU_UNBOUND, wq, work);
- }
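A typical use of schedule_work() is as the bottom half of an interrupt handler; a hedged sketch, with my_irq_handler and my_bh_work invented for illustration:
- #include <linux/interrupt.h>
- #include <linux/workqueue.h>
-
- static struct work_struct my_bh_work;
-
- static irqreturn_t my_irq_handler(int irq, void *dev_id)
- {
-     /* hard-IRQ context: cannot sleep, so only mark the work pending;
-      * the heavy lifting runs later in a kworker, in process context */
-     schedule_work(&my_bh_work);
-     return IRQ_HANDLED;
- }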
Building on schedule_work(), schedule_work_on() additionally lets you specify the CPU on which the work will run.
kernel/workqueue.c: schedule_work_on() -> queue_work_on() -> __queue_work()
- static inline bool schedule_work_on(int cpu, struct work_struct *work)
- {
- return queue_work_on(cpu, system_wq, work);
- }
schedule_delayed_work() starts a timer; when the timer expires, delayed_work_timer_fn() pushes the work onto the default system workqueue system_wq.
kernel/workqueue.c: schedule_delayed_work() -> queue_delayed_work() -> queue_delayed_work_on() -> __queue_delayed_work()
- static inline bool schedule_delayed_work(struct delayed_work *dwork,
- unsigned long delay)
- {
- return queue_delayed_work(system_wq, dwork, delay);
- }
- | →
- static inline bool queue_delayed_work(struct workqueue_struct *wq,
- struct delayed_work *dwork,
- unsigned long delay)
- {
- return queue_delayed_work_on(WORK_CPU_UNBOUND, wq, dwork, delay);
- }
- || →
- bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
- struct delayed_work *dwork, unsigned long delay)
- {
- struct work_struct *work = &dwork->work;
- bool ret = false;
- unsigned long flags;
-
- /* read the comment in __queue_work() */
- local_irq_save(flags);
-
- if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
- __queue_delayed_work(cpu, wq, dwork, delay);
- ret = true;
- }
-
- local_irq_restore(flags);
- return ret;
- }
- ||| →
- static void __queue_delayed_work(int cpu, struct workqueue_struct *wq,
- struct delayed_work *dwork, unsigned long delay)
- {
- struct timer_list *timer = &dwork->timer;
- struct work_struct *work = &dwork->work;
-
- WARN_ON_ONCE(timer->function != delayed_work_timer_fn ||
- timer->data != (unsigned long)dwork);
- WARN_ON_ONCE(timer_pending(timer));
- WARN_ON_ONCE(!list_empty(&work->entry));
-
- /*
- * If @delay is 0, queue @dwork->work immediately. This is for
- * both optimization and correctness. The earliest @timer can
- * expire is on the closest next tick and delayed_work users depend
- * on that there's no such delay when @delay is 0.
- */
- if (!delay) {
- __queue_work(cpu, wq, &dwork->work);
- return;
- }
- timer_stats_timer_set_start_info(&dwork->timer);
- dwork->wq = wq;
- dwork->cpu = cpu;
- timer->expires = jiffies + delay;
- if (unlikely(cpu != WORK_CPU_UNBOUND))
- add_timer_on(timer, cpu);
- else
- add_timer(timer);
- }
- |||| →
- void delayed_work_timer_fn(unsigned long __data)
- {
- struct delayed_work *dwork = (struct delayed_work *)__data;
- /* should have been called from irqsafe timer with irq already off */
- __queue_work(dwork->cpu, dwork->wq, &dwork->work);
- }
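A common pattern built on these interfaces is a self-rearming delayed work used for periodic polling; a sketch under the assumption that poll_work, poll_fn, poll_start and poll_stop are driver-defined names:
- #include <linux/workqueue.h>
- #include <linux/jiffies.h>
-
- static struct delayed_work poll_work;
-
- static void poll_fn(struct work_struct *work)
- {
-     struct delayed_work *dwork = to_delayed_work(work);
-
-     /* ... poll the hardware here ... */
-
-     /* re-arm: run again in about one second */
-     schedule_delayed_work(dwork, HZ);
- }
-
- static void poll_start(void)
- {
-     INIT_DELAYED_WORK(&poll_work, poll_fn);
-     schedule_delayed_work(&poll_work, HZ);
- }
-
- static void poll_stop(void)
- {
-     /* waits for a running poll_fn() and prevents it from re-arming */
-     cancel_delayed_work_sync(&poll_work);
- }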