赞
踩
1.在主线程中使用:
public static final int UPDATE_NAME=2; private MyHandler mHandler; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); mHandler=new MyHandler(new WeakReference<MainActivity>(this)); } @Override protected void onResume() { super.onResume(); new Thread(new Runnable() { @Override public void run() { Log.d("ZX", "发送消息"+Thread.currentThread().getName()); mHandler.sendEmptyMessage(UPDATE_NAME); } },"thread-2").start(); } static class MyHandler extends Handler{ public MyHandler(WeakReference<MainActivity> activity) { reference=activity; } WeakReference<MainActivity> reference; @Override public void handleMessage(Message msg) { switch (msg.what) { case UPDATE_NAME: Log.d("ZX", "处理消息"+Thread.currentThread().getName()); break; default: break; } } }
说明:
执行结果:
进程号100695是主线程,名字叫做main,这个是在Activity创建的时候系统设置的名称,那又是另外一个故事了,这里暂时先不管。进程号10707是我们创建的子线程,名字叫做thread-2,这个是我们自己取的。因为在linux看来没有进程和线程的区别都是进程,唯一的区别就是进程会有独立的内存等资源,而线程没有。
2.在子线程中使用。
public static final int UPDATE_NAME=2; private TextView mTvName; private Handler mHandler; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); mTvName=(TextView) findViewById(R.id.inset_tv); findViewById(R.id.boast).setOnClickListener(new OnClickListener() { @Override public void onClick(View v) { new Thread(new Runnable() { @Override public void run() { Log.d("ZX", "发送消息"+Thread.currentThread().getName()); mHandler.sendEmptyMessage(UPDATE_NAME); } },"thread--2").start(); } }); new Thread(new Runnable() { @Override public void run() { Looper.prepare(); mHandler=new Handler(){ @Override public void dispatchMessage(Message msg) { Log.d("ZX", "处理消息"+Thread.currentThread().getName()); } }; Looper.loop(); } },"thread--1").start(); }
说明:
执行结果:
这里就实现了线程间通信,要知道他们是怎么做到的,就需要进入源码分析了。
我们还是以主线程中的使用来分析吧,毕竟用的多。这里我们要知道Activity的创建是从ActivityThread.java的main函数开始的。本文源码来自android10。
public static void main(String[] args) { Trace.traceBegin(Trace.TRACE_TAG_ACTIVITY_MANAGER, "ActivityThreadMain"); //无本文无关的代码暂时去掉 Looper.prepareMainLooper();//步骤 step 1 // Find the value for {@link #PROC_START_SEQ_IDENT} if provided on the command line. // It will be in the format "seq=114" ActivityThread thread = new ActivityThread(); thread.attach(false, startSeq); if (sMainThreadHandler == null) { sMainThreadHandler = thread.getHandler(); } // End of event ActivityThreadMain. Trace.traceEnd(Trace.TRACE_TAG_ACTIVITY_MANAGER); Looper.loop();//step 2 throw new RuntimeException("Main thread loop unexpectedly exited"); }
//step 1
public static void prepareMainLooper() {
prepare(false);//step 1.1
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();//step 1.2
}
}
按顺序先看//step 1.1
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed)//step 1.1.1);//step 1.1.2
}
第一进来肯定是null,
//step 1.1.1
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);//step 1.1.1.1
mThread = Thread.currentThread();//step 1.1.1.2
}
这里就是创建了一个MessageQueue对象,获取当前线程。
MessageQueue我们稍后再看,继续看Looper类。
//step 1.1.1 执行完了,我们回到//step 1.1.2
public void set(T value) {
Thread t = Thread.currentThread();//step 1.1.2.1
ThreadLocalMap map = getMap(t);//step 1.1.2.2
if (map != null)
map.set(this, value);1.1.2.3
else
createMap(t, value);1.1.2.4
}
进入//step 1.1.2.1
/**
* Returns a reference to the currently executing thread object.
*
* @return the currently executing thread.
*/
@FastNative
public static native Thread currentThread();
这个是一个JNI方法,实现在C++,本文我们不去追踪,这里从注释就能看出是返回一个当前执行线程对象的引用。
我们继续看//step 1.1.2.2
/**
* Get the map associated with a ThreadLocal. Overridden in
* InheritableThreadLocal.
*
* @param t the current thread
* @return the map
*/
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
获取当前线程中的 threadlocalMap。看名字就知道是个map集合。是个容器用来存东西的。我们点进去稍微看下。它是ThreadLocal的一个静态内部类。
static class ThreadLocalMap { /** * The entries in this hash map extend WeakReference, using * its main ref field as the key (which is always a * ThreadLocal object). Note that null keys (i.e. entry.get() * == null) mean that the key is no longer referenced, so the * entry can be expunged from table. Such entries are referred to * as "stale entries" in the code that follows. */ static class Entry extends WeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } } /** * The initial capacity -- MUST be a power of two. */ private static final int INITIAL_CAPACITY = 16; /** * The table, resized as necessary. * table.length MUST always be a power of two. */ private Entry[] table; /** * The number of entries in the table. */ private int size = 0; /** * The next size value at which to resize. */ private int threshold; // Default to 0 /** * Set the resize threshold to maintain at worst a 2/3 load factor. */ private void setThreshold(int len) { threshold = len * 2 / 3; } /** * Increment i modulo len. */ private static int nextIndex(int i, int len) { return ((i + 1 < len) ? i + 1 : 0); } /** * Decrement i modulo len. */ private static int prevIndex(int i, int len) { return ((i - 1 >= 0) ? i - 1 : len - 1); }
看到这些字段应该很熟悉了吧,它和hashmap非常的类似。
定义了一个数组Entry[] table,存储是元素是Entry。继承至弱引用。k-v的形式存储内容。key是ThreadLocal<?>。
初始容量是16,每次增加必须是2的指数倍,当满了2/3时扩容。
由于第一次进来获取到的map肯定是空,所以执行//step 1.1.2.4
/**
* Create the map associated with a ThreadLocal. Overridden in
* InheritableThreadLocal.
*
* @param t the current thread
* @param firstValue value for the initial entry of the map
*/
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);//step 1.1.2.4.1
}
这个就是创建一个ThreadLocalMap 然后设置到当前的线程中。
//step 1.1.2.4.1
/**
* Construct a new map initially containing (firstKey, firstValue).
* ThreadLocalMaps are constructed lazily, so we only create
* one when we have at least one entry to put in it.
*/
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}
这个就是创建一个ThreadlocalMap。并存入第一个元素Entry。这个的firstValue就是前面创建的Looper。fristKey就是在Looper里面创建的静态常量 sThreadLocal。我们计算一下,第一个entry的下标是存在哪。
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);计算这个I。
前面
int threadLocalHashCode = nextHashCode();
int INITIAL_CAPACITY = 16;
所以后面的是16-1=15。主要是看前面的数。
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
private static AtomicInteger nextHashCode =
new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;
AtomicInteger是随着jdk5.0出来的,它位于java.util.concurrent.atomic包下,AtomicInteger,一个提供原子操作的Integer的类。也就是说在Java语言中,++i和i++操作并不是线程安全的,在使用的时候,不可避免的会用到synchronized关键字。而AtomicInteger则通过一种线程安全的加减操作接口,也就是说当有多个线程操作同一个变量时,使用AtomicInteger不会导致变量出现问题,而且比使用 synchronized效率高,
AtomicInteger的常用方法如下:
AtomicInteger(int initialValue):创建一个AtomicInteger实例,初始值由参数指定。不带参的构造方法初始值为0。
int addAndGet(int delta):以原子方式将输入的数值与实例中的值(AtomicInteger里的value)相加,并返回结果,与getAndAdd(int delta)相区分,从字面意思即可区分,前者返回相加后结果,后者先返回再相加。
boolean compareAndSet(int expect, int update) :如果当前值等于预期值,则以原子方式将该值设置为输入的值。
int getAndIncrement():以原子方式将当前值加1,注意:这里返回的是自增前的值。
void lazySet(int newValue):最终会设置成newValue,使用lazySet设置值后,可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
int getAndSet(int newValue):以原子方式设置为newValue的值,并返回旧值。
/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the previous value
*/
public final int getAndAdd(int delta) {
return U.getAndAddInt(this, VALUE, delta);
}
这里返回的上一个值。上一个值由于我们没设置初始值所以这里返回的就是0.
那么0&15还是0 。所以第一个元素的下标是0。
到这里整个1.1 prepare()步骤就都走完了。
总结:就是创建了Looper对象,MessageQueue对象,再把创建的Looper存到了当前线程的ThreadLocalMap中。下标为0,Key是Looper中的静态常量sThreadLocal,value是前面创建的Looper对象。
继续看//step1.2 myLooper()方法。
/**
* Return the Looper object associated with the current thread. Returns
* null if the calling thread is not associated with a Looper.
*/
public static @Nullable Looper myLooper() {
return sThreadLocal.get();//step 1.2.1
}
看注释就知道是获取前面我们存到ThreadLocalMap的looper。不过我还是要点进去看看。
/** * Returns the value in the current thread's copy of this * thread-local variable. If the variable has no value for the * current thread, it is first initialized to the value returned * by an invocation of the {@link #initialValue} method. * * @return the current thread's value of this thread-local */ public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t);//step 1.2.1.1 if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this);//step 1.2.1.2 if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); }
//step 1.2.1.1
/**
* Get the map associated with a ThreadLocal. Overridden in
* InheritableThreadLocal.
*
* @param t the current thread
* @return the map
*/
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
就是取出当前线程的ThreadLocalMap。就是之前在//step 1.1.2.4.1 存进去的。
//step 1.2.1.2
/** * Get the entry associated with key. This method * itself handles only the fast path: a direct hit of existing * key. It otherwise relays to getEntryAfterMiss. This is * designed to maximize performance for direct hits, in part * by making this method readily inlinable. * * @param key the thread local object * @return the entry associated with key, or null if no such */ private Entry getEntry(ThreadLocal<?> key) { int i = key.threadLocalHashCode & (table.length - 1); Entry e = table[i]; if (e != null && e.get() == key) return e; else return getEntryAfterMiss(key, i, e);//step 1.2.1.2.1 }
这里就是取出最终的Entry,就是前面我们存进去的第一个元素。我们计算一下这个I
首先(table.length - 1)=15 没什么疑问。
关键是看key.threadLocalHashCode。这个这里的Key就是sThreadLocal。
/**
* The difference between successively generated hash codes - turns
* implicit sequential thread-local IDs into near-optimally spread
* multiplicative hash values for power-of-two-sized tables.
*/
private static final int HASH_INCREMENT = 0x61c88647;
/**
* Returns the next hash code.
*/
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
这里和我们前面存的流程是一样的。之前我们返回的是0.但是这一次会返回。HASH_INCREMENT=0x61c88647。
实际就是0x61c88647&15=7 但是我们之前存的是0.所以走 //step 1.2.1.2.1
/** * Version of getEntry method for use when key is not found in * its direct hash slot. * * @param key the thread local object * @param i the table index for key's hash code * @param e the entry at table[i] * @return the entry associated with key, or null if no such */ private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) { Entry[] tab = table; int len = tab.length; while (e != null) { ThreadLocal<?> k = e.get(); if (k == key) return e; if (k == null) expungeStaleEntry(i); else i = nextIndex(i, len); e = tab[i]; } return null; }
这里会通过循环一个一个的去找entry直到找到。expungeStaleEntry是擦除过期的元素。估计因为和弱引用有关。这里有点不是很明白。问题不大,继续回到//step 1.2.1.2 往下走
就是取出value值返回。直接回到//step 1.2 也没什么东西,继续回到 //step 1。 Looper.prepareMainLooper();方法执行完了。接着会去创建目标Activity。这个我们不管。看//step 2
public static void loop() { //去掉一些不重要的代码后 final Looper me = myLooper(); if (me == null) { throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread."); } final MessageQueue queue = me.mQueue; for (;;) { Message msg = queue.next(); // might block //step 2.1 if (msg == null) { // No message indicates that the message queue is quitting. return; } try { msg.target.dispatchMessage(msg);//step 2.2 if (observer != null) { observer.messageDispatched(token, msg); } dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0; } catch (Exception exception) { if (observer != null) { observer.dispatchingThrewException(token, msg, exception); } throw exception; } finally { ThreadLocalWorkSource.restore(origWorkSource); if (traceTag != 0) { Trace.traceEnd(traceTag); } } } }
这个方法比较简单了。先进入死循环,//step 2.1就是从消息队列中MessageQueue取出消息。如果返回消息是null则return 就意味着looper所在的线程 代码都执行完了,这个线程结束。主线程肯定是不会出现这种情况的。
//step 2.2 中的dispatchMessage方法应该很熟悉了吧。msg.target 就是Handler。
总结一下Looper类:
1.创建了当前所在线程的Looper唯一对象,并通过ThreadLocalMap保存起来。
2.创建了MessageEqueue消息队列
3.无限循环从消息队列中取出消息,并回调dispatchMessage方法。这一步是一直运行在创建Looper对象的线程中的,这个是跨线程的关键。
接下来是Handler机制中最重要的类。MesssageQueue。
我们从之前跳过的//step 1.1.1.1 创建开始。
//step 1.1.1.1
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit(); //step 1
}
参数从意思看就是 是否允许退出,主线程是传入false是不允许主动退出的。
这里关键看 mPtr = nativeInit()
这个其实是个C++的指针地址
private long mPtr; // used by native code
//step 1
这里是JNI调用,我们通过包名+类名+方法名的规律可以知道 其实现在 android_os_MessageQueue.cpp类中。JNI类一般位于framework\base\core\jni 包中,但是也有一些是例外。那就只能去源码网站上去搜索了。
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();//step 1.1
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
nativeMessageQueue->incStrong(env);//step 1.2
return reinterpret_cast<jlong>(nativeMessageQueue);//step 1.3
}
这里需要解释下几个类。
JNIEnv* env:代表的JNI环境,虚拟机会传参数进来
jclass clazz:代表Java的对象Object,也是虚拟机传入进来
//step 1.1
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);//step 1.1.1
Looper::setForThread(mLooper);
}
}
//step 1.1.1 创建native Looper
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);//step 1.1.1.1
LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
strerror(errno));
AutoMutex _l(mLock);
rebuildEpollLocked();//step 1.1.1.2
}
//step 1.1.1.1 构造唤醒事件,这个已经是内核中的方法了。
加锁,再执行 //step 1.1.1.2 重建eqooll事件
void Looper::rebuildEpollLocked() { // Close old epoll instance if we have one. 关闭旧的 epoll实例 if (mEpollFd >= 0) { close(mEpollFd); } // Allocate the new epoll instance and register the wake pipe. //创建新的epoll实例,注册唤醒管道 mEpollFd = epoll_create(EPOLL_SIZE_HINT); LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno)); struct epoll_event eventItem; memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union eventItem.events = EPOLLIN; eventItem.data.fd = mWakeEventFd; //添加唤醒事件的监听 int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem); for (size_t i = 0; i < mRequests.size(); i++) { const Request& request = mRequests.valueAt(i); struct epoll_event eventItem; request.initEventItem(&eventItem); //添加请求队列事件的监听 int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem); if (epollResult < 0) { ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s", request.fd, strerror(errno)); } } }
这个方法主要是 使用epoll多路IO复用的机制,这套机制是Linux提供的。简单的说就是 能监听文件描述符 并且注册要监听的事件,当有事件产生时就会触发 往后面执行,如果没有事件则会阻塞。具体可以自行了解Eqoll机制。这个机制是非常重要的。很多地方都有用到。
Looper对象中的mWakeEventFd添加到epoll监控,以及mRequests也添加到epoll的监控范围内。这样我们可以用2种方式唤醒。
//step 1.2
nativeMessageQueue->incStrong(env);//step 1.2
这句意思是增加引用计数。incStrong方法在RefBase类中。
是C++中的智能指针。
想使用智能指针处理一个对象,这个对象必须要继承RefBase类, 以便智能指针能操作类提供引用计数操作!
这里主要是为了回收机制。
Android C++层的内存收回主要是通过三个类来实现,分别是RefBase,sp,wp;
SP和WP是两个智能指针模板类,sp是strong pointer,wp则是weak pointer,亦我们常说的强引用和弱引用;实例化sp和wp这两个模板类的类型必须是派生自RefBase的类。
//step 1.3
return reinterpret_cast<jlong>(nativeMessageQueue)
将创建的nativeMessageQueue 指针地址通过reinterpret_cast强制转化符 转化为long类型返回到java层。到这里就知道了。java层中的mPtr就是一个指针地址。
接着看Lopper从MessageQueue中取消息的操作。
final MessageQueue queue = me.mQueue;
Message next() { // Return here if the message loop has already quit and been disposed. // This can happen if the application tries to restart a looper after quit // which is not supported. final long ptr = mPtr; if (ptr == 0) { return null; } int pendingIdleHandlerCount = -1; // -1 only during first iteration int nextPollTimeoutMillis = 0; for (;;) { if (nextPollTimeoutMillis != 0) { Binder.flushPendingCommands(); } nativePollOnce(ptr, nextPollTimeoutMillis);//step 2.1 synchronized (this) { // Try to retrieve the next message. Return if found. final long now = SystemClock.uptimeMillis(); Message prevMsg = null; Message msg = mMessages; if (msg != null && msg.target == null) { // Stalled by a barrier. Find the next asynchronous message in the queue. do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } if (msg != null) { if (now < msg.when) { // Next message is not ready. Set a timeout to wake up when it is ready. nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); } else { // Got a message. mBlocked = false; if (prevMsg != null) { prevMsg.next = msg.next; } else { mMessages = msg.next; } msg.next = null; if (DEBUG) Log.v(TAG, "Returning message: " + msg); msg.markInUse(); return msg; } } else { // No more messages. nextPollTimeoutMillis = -1; } // Process the quit message now that all pending messages have been handled. if (mQuitting) { dispose(); return null; } // If first time idle, then get the number of idlers to run. // Idle handles only run if the queue is empty or if the first message // in the queue (possibly a barrier) is due to be handled in the future. if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) { pendingIdleHandlerCount = mIdleHandlers.size(); } if (pendingIdleHandlerCount <= 0) { // No idle handlers to run. Loop and wait some more. mBlocked = true; continue; } if (mPendingIdleHandlers == null) { mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)]; } mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers); } // Run the idle handlers. // We only ever reach this code block during the first iteration. for (int i = 0; i < pendingIdleHandlerCount; i++) { final IdleHandler idler = mPendingIdleHandlers[i]; mPendingIdleHandlers[i] = null; // release the reference to the handler boolean keep = false; try { keep = idler.queueIdle(); } catch (Throwable t) { Log.wtf(TAG, "IdleHandler threw exception", t); } if (!keep) { synchronized (this) { mIdleHandlers.remove(idler); } } } // Reset the idle handler count to 0 so we do not run them again. pendingIdleHandlerCount = 0; // While calling an idle handler, a new message could have been delivered // so go back and look again for a pending message without waiting. nextPollTimeoutMillis = 0; } }
这个方法比较关键,我们一步一步看。
首先判断初始化创建的NativeMessageQueue是否成功。
定义了2个int 变量。其中 nextPollTimeoutMillis 是指阻塞时间。
然后开启一个无限循环。这个循环退出只有2个返回,1是返回一个Messaged,2是返回null 前面分析Looper就知道如果获取的Message是null Looper循环就会退出 结束了。
//step 2.1
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);//step 2.1.1
}
这里传入了2个参数,1个是之前创建的NativeMessageQueue指针,1个是timeoutMillis 延迟退出时间。
//step 2.1.1
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);//step 2.1.1.1
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
这个就是赋值 然后就调用pollOnce(timeout) 轮询一次。
//step 2.1.1.1
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
result = pollInner(timeoutMillis);//step 2.1.1.1.1
}
}
调用内部轮询
//step 2.1.1.1.1
int Looper::pollInner(int timeoutMillis) {
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
这个方法很长,概括就是开始等待事件发生,timeoutMillis=0 则立即往下执行,timeoutMillis=-1则会一直阻塞 直到监听的事件到来,我们在前面添加了2个事件,1是唤醒,1是请求队列。
回到step 2.1进行往下走。就到了处理消息的时候了。
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
消息Message是单链表结构存储。异常查找 直到找到消息。
if (msg != null) { if (now < msg.when) { // Next message is not ready. Set a timeout to wake up when it is ready. nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); } else { // Got a message. mBlocked = false; if (prevMsg != null) { prevMsg.next = msg.next; } else { mMessages = msg.next; } msg.next = null; if (DEBUG) Log.v(TAG, "Returning message: " + msg); msg.markInUse(); return msg; } } else { // No more messages. nextPollTimeoutMillis = -1; }
如果没有消息则把超时设置为-1 。就是一直等待。
如果找到了Message则判断其when是否需要立刻值,还是要延时执行。
// Process the quit message now that all pending messages have been handled. if (mQuitting) { dispose(); return null; } // If first time idle, then get the number of idlers to run. // Idle handles only run if the queue is empty or if the first message // in the queue (possibly a barrier) is due to be handled in the future. if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) { pendingIdleHandlerCount = mIdleHandlers.size(); } if (pendingIdleHandlerCount <= 0) { // No idle handlers to run. Loop and wait some more. mBlocked = true; continue; }
如果要退出,则返回NUll 。接着处理空闲Handler。的回调问题。如果都没有则需要阻塞。
接下来。看Handler类。这个是我们用的最多的。首先看构造函数。
有很多构造函数。最常用的是 无参和1个Looper参数的。
public Handler(@Nullable Callback callback, boolean async) {
mLooper = Looper.myLooper();
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread " + Thread.currentThread()
+ " that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
获取当前线程创建保存的Looper。再取出MessageQueue。这2个对象一个线程中只有一个。
在看最常用的。Handler的sendMessage方法和post方法。
public boolean sendMessageAtTime(@NonNull Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
他们最终都会调用到这个方法。
private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,
long uptimeMillis) {
msg.target = this;
msg.workSourceUid = ThreadLocalWorkSource.getUid();
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
关键是queue.enqueueMessage(msg, uptimeMillis);看它怎么把消息插入队列中。
boolean enqueueMessage(Message msg, long when) { if (msg.target == null) { throw new IllegalArgumentException("Message must have a target."); } if (msg.isInUse()) { throw new IllegalStateException(msg + " This message is already in use."); } synchronized (this) { if (mQuitting) { IllegalStateException e = new IllegalStateException( msg.target + " sending message to a Handler on a dead thread"); Log.w(TAG, e.getMessage(), e); msg.recycle(); return false; } msg.markInUse(); msg.when = when; Message p = mMessages; boolean needWake; if (p == null || when == 0 || when < p.when) { // New head, wake up the event queue if blocked. msg.next = p; mMessages = msg; needWake = mBlocked; } else { // Inserted within the middle of the queue. Usually we don't have to wake // up the event queue unless there is a barrier at the head of the queue // and the message is the earliest asynchronous message in the queue. needWake = mBlocked && p.target == null && msg.isAsynchronous(); Message prev; for (;;) { prev = p; p = p.next; if (p == null || when < p.when) { break; } if (needWake && p.isAsynchronous()) { needWake = false; } } msg.next = p; // invariant: p == prev.next prev.next = msg; } // We can assume mPtr != 0 because mQuitting is false. if (needWake) { nativeWake(mPtr); } } return true; }
如果头消息为null,当前消息的时间延长为0,当前消息的延迟时间 小于头消息
则把当前消息设置为头消息。
否则把消息 插入到 链表的合适位置通过when。
when=systemClock.uptimeMillis() + delayMillis
这个延时是通过设置的时间加系统开机后的时间。
最后再看看分发方法。
/** * Subclasses must implement this to receive messages. */ public void handleMessage(@NonNull Message msg) { } /** * Handle system messages here. */ public void dispatchMessage(@NonNull Message msg) { if (msg.callback != null) { handleCallback(msg); } else { if (mCallback != null) { if (mCallback.handleMessage(msg)) { return; } } handleMessage(msg); } } private static void handleCallback(Message message) { message.callback.run(); }
如果消息的callback不为空则直接回调run方法。这里的callback其实就是runnable接口,它和线程每关系,只是作为一个接口使用。
否则就执行handleMessage方法。子类Handler必须重写这个方法来接收Message。
到此整个Handler机制就分析完了,设计到的知识点还是很多的。
首先画一个UML图:
再画一个简单的流程图。
蓝色和红色代表2个不同的线程。
Handler是消息处理机制也是线程间的通信机制,它是无法跨进程使用,利用的是进程中各线程能共享内存。是一种生产消费者模型。主要由 Looper,MessageQueue,Handler,Message 和ThreadLocal 组合而成。
调用Looper.prepare()时,会创建一个Looper对象 保存到当前线程的ThreadLocalMap中,每个线程只会创建一个Looper。Looper持有一个MessageQueue的引用。
调用Looper.loop()时,会开启无线循环调用消息队列的Next()方法去获取消息,当取到消息时就会回调Handler的dispatchMessage()分发消息方法,然后继续循环。
MessageQueue持有消息列表的头结点消息,初始化时会调用nativeInit()方法。会调用到C++中的android_os_MessageQueue.cpp和Looper.cpp中的方法。主要是利用Linux听的epoll机制。使用epoll_create()创建一个文件描述符。
再用epoll_ctl()往描述符中添加要监听的事件。再用
epoll_wait()开始监听。最后一个参数是timeout,
指定epoll的超时时间,单位是毫秒。当timeout为-1是,epoll_wait调用将永远阻塞,直到某个时间发生。当timeout为0时,epoll_wait调用将立即返回。
消息队列的next()方法中,开发无限循环然后调用nativePollOnce()轮询一次,第一次的参数为0会立即往下执行。由于目前没消息,所以会把timeout参数设置为-1。进入阻塞状态 直到被唤醒。当有消息时则会判断当前消息的when参数是否需要立即执行,是则返回 否则设置timeout值为等待时间。
当我们new Handler对象时,会获取当前线程中的Looper对象,所以主线的Hander一定要在主线程中初始化。
当调用sendMessage或者post时最终会调用到 消息队列中的enqueueMessage(),把消息插入到消息单链表的合适位置,并根据需要唤醒之前阻塞的next()进行执行。
next()方法运行在之前创建looper的线程中。而enqueueMessage()方法是运行在被调用的各个线程中。这里就实现了跨线程。
我觉得整个机制中最关键的是。linux的epoll多路IO复用机制。它保证了无限循环时不会出现CPU占有率很高,有消息时唤醒处理,没有消息时则阻塞。在android的很多其他地方都有用到。比如Zygota进程,binder机制。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。