当前位置:   article > 正文

Bluedroid线程封装和reactor原理_bluedroid 线程

bluedroid 线程

Bluedroid线程封装和reactor原理

ps:看这篇之前,如果对bluedroid底层数据收发需要进一步了解的,可以看这里:
蓝牙重启case之:hardware error
Bluedroid协议栈BTU线程处理HCI数据流程分析

Bluedroid线程和原始的posix线程有点不一样. posix线程中, 一般使用pthread_create创建一个线程, 然后在线程函数中跑一个while死循环, 处理各自的业务逻辑.传统上, 每个线程处理函数都需要单独编写,并且各不相同.

但在bluedroid中,由于数据处理都是面向消息队列的(基于简单的链表队列,与XSI IPC的消息队列无关,以下都称消息队列). 通过对pthread_create的合理封装后, 并对消息队列使用reactor反射机制,外层应用可以很方便的考虑业务逻辑,而无需关心和实现正常的线程逻辑处理.

一 BlueDroid线程封装

1. 新建一个线程

新建一个线程时(例如hci_thread),建立一个线程内部的消息队列,并且和eventfd 信号量形式的文件描述符关联,然后在线程内部循环等待eventfd信号量, 等到信号量之后, 执行内部的消息队列回调函数, 这个回调函数执行用户自定义的消息队列处理函数.调用完成后, 继续等待eventfd信号量.

