当前位置:   article > 正文

探索 Flutter 异步消息的实现

flutter 异步发送msg消息处理

本文作者:赵旭阳

一、简介

我们在进行 Android 开发的时候,会通过创建一个 Handler 并调用其 sendMessage  或 Post 方法来进行异步消息调用,其背后涉及到了三个面试经常被问的类:Handler,Looper,MessageQueue,内部原理我想做过 Android 开发的基本都了解。

  1. Event queue : 包含了所有的外部事件:I/O,鼠标点击,绘制,定时器,Dart isolate 的消息等,其实这块又根据消息的优先级细分成了两个队列,后面会有介绍。

  2. Microtask queue :事件处理代码有时需要在当前 event 之后,且在下一个 event 之前做一些任务。

  • 两种队列的消息处理流程大致如图所示:

640?wx_fmt=png


dart 提供了 dart:async 库来对这两个队列进行操作,主要是如下两个API:

  1. Future 类,创建一个定时器事件到 Event queue 尾部。

  2. scheduleMicrotask() 方法,添加一个事件到 Microtask queue 尾部

下面将会分析这两个 API 背后的实现,涉及的是 flutter engine 的源码,可以参考官方的 wiki 来下载:

二、初始化

1.  创建 MessageLoop

  • 在启动 Flutter 的时候,引擎会额外创建三个线程:UI Thread,IO Thread, GPU Thread,并为每个线程都创建一个 MessageLoop,之后各个线程就进入的消息循环的状态,等待新的消息来处理,具体流程如下:

  1. ThreadHost thread_host_; 
  2. AndroidShellHolder::AndroidShellHolder(
  3.     flutter::Settings settings,
  4.     fml::jni::JavaObjectWeakGlobalRef java_object,
  5.     bool is_background_view)
  6.     : settings_(std::move(settings)), java_object_(java_object) {
  7. ......
  8.   if (is_background_view) {
  9.     thread_host_ = {thread_label, ThreadHost::Type::UI};
  10.   } else {
  11.     // 创建三个线程
  12.     thread_host_ = {thread_label, ThreadHost::Type::UI | ThreadHost::Type::GPU |
  13.                                       ThreadHost::Type::IO};
  14.   }
  15. ......
  16. }
  • 进一步分析 ThreadHost 的创建,最后来到了创建 Thread,每个 Thread 都会创建一个 MessageLoop 并获取其 TaskRunner,TaskRunner 是用来外部向 MessageLoop 中 Post Task 的。顺带说一下,在 Android 上 MessageLoop 的实现还是使用的系统自身的 Looper 机制,这里是通过 NDK 的 ALooper 相关接口来实现的。具体代码如下:

  1. Thread::Thread(const std::string& name) : joined_(false) {
  2.   fml::AutoResetWaitableEvent latch;
  3.   fml::RefPtr<fml::TaskRunner> runner;
  4.   thread_ = std::make_unique<std::thread>([&latch, &runner, name]() -> void {
  5.     SetCurrentThreadName(name);
  6.     // 创建 MessageLoop
  7.     fml::MessageLoop::EnsureInitializedForCurrentThread();
  8.     auto& loop = MessageLoop::GetCurrent();
  9.     // 获取 TaskRunner
  10.     runner = loop.GetTaskRunner();
  11.     latch.Signal();
  12.     loop.Run();
  13.   });
  14.   latch.Wait();
  15.   task_runner_ = runner;
  16. }
  • MessageLoop 创建好后,我们就可以通过 TaskRunner 向其发送 Task 了,这里需要注意 MessageLoop 执行的 Task 仅是一个 无参的闭包 。类似这样:

  1. auto jni_exit_task([key = thread_destruct_key_]() {
  2.   FML_CHECK(pthread_setspecific(key, reinterpret_cast<void*>(1)) == 0);
  3. });
  4. thread_host_.ui_thread->GetTaskRunner()->PostTask(jni_exit_task);

2.  创建 Root Isolate

在 dart 语言中是没有线程的,而是使用类似于线程但相互之间不共享堆内存的 isolate,代表一个独立的 dart 程序执行环境。同样 Flutter 的 dart 代码是运行在一个叫 root isolate 的 isolate 中,下面简要列下 root isolate 的创建过程。

a.启动 dart vm