thread_t *thread_new_sized(const char *name, size_t work_queue_capacity) {
...
  thread_t *ret = osi_calloc(sizeof(thread_t));
a
  ret->reactor = reactor_new();//建立线程内部用的反射器, 创建epoll_fd和event_fd.
    //并把event_fd加入到epoll_fd监测列表中, event_fd主要是停止线程时使用.
...
  ret->work_queue = fixed_queue_new(work_queue_capacity);//线程内部工作队列
...
  // Start is on the stack, but we use a semaphore, so it's safe
  struct start_arg start;
  start.start_sem = semaphore_new(0);
...
  strncpy(ret->name, name, THREAD_NAME_MAX);
  start.thread = ret;
  start.error = 0;
  pthread_create(&ret->pthread, NULL, run_thread, &start);
  semaphore_wait(start.start_sem);
  semaphore_free(start.start_sem);
...
  return ret;
...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

2. 线程的运行

我们看run_thread线程函数重要func:

  1. 获取线程内部work_queue的dequeue信号量fd, 并在reactor_register函数中, 注册这个fd到epoll集合中.内部work_queue的回调(反射)函数是work_queue_read_cb.
  2. 执行reactor_start(thread->reactor),进入死循环,等待fd信号量. 我们在下一节详细分析反射器reactor的注册(reactor_register)和运行(reactor_start).
static void *run_thread(void *start_arg) {
  ...
  struct start_arg *start = start_arg;
  thread_t *thread = start->thread;
  ...
  if (prctl(PR_SET_NAME, (unsigned long)thread->name) == -1) {
  ...
  }
  thread->tid = gettid();//线程tid.不是posix线程id.

  LOG_WARN(LOG_TAG, "%s: thread id %d, thread name %s started", __func__, thread->tid, thread->name);

  semaphore_post(start->start_sem);//通知线程已经创建,父线程可以继续往下跑了.

  int fd = fixed_queue_get_dequeue_fd(thread->work_queue);//获取线程内部work_queue的dequeue信号量fd描述符
    //后面的循环中,就是使用多路IO来侦听work_queue的dequeue信号量,并执行用户的消息处理函数.`reactor_register`
  void *context = thread->work_queue;

  reactor_object_t *work_queue_object = reactor_register(thread->reactor, fd, context, work_queue_read_cb, NULL);//work_queue_read_cb是线程内部的消息队列处理函数.也就是线程内部反射器回调函数.
  reactor_start(thread->reactor);
  reactor_unregister(work_queue_object);
  ...
  LOG_WARN(LOG_TAG, "%s: thread id %d, thread name %s exited", __func__, thread->tid, thread->name);
  return NULL;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

二 反射器的封装

2.1反射器的注册

reactor_register函数: 把fd文件描述符注册给反射器, 并加入到epoll信号集. 当fd文件(信号量)有内容时, 调用read_ready或者write_ready函数(bluedroid7.0暂时不使用write_ready). 下一节我们说明反射器是如何运行并且回调read_ready回调函数的.

reactor_object_t *reactor_register(reactor_t *reactor,
    int fd, void *context,
    void (*read_ready)(void *context),
    void (*write_ready)(void *context)) {
...
  reactor_object_t *object =
      (reactor_object_t *)osi_calloc(sizeof(reactor_object_t));
  //以下初始化reactor_object_t对象.
  object->reactor = reactor;
  object->fd = fd;//反射器关联的fd文件
  object->context = context;//其实就是thread内部的work_queue
  object->read_ready = read_ready;//这个是内部消息RX回调函数, 也就是线程内部反射器回调函数.
  object->write_ready = write_ready;
  pthread_mutex_init(&object->lock, NULL);

  struct epoll_event event;
  memset(&event, 0, sizeof(event));
  if (read_ready)
    event.events |= (EPOLLIN | EPOLLRDHUP);
  if (write_ready)
    event.events |= EPOLLOUT;
  event.data.ptr = object;

  if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {//加到epoll集合中
...
    return NULL;
  }

  return object;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

2.2 反射器的运行

开始运行, run_reactor的iterations参数是0, 0代表死循环运行:

reactor_status_t reactor_start(reactor_t *reactor) {
  assert(reactor != NULL);
  return run_reactor(reactor, 0);//运行reactor.这里iterations参数是0,
}
  • 1
  • 2
  • 3
  • 4

反射器运行函数:

// Runs the reactor loop for a maximum of |iterations|.
// 0 |iterations| means loop forever.
// |reactor| may not be NULL.
static reactor_status_t run_reactor(reactor_t *reactor, int iterations) {
...
  reactor->run_thread = pthread_self();
  reactor->is_running = true;

  struct epoll_event events[MAX_EVENTS];
  for (int i = 0; iterations == 0 || i < iterations; ++i) {//假设iterations等于0, 就是一个死循环.所以,大的线程任务都是死循环,比如hci_thread.
...
    int ret;
    OSI_NO_INTR(ret = epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, -1));//等待信号量.
    if (ret == -1) {
...
      return REACTOR_STATUS_ERROR;
    }

    for (int j = 0; j < ret; ++j) {
      // The event file descriptor is the only one that registers with
      // a NULL data pointer. We use the NULL to identify it and break
      // out of the reactor loop.
      if (events[j].data.ptr == NULL) {//这种情况是收到event_fd事件,表示要停止这个反射器的运行了.
        eventfd_t value;
        eventfd_read(reactor->event_fd, &value);
        reactor->is_running = false;
        return REACTOR_STATUS_STOP;
      }
...
    reactor_object_t *object = (reactor_object_t *)events[j].data.ptr;//这个表示收到正常的reactor_object对象.
...
      if (list_contains(reactor->invalidation_list, object)) {
        pthread_mutex_unlock(&reactor->list_lock);
        continue;
      }
...
      if (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && object->read_ready)
        object->read_ready(object->context);//调用reactor_object的read回调函数.即调用反射器回调函数.
      if (!reactor->object_removed && events[j].events & EPOLLOUT && object->write_ready)
        object->write_ready(object->context);
      pthread_mutex_unlock(&object->lock);
...
    }
  }

  reactor->is_running = false;
  return REACTOR_STATUS_DONE;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

消息队列每次被写入数据的同时, 同时给消息队列加一个eventfd对应的信号量(文件信号量), 上面如下这行函数,就是读文件(信号量)有内容了,就处理数据. 正常的时候是EPOLLIN. 信号量结束时,是EPOLLHUP.

      if (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && object->read_ready)
        object->read_ready(object->context);//调用reactor_object的read回调函数.
  • 1
  • 2

我们再看一下线程内部的reactor_object对象read_ready是如何运行的. 这个reactor_object对象的read_ready函数实际是work_queue_read_cb函数, 我们看一下这个实现,

static void work_queue_read_cb(void *context) {
  assert(context != NULL);

  fixed_queue_t *queue = (fixed_queue_t *)context;//得到fixed_queue_t类型队列.
  work_item_t *item = fixed_queue_dequeue(queue);//取出一个work_item_t类型成员
  item->func(item->context);
  osi_free(item);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

以上, 最终执行的是线程内部的fixed_queue_t类型的队列, 即thread_t类型的work_queue成员.这个work_queue_read_cb取出一个work_item_t类型成员,执行成员本身的item->func函数,参数是(item->context).

下一节说明fixed_queue_t的使用.

三 fixed_queue_t的处理

3.1 线程任务的入队列和处理

void fixed_queue_enqueue(fixed_queue_t *queue, void *data) {
......
  semaphore_wait(queue->enqueue_sem);

  pthread_mutex_lock(&queue->lock);
  list_append(queue->list, data); //入队列
  pthread_mutex_unlock(&queue->lock);

  semaphore_post(queue->dequeue_sem); //给信号量,让run_reactor等待信号量.
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

说明如上注释.

我们以thread_post函数为例:

bool thread_post(thread_t *thread, thread_fn func, void *context) {
  assert(thread != NULL);
  assert(func != NULL);

  // TODO(sharvil): if the current thread == |thread| and we've run out
  // of queue space, we should abort this operation, otherwise we'll
  // deadlock.

  // Queue item is freed either when the queue itself is destroyed
  // or when the item is removed from the queue for dispatch.
  work_item_t *item = (work_item_t *)osi_malloc(sizeof(work_item_t));
  item->func = func;
  item->context = context;
  fixed_queue_enqueue(thread->work_queue, item);
  return true;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

以上代码中, 把一个新的任务加到线程去执行, 任务被注册在work_item_t的func里,并入线程内部队列. 出队列的时候, 就如2.2节所示了.

以下是hci_layer.c中的实例调用:

thread_post(thread, event_finish_startup, NULL);
  • 1

3.2 普通队列的数据处理

这节没时间可以不看.

这里,我们以hci_layer.ccommand_queue为例.

入队列如下:

static void transmit_command(
    BT_HDR *command,
    command_complete_cb complete_callback,
    command_status_cb status_callback,
    void *context) {
  waiting_command_t *wait_entry = osi_calloc(sizeof(waiting_command_t));

  uint8_t *stream = command->data + command->offset;
  STREAM_TO_UINT16(wait_entry->opcode, stream);
  wait_entry->complete_callback = complete_callback;//这个是host发送后并收到controler返回发送完成的回调.
  wait_entry->status_callback = status_callback;//这个是host发送后并收到controler返回的控制器状态的回调.
  wait_entry->command = command;
  wait_entry->context = context;

  // Store the command message type in the event field
  // in case the upper layer didn't already
  command->event = MSG_STACK_TO_HC_HCI_CMD;

  fixed_queue_enqueue(command_queue, wait_entry);//这里入队列了.
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

以上代码对于理解反射器可以忽略.下面才是比较关键的,同样以command_queue为例.

这里是关联反射器的回调函数设定, 这里关联了event_command_ready函数.

fixed_queue_register_dequeue(command_queue, thread_get_reactor(thread), event_command_ready, NULL);
  • 1

看具体的fixed_queue_register_dequeue函数实现.

void fixed_queue_register_dequeue(fixed_queue_t *queue, reactor_t *reactor, fixed_queue_cb ready_cb, void *context) {
  assert(queue != NULL);
  assert(reactor != NULL);
  assert(ready_cb != NULL);

  // Make sure we're not already registered
  fixed_queue_unregister_dequeue(queue);

  queue->dequeue_ready = ready_cb;
  queue->dequeue_context = context;
  queue->dequeue_object = reactor_register(
    reactor,
    fixed_queue_get_dequeue_fd(queue),
    queue,
    internal_dequeue_ready,//反射器自身回调函数.
    NULL
  );
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

看internal_dequeue_ready实现:

static void internal_dequeue_ready(void *context) {
  assert(context != NULL);

  fixed_queue_t *queue = context;
  queue->dequeue_ready(queue, queue->dequeue_context);//反射器调用用户队列的回调函数.这里就是event_command_ready.
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

command_queue的dequeue_ready是event_command_ready函数. 这个函数可以很清晰的看到, 最后通过fragment_and_dispatch函数发射出去了.

// Command/packet transmitting functions
static void event_command_ready(fixed_queue_t *queue, UNUSED_ATTR void *context) {
  if (command_credits > 0) {
    waiting_command_t *wait_entry = fixed_queue_dequeue(queue);
    command_credits--;

    // Move it to the list of commands awaiting response
    pthread_mutex_lock(&commands_pending_response_lock);
    list_append(commands_pending_response, wait_entry);
    pthread_mutex_unlock(&commands_pending_response_lock);

    // Send it off
    low_power_manager->wake_assert();
    packet_fragmenter->fragment_and_dispatch(wait_entry->command);
    low_power_manager->transmit_done();

    update_command_response_timer();
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

由上,可以得知,往bluedroid封装过的普通队列发数据,对应的反射器就会自动解数据,并且调用发数据之前设定的回调函数.

四 小结

本文分析了Bluedroid线程封装和reactor原理, 最后分别以线程任务和普通队列任务为例,解释了线程和反射器是如何运行起来的.

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/820285
推荐阅读
  

闽ICP备14008679号