这个步骤的具体流程,大家可以顺着 /engine-v1.5.4/src/flutter/runtime/dart_vm.cc 中的 DartVM 构造方法去跟进分析。在 dart vm 启动过程中会创建 vm isolate 和 PortMap,这两个的具体作用下面有介绍。

b.创建 root isolate

root isolate 是在 UI 线程中创建的,具体流程见 /src/flutter/runtime/dart_isolate.cc 的 CreateRootIsolate 方法。由于 isolate 是对当前线程执行环境的一个抽象表示,所以其内部存储了很多信息,对于异步消息这块有四个关键的信息是需要注意的。

下面三个字段是定义在 dart vm 层的 Isolate 类中,具体见 /src/third_party/dart/runtime/vm/isolate.h。

  • main_port :可以看做该 isolate 的标识,是一个整数;

  • message_handler :顾名思义,用来管理每个 isolate 的 Event queue,其内部根据 message 的优先级将消息分为了两个队列:普通优先级的 queue 和 OOB 优先级的 oob_queue。了解 TCP 协议的应该了解 TCP 中有带外数据(即优先数据),isolate 的 OOB 优先级也是类似的意思,OOB 优先级的消息会被优先处理,目前看到有这么几种 OOB 消息:

  1. // 这些主要和 isolate 的生命周期相关
  2. kPauseMsg = 1,
  3. kResumeMsg = 2,
  4. kPingMsg = 3,
  5. kKillMsg = 4,
  6. kAddExitMsg = 5,
  7. kDelExitMsg = 6,
  8. kAddErrorMsg = 7,
  9. kDelErrorMsg = 8,
  10. kErrorFatalMsg = 9,
  11. // 可以注意下 kLowMemoryMsg ,如果有大量 OOB 怀疑是内存不够了
  12. kInterruptMsg = 10,     // Break in the debugger.
  13. kInternalKillMsg = 11,  // Like kill, but does not run exit listeners, etc.
  14. kLowMemoryMsg = 12,     // Run compactor, etc.
  15. kDrainServiceExtensionsMsg = 13,  // Invoke pending service extensions
  • message_notify_callback :message_handler 收到消息后会调用该变量指向的函数去处理;

Flutter 引擎层会对 dart vm 的 isolate 实例做一层包装( DartIsolate 类),其内部定义了:

  • microtask_queue: 用来存储 microtask 消息,可见在 Flutter 引擎中,Microtask queue 并不是由 dart vm 层来管理的

    前面已经说过 isolate 之间是不能直接互相访问,如图:

640?wx_fmt=png

可以看出 isolate 之间是共享 vm isolate 的堆内存区域的,有点类似于操作系统的内核空间,vm isolate 的堆内存存储了 dart vm 内部的核心数据(内置类,内置对象)。除了 vm isolate,不同 isolate 之间的堆内存是不能直接访问的,为此 dart vm 提供了 isolate 之间的通信机制,负责通信路由的大管家就是 PortMap,其内部实现就是一张 Hash 表,Key 为 isolate 的 main_port,Value 为 isolate 的 message_handler。

三、创建 Future

使用 dart 开发一定会用到 Future,当我们通过 Future 构造方法创建一个实例的时候,就会创建一个定时器消息到 Event queue,下面我们将分析这个流程。整体架构图:

640?wx_fmt=png

1.  dart 层创建 Future

创建 Future 的时候,内部会通过 Timer 构造一个定时器,具体代码如下:

/src/out/host_release/dart-sdk/lib/async/future.dart

  1. factory Future(FutureOr<T> computation()) {
  2.   _Future<T> result = new _Future<T>();
  3.   Timer.run(() {
  4.     try {
  5.       result._complete(computation());
  6.     } catch (e, s) {
  7.       _completeWithErrorCallback(result, e, s);
  8.     }
  9.   });
  10.   return result;
  11. }

跟进 Timer 的实现,具体代码如下:

/src/out/host_release/dart-sdk/lib/async/timer.dart

  1. static void run(void callback()) {
  2.   new Timer(Duration.zero, callback);
  3. }
  4. factory Timer(Duration duration, void callback()) {
  5.   if (Zone.current == Zone.root) {
  6.     return Zone.current.createTimer(duration, callback);
  7.   }
  8. ......
  9. }

这里假定 duration == Duration.zero,Zone.current == Zone.root ,进而到 rootZone 的 createTimer 方法,里面又调用了 Timer 的 _createTimer 方法:

/src/out/host_release/dart-sdk/lib/async/zone.dart

  1. class _RootZone extends _Zone {
  2. ......
  3.   Timer createTimer(Duration duration, void f()) {
  4.     return Timer._createTimer(duration, f);
  5.   }
  6. ......
  7. }

/src/out/host_release/dart-sdk/lib/async/timer.dart

external static Timer _createTimer(Duration duration, void callback());

可以看到 _createTimer 方法是个 external 的,按照 dart 语言的规范,external 方法的实现都在对应的 patch 文件中( timer_patch.dart),内部通过 _TimerFactory._factory 来创建 Timer,具体代码如下:

/src/third_party/dart/runtime/lib/timer_patch.dart

  1. @patch
  2. class Timer {
  3. ......
  4.   @patch
  5.   static Timer _createTimer(Duration duration, void callback()) {
  6.     if (_TimerFactory._factory == null) {
  7.       _TimerFactory._factory = VMLibraryHooks.timerFactory;
  8.     }
  9. ......
  10.     int milliseconds = duration.inMilliseconds;
  11.     if (milliseconds < 0) milliseconds = 0;
  12.    // 注意此处将外部的 callback 又包了一层
  13.     return _TimerFactory._factory(milliseconds, (_) {
  14.       callback();
  15.     }, false);
  16.   }
  17. ......
  18. }

通过上面的代码,我们知道 _TimerFactory._factory = VMLibraryHooks.timerFactory, VMLibraryHooks.timerFactory 又是在 root isolate 初始化时通过调用 _setupHooks 方法设置的,具体代码如下:

/src/third_party/dart/runtime/lib/timer_impl.dart

  1. @pragma("vm:entry-point""call")
  2. _setupHooks() {
  3.   VMLibraryHooks.timerFactory = _Timer._factory;
  4. }
  5. // VMLibraryHooks.timerFactory 指向的该方法
  6. // 我们假设创建的是非 repeating 消息,并且 milliSeconds 为 0
  7. static Timer _factory(
  8.       int milliSeconds, void callback(Timer timer), bool repeating) {
  9. ......
  10.     return new _Timer(milliSeconds, callback);
  11.   }
  12. }
  13. factory _Timer(int milliSeconds, void callback(Timer timer)) {
  14.   return _createTimer(callback, milliSeconds, false);
  15. }
  16. // 创建一个 Timer 实例并调用 _enqueue 将其加入到队列
  17. static Timer _createTimer(
  18.     void callback(Timer timer), int milliSeconds, bool repeating) {
  19. ......
  20.   _Timer timer =
  21.       new _Timer._internal(callback, wakeupTime, milliSeconds, repeating);
  22.   timer._enqueue();
  23.   return timer;
  24. }
  25. _Timer._internal(
  26.     this._callbackthis._wakeupTimethis._milliSecondsthis._repeating)
  27.     : _id = _nextId();
  28. // 这里 _milliSeconds == 0,会向 ZeroTimer 队列插入消息,然后调用 _notifyZeroHandler
  29. void _enqueue() {
  30.   if (_milliSeconds == 0) {
  31.     if (_firstZeroTimer == null) {
  32.       _lastZeroTimer = this;
  33.       _firstZeroTimer = this;
  34.     } else {
  35.       _lastZeroTimer._indexOrNext = this;
  36.       _lastZeroTimer = this;
  37.     }
  38.     // Every zero timer gets its own event.
  39.     _notifyZeroHandler();
  40.   } else {
  41.    ......
  42.     // 延迟消息这里先不分析
  43.   }
  44. }

折腾了一大圈,最后只是构造了一个 _Timer 实例并把其加入到 ZeroTimer 队列中,如果是延迟消息则会加入到 TimeoutTimerHeap 中,最后调用 _notifyZeroHandler 方法, 其主要做如下操作:

  • 创建 RawReceivePort 并设置一个叫 _handleMessage 方法做为引擎层的回调方法

  • 向引擎层 Event queue 发送一个普通优先级的  _ZERO_EVENT ,引擎层处理该消息的时候会最终回调到上面设置的 _handleMessage 方法。

具体代码如下:

/src/third_party/dart/runtime/lib/timer_impl.dart

  1. static void _notifyZeroHandler() {
  2.   if (_sendPort == null) {
  3.     _createTimerHandler();
  4.   }
  5. // 底层会调到 PortMap 的 PostMessage 方法,进而唤醒消息处理,后面会分析这个流程
  6.   _sendPort.send(_ZERO_EVENT);
  7. }
  8. // 创建和引擎层通信的 RawReceivePort,并设置引擎层的回调方法 _handleMessage
  9. static void _createTimerHandler() {
  10.   assert(_receivePort == null);
  11.   assert(_sendPort == null);
  12.   _receivePort = new RawReceivePort(_handleMessage);
  13.   _sendPort = _receivePort.sendPort;
  14.   _scheduledWakeupTime = null;
  15. }

/src/third_party/dart/runtime/lib/isolate_patch.dart

  1. @patch
  2. class RawReceivePort {
  3.   @patch
  4.   factory RawReceivePort([Function handler]) {
  5.     _RawReceivePortImpl result = new _RawReceivePortImpl();
  6.     result.handler = handler;
  7.     return result;
  8.   }
  9. }
  10. // 最终将回调设置到 _RawReceivePortImpl 的 _handlerMap 中,引擎层会从这个 map 寻找消息的 handler
  11. @pragma("vm:entry-point")
  12. class _RawReceivePortImpl implements RawReceivePort {
  13.     void set handler(Function value) {
  14.       _handlerMap[this._get_id()] = value;
  15.     }
  16. }

_handleMessage 回调方法会收集 Timer 并执行,具体代码实现如下:

/src/third_party/dart/runtime/lib/timer_impl.dart

  1. static void _handleMessage(msg) {
  2.   var pendingTimers;
  3.   if (msg == _ZERO_EVENT) {
  4.     // 找到所有的待处理 Timers
  5.     pendingTimers = _queueFromZeroEvent();
  6.     assert(pendingTimers.length > 0);
  7.   } else {
  8.     ......
  9.     // 延时消息这里不分析
  10.   }
  11. // 处理Timer,即调用设置的 callback
  12.   _runTimers(pendingTimers);
  13. ......
  14. }

2.  向 Event Queue 发送消息

前面说到 RawReceiverPort 会向引擎层 Event queue 发送一个 _ZERO_EVENT  ,其内部是通过调用 PortMap 的 PostMessage 方法将消息发送到 Event queue,该方法首先会根据接收方的 port id 找到对应的 message_handler,然后将消息根据优先级保存到相应的 queue 中,最后唤醒 message_notify_callback 回调函数 ,具体代码如下:

/src/third_party/dart/runtime/vm/port.cc

  1. bool PortMap::PostMessage(Message* message, bool before_events) {
  2.   ......
  3.   intptr_t index = FindPort(message->dest_port());
  4.   ......
  5.   MessageHandler* handler = map_[index].handler;
  6.  ......
  7.   handler->PostMessage(message, before_events);
  8.   return true;
  9. }

/src/third_party/dart/runtime/vm/message_handler.cc

  1. void MessageHandler::PostMessage(Message* message, bool before_events) {
  2.   Message::Priority saved_priority;
  3.   bool task_running = true;
  4.  ......
  5.   // 根据消息优先级进入不同的队列
  6.     if (message->IsOOB()) {
  7.       oob_queue_->Enqueue(message, before_events);
  8.     } else {
  9.       queue_->Enqueue(message, before_events);
  10.     }
  11.    ......
  12. //唤醒并处理消息
  13.   MessageNotify(saved_priority);
  14. }

/src/third_party/dart/runtime/vm/isolate.cc

  1. void IsolateMessageHandler::MessageNotify(Message::Priority priority) {
  2.   if (priority >= Message::kOOBPriority) {
  3.     I->ScheduleInterrupts(Thread::kMessageInterrupt);
  4.   }
  5. // 最后调用的 message_notify_callback 所指向的函数
  6.   Dart_MessageNotifyCallback callback = I->message_notify_callback();
  7.   if (callback) {
  8.     (*callback)(Api::CastIsolate(I));
  9.   }
  10. }

3.   Event Queue 消息处理

前面消息已经发送成功并调用了消息处理唤醒的操作,下面我们需要知道 message_notify_callback 所指向的函数的实现, root isolate 在初始化时会设置该变量,具体代码如下:

/src/flutter/runtime/dart_isolate.cc

  1. bool DartIsolate::Initialize(Dart_Isolate dart_isolate, bool is_root_isolate) {
  2.   ......
  3.   // 设置 message handler 的 task runner 为 UI Task Runner
  4.   SetMessageHandlingTaskRunner(GetTaskRunners().GetUITaskRunner(),
  5.                                is_root_isolate);
  6.   ......
  7.   return true;
  8. }
  9. void DartIsolate::SetMessageHandlingTaskRunner(
  10.     fml::RefPtr<fml::TaskRunner> runner,
  11.     bool is_root_isolate) {
  12.  ......
  13.   message_handler().Initialize(
  14.       [runner](std::function<void()> task) { runner->PostTask(task); });
  15. }

进一步跟进分析发现通过 Dart_SetMessageNotifyCallback 将  root isolate 的 message_notify_callback 设置为 MessageNotifyCallback 方法,具体代码如下:

/src/third_party/tonic/dart_message_handler.cc

  1. void DartMessageHandler::Initialize(TaskDispatcher dispatcher) {
  2.   TONIC_CHECK(!task_dispatcher_ && dispatcher);
  3.   task_dispatcher_ = dispatcher;
  4.   Dart_SetMessageNotifyCallback(MessageNotifyCallback);
  5. }

MessageNotifyCallback 会在 Event queue 收到消息后执行,但其执行过程中并没有拿到 Event queue 中的消息,而是往 UI Thread 的 MessageLoop Post 了一个 Task 闭包,这个 Task 闭包会通过调用 Dart_HandleMessage 来处理 Event queue 中的消息,具体代码流程如下:

/src/third_party/tonic/dart_message_handler.cc

  1. void DartMessageHandler::MessageNotifyCallback(Dart_Isolate dest_isolate) {
  2.   auto dart_state = DartState::From(dest_isolate);
  3.   TONIC_CHECK(dart_state);
  4.   dart_state->message_handler().OnMessage(dart_state);
  5. }
  6. void DartMessageHandler::OnMessage(DartState* dart_state) {
  7.   auto task_dispatcher_ = dart_state->message_handler().task_dispatcher_;
  8.   // 往 ui 线程 MessageLoop Post 了一个 Task
  9.   auto weak_dart_state = dart_state->GetWeakPtr();
  10.   task_dispatcher_([weak_dart_state]() {
  11.     if (auto dart_state = weak_dart_state.lock()) {
  12.       dart_state->message_handler().OnHandleMessage(dart_state.get());
  13.     }
  14.   });
  15. }
  16. void DartMessageHandler::OnHandleMessage(DartState* dart_state) {
  17.   ......
  18.   if (Dart_IsPausedOnStart()) {
  19.     ......
  20.   } else if (Dart_IsPausedOnExit()) {
  21.    ......
  22.   } else {
  23.      // 调用 Dart_HandleMessage 方法处理消息
  24.     result = Dart_HandleMessage();
  25.    ......
  26.   }
  27.  ......
  28. }

Dart_HandleMessage 的实现很简单,只是调用 message_handler 的 HandleNextMessage 方法,具体代码实现如下:

/src/third_party/dart/runtime/vm/dart_api_impl.cc

  1. DART_EXPORT Dart_Handle Dart_HandleMessage() {
  2. ......
  3.   if (I->message_handler()->HandleNextMessage() != MessageHandler::kOK) {
  4.     return Api::NewHandle(T, T->StealStickyError());
  5.   }
  6.   return Api::Success();
  7. }

我们进一步跟进  HandleNextMessage 方法的实现,最终来到如下代码:

/src/third_party/dart/runtime/vm/message_handler.cc

  1. // 依次遍历 message_handler 的消息队列,对每个消息进程处理
  2. MessageHandler::MessageStatus MessageHandler::HandleMessages(
  3.     MonitorLocker* ml,
  4.     bool allow_normal_messages,
  5.     bool allow_multiple_normal_messages) {
  6. ......
  7.   Message* message = DequeueMessage(min_priority);
  8.   while (message != NULL) {
  9.     ......
  10.     MessageStatus status = HandleMessage(message);
  11.    ......
  12.     message = DequeueMessage(min_priority);
  13.   }
  14.   return max_status;
  15. }
  16. // 取消息的时候会优先处理 OOB Message
  17. MessageMessageHandler::DequeueMessage(Message::Priority min_priority) {
  18.   Message* message = oob_queue_->Dequeue();
  19.   if ((message == NULL) && (min_priority < Message::kOOBPriority)) {
  20.     message = queue_->Dequeue();
  21.   }
  22.   return message;
  23. }

每个消息的处理都是在 HandleMessage 方法中,该方法会根据不同的消息优先级做相应的处理,具体代码如下:

/src/third_party/dart/runtime/vm/isolate.cc

  1. MessageHandler::MessageStatus IsolateMessageHandler::HandleMessage(
  2.     Message* message) {
  3. ......
  4.   Object& msg_handler = Object::Handle(zone);
  5. // 非 OOB 消息,需要获取 dart 层的  handler 函数
  6.   if (!message->IsOOB() && (message->dest_port() != Message::kIllegalPort)) {
  7.     msg_handler = DartLibraryCalls::LookupHandler(message->dest_port());
  8.     ......
  9.   }
  10. ......
  11.   MessageStatus status = kOK;
  12.   if (message->IsOOB()) {
  13.    // 处理 OOB 消息,详细实现可自己看代码,这里不分析OOB消息
  14.    ......
  15.   } else if (message->dest_port() == Message::kIllegalPort) {
  16.     ......
  17.   } else {
  18.     ......
  19.     // 调用前面找到的 msg_handler 来处理普通消息
  20.     const Object& result =
  21.         Object::Handle(zone, DartLibraryCalls::HandleMessage(msg_handler, msg));
  22.    ......
  23.   }
  24.   delete message;
  25.   return status;
  26. }

这里我们主要看普通消息的处理逻辑,首先会通过调用 DartLibraryCalls::LookupHandler 方法来从 dart 层寻找相应的 handler 函数,然后通过 DartLibraryCalls::HandleMessage 执行相应的处理函数,具体实现代码如下:

/src/third_party/dart/runtime/vm/dart_entry.cc

  1. RawObjectDartLibraryCalls::LookupHandler(Dart_Port port_id) {
  2.   Thread* thread = Thread::Current();
  3.   Zone* zone = thread->zone();
  4.   Function& function = Function::Handle(
  5.       zone, thread->isolate()->object_store()->lookup_port_handler());
  6.   const int kTypeArgsLen = 0;
  7.   const int kNumArguments = 1;
  8. // 如果没有消息处理方法,则进行查找,最终找到的是 RawReceivePortImpl 的 _lookupHandler 方法。
  9.   if (function.IsNull()) {
  10.     Library& isolate_lib = Library::Handle(zone, Library::IsolateLibrary());
  11.     ASSERT(!isolate_lib.IsNull());
  12.     const String& class_name = String::Handle(
  13.         zone, isolate_lib.PrivateName(Symbols::_RawReceivePortImpl()));
  14.     const String& function_name = String::Handle(
  15.         zone, isolate_lib.PrivateName(Symbols::_lookupHandler()));
  16.     function = Resolver::ResolveStatic(isolate_lib, class_name, function_name,
  17.                                        kTypeArgsLen, kNumArguments,
  18.                                        Object::empty_array());
  19.     ASSERT(!function.IsNull());
  20.     thread->isolate()->object_store()->set_lookup_port_handler(function);
  21.   }
  22. // 执行消息处理函数
  23.   const Array& args = Array::Handle(zone, Array::New(kNumArguments));
  24.   args.SetAt(0Integer::Handle(zone, Integer::New(port_id)));
  25.   const Object& result =
  26.       Object::Handle(zone, DartEntry::InvokeFunction(function, args));
  27.   return result.raw();
  28. }

最终执行的是 RawReceivePortImpl 的 _lookupHandler 方法,在前面在创建 Future 的时候我们已经设置 _handleMessage 到 _handlerMap 中,_lookupHandler 方法会从 _handlerMap 中找到设置的回调方法,最后执行回调方法。具体代码如下:

/src/third_party/dart/runtime/lib/isolate_patch.dart

  1. @pragma("vm:entry-point")
  2. class _RawReceivePortImpl implements RawReceivePort {
  3.  ......
  4.   // Called from the VM to retrieve the handler for a message.
  5.   @pragma("vm:entry-point""call")
  6.   static _lookupHandler(int id) {
  7.     var result = _handlerMap[id];
  8.     return result;
  9.   }
  10. ......
  11. }

Future 的创建到这就分析完了,整个过程涉及到了 EventQueue 的消息收发。

至此,通过分析 Future 我们已经把 Event Queue 的消息处理流程了解了。

四、Microtask

1.  向  Microtask queue 发送消息

假设 Zone. current 为 rootZone, 直接看 scheduleMicrotask 方法的实现:

/src/third_party/dart/sdk/lib/async/schedule_microtask.dart

  1. void scheduleMicrotask(void callback()) {
  2.   _Zone currentZone = Zone.current;
  3.   if (identical(_rootZone, currentZone)) {
  4.     _rootScheduleMicrotask(nullnull, _rootZone, callback);
  5.     return;
  6.   }
  7. ......
  8. }

跟进 _rootScheduleMicrotask 方法的实现,最终来到 _scheduleAsyncCallback 方法,该方法做了两件事情:

  • 将传入的 callback 加入 callback 队列

  • 将 _startMicrotaskLoop 作为闭包参数调用  _AsyncRun._scheduleImmediate 方法,_startMicrotaskLoop 中会依次执行 callback 队列保存的回调。

具体代码如下:

/src/third_party/dart/sdk/lib/async/schedule_microtask.dart

  1. void _scheduleAsyncCallback(_AsyncCallback callback) {
  2.   _AsyncCallbackEntry newEntry = new _AsyncCallbackEntry(callback);
  3.   if (_nextCallback == null) {
  4.     _nextCallback = _lastCallback = newEntry;
  5.     if (!_isInCallbackLoop) {
  6.       _AsyncRun._scheduleImmediate(_startMicrotaskLoop);
  7.     }
  8.   } else {
  9.     _lastCallback.next = newEntry;
  10.     _lastCallback = newEntry;
  11.   }
  12. }
  13. // 该方法被作为回调设置到引擎,会在处理所有的 Microtask 的时候执行 
  14. void _startMicrotaskLoop() {
  15.   _isInCallbackLoop = true;
  16.   try {
  17.     _microtaskLoop();
  18.   } finally {
  19.     _lastPriorityCallback = null;
  20.     _isInCallbackLoop = false;
  21.     if (_nextCallback != null) {
  22.       _AsyncRun._scheduleImmediate(_startMicrotaskLoop);
  23.     }
  24.   }
  25. }
  26. class _AsyncRun {
  27.   external static void _scheduleImmediate(void callback());
  28. }

根据前面的经验,会在对应的 patch 文件中找到 _AsyncRun._scheduleImmediate 的实现,其内部调用了 _ScheduleImmediate._closure 指向的方法。

具体代码如下:

/src/third_party/dart/runtime/lib/schedule_microtask_patch.dart

  1. @patch
  2. class _AsyncRun {
  3.   @patch
  4.   static void _scheduleImmediate(void callback()) {
  5.     if (_ScheduleImmediate._closure == null) {
  6.       throw new UnsupportedError("Microtasks are not supported");
  7.     }
  8.     _ScheduleImmediate._closure(callback);
  9.   }
  10. }
  11. // 通过该方法设置 _ScheduleImmediate._closure
  12. @pragma("vm:entry-point""call")
  13. void _setScheduleImmediateClosure(_ScheduleImmediateClosure closure) {
  14.   _ScheduleImmediate._closure = closure;
  15. }

那么 _ScheduleImmediate._closure 指向的是什么呢?我们需要找到 _setScheduleImmediateClosure 的调用方。root isolate 初始化时会执行一系列的 vm hook 调用,我们从中找到了 _setScheduleImmediateClosure 的调用,具体代码如下:

/src/flutter/lib/ui/dart_runtime_hooks.cc

  1. static void InitDartAsync(Dart_Handle builtin_library, bool is_ui_isolate) {
  2.   Dart_Handle schedule_microtask;
  3.   if (is_ui_isolate) {
  4. // 这里的 builtin_library 是 Flutter 扩展的 ui library
  5.     schedule_microtask =
  6.         GetFunction(builtin_library, "_getScheduleMicrotaskClosure");
  7.   } else {
  8.     ......
  9.   }
  10.   Dart_Handle async_library = Dart_LookupLibrary(ToDart("dart:async"));
  11.   Dart_Handle set_schedule_microtask = ToDart("_setScheduleImmediateClosure");
  12.   Dart_Handle result = Dart_Invoke(async_library, set_schedule_microtask, 1,
  13.                                    &schedule_microtask);
  14.   PropagateIfError(result);
  15. }

进一步跟进,最终找到了 _ScheduleImmediate._closure 指向的方法,是一个 native 实现的函数,具体代码如下:

/src/flutter/lib/ui/natives.dart

  1. Function _getScheduleMicrotaskClosure() => _scheduleMicrotask; 
  2. void _scheduleMicrotask(void callback()) native 'ScheduleMicrotask';

跟进 _scheduleMicrotask 的 native 实现,发现其会把传入的 _startMicrotaskLoop 方法加入到底层的Microtask queue,具体代码如下:

/src/flutter/lib/ui/dart_runtime_hooks.cc

  1. void ScheduleMicrotask(Dart_NativeArguments args) {
  2.   Dart_Handle closure = Dart_GetNativeArgument(args, 0);
  3.   UIDartState::Current()->ScheduleMicrotask(closure);
  4. }

/src/flutter/lib/ui/ui_dart_state.cc

  1. void UIDartState::ScheduleMicrotask(Dart_Handle closure) {
  2.   if (tonic::LogIfError(closure) || !Dart_IsClosure(closure)) {
  3.     return;
  4.   }
  5.   microtask_queue_.ScheduleMicrotask(closure);
  6. }

2.  Microtask queue 消息处理

前面已经将 _startMicrotaskLoop 方法加入到了 Microtask queue ,那么 Microtask queue 内的方法何时执行呢?我们通过跟进 Microtask queue  的 RunMicrotasks 方法的调用方,最终找到 Microtask queue 内方法的执行时机 FlushMicrotasksNow,具体代码如下:

/src/flutter/lib/ui/ui_dart_state.cc

  1. void UIDartState::FlushMicrotasksNow() {
  2.   microtask_queue_.RunMicrotasks();
  3. }

再跟进 FlushMicrotasksNow 方法的调用方,发现有两处调用:

  • 这里是在每一帧开始的时候去执行 Microtask

/src/flutter/lib/ui/window/window.cc

  1. void Window::BeginFrame(fml::TimePoint frameTime) {
  2. ......
  3.   UIDartState::Current()->FlushMicrotasksNow();
  4. ......
  5. }
  • 另外一处调用是通过 TaskObserve 的形式,具体代码如下:

/src/flutter/lib/ui/ui_dart_state.cc

  1. void UIDartState::AddOrRemoveTaskObserver(bool add) {
  2. ......
  3.   if (add) {
  4. // 这个 add_callback_ 是啥呢?
  5.     add_callback_(reinterpret_cast<intptr_t>(this),
  6.                   [this]() { this->FlushMicrotasksNow(); });
  7.   } else {
  8.     remove_callback_(reinterpret_cast<intptr_t>(this));
  9.   }
  10. }

跟进 add_callback_ 的赋值,这里是android的实现

/src/flutter/shell/platform/android/flutter_main.cc

  1. void FlutterMain::Init(JNIEnv* env,
  2.                        jclass clazz,
  3.                        jobject context,
  4.                        jobjectArray jargs,
  5.                        jstring bundlePath,
  6.                        jstring appStoragePath,
  7.                        jstring engineCachesPath) {
  8.  ......
  9.   settings.task_observer_add = [](intptr_t key, fml::closure callback) {
  10.     fml::MessageLoop::GetCurrent().AddTaskObserver(key, std::move(callback));
  11.   };
  12. ......
  13. }

FlushMicrotasksNow() 是作为 MessageLoop 的 TaskObserver 来执行的, TaskObserver 会在处理完task之后把该 Task 创建的 MicroTask 全部执行,也就是说在下一个 Task 运行前执行。代码如下:

/src/flutter/fml/message_loop_impl.cc

  1. void MessageLoopImpl::FlushTasks(FlushType type) {
  2. ......
  3.   for (const auto& invocation : invocations) {
  4.     invocation();
  5.     for (const auto& observer : task_observers_) {
  6.       observer.second();
  7.     }
  8.   }
  9. }

五、结束

通过前面的分析,Flutter 的异步消息处理流程还是挺复杂的,主要是代码写的比较乱,跳转层次太多,可以通过对整个流程的掌控来寻找UI 线程的优化及监控点,进而降低 UI 线程的处理时间,希望本篇文章让大家对 Flutter 的异步消息的整体处理流程有更深的理解。

推荐阅读你们吹捧的鸿蒙,只是另一个FuchsiaAndroid仿微信QQ图片裁剪互联网 HR 黑话大全,太真实了

编程·思维·职场
欢迎扫码关注

640?wx_fmt=jpeg

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/820061
推荐阅读
相关标签
  

闽ICP备14008679号