当前位置:   article > 正文

《python源码剖析》笔记 python多线程机制_剖析 pyeval_acquirethread

剖析 pyeval_acquirethread

本文为senlie原创,转载请保留此地址:http://blog.csdn.net/zhengsenlie


1.GIL与线程调度
Python中的线程是操作系统的原生线程,Python虚拟机使用一个全局解释器锁(Global Interpreter Lock)来互斥线程对Python虚拟机的使用
为了支持多线程机制,一个基本的要求就是需要实现不同线程对共享资源访问的互斥,所以引入了GIL。
GIL:在一个线程拥有了解释器的访问权之后,其他的所有线程都必须等待它释放解释器的访问权,即使这些线程的下一条指令并不会互相影响。
在调用任何Python C API之前,要先获得GIL
GIL缺点:多处理器退化为单处理器;优点:避免大量的加锁解锁操作



线程调度的两个问题:
1.何时挂起当前线程
Python在执行了N条指令之后,通过软件模拟了时钟中断,开始了线程调度机制
2.选择哪个等待的线程
Python借用了底层操作系统所提供的线程调度机制决定下一个进入解释器的线程。


2.Python Thread
Python中所提供的最基础的多线程机制的接口是thread module,用C实现
在thread module的基础上,Python提供了一个更高层的多线程机制接口,threading module


thread module中的多线程接口

  1. static PyMethodDef thread_methods[] = {
  2. {"start_new_thread", (PyCFunction)thread_PyThread_start_new_thread,
  3. METH_VARARGS,
  4. start_new_doc},
  5. {"start_new", (PyCFunction)thread_PyThread_start_new_thread,
  6. METH_VARARGS,
  7. start_new_doc},
  8. {"allocate_lock", (PyCFunction)thread_PyThread_allocate_lock,
  9. METH_NOARGS, allocate_doc},
  10. {"allocate", (PyCFunction)thread_PyThread_allocate_lock,
  11. METH_NOARGS, allocate_doc},
  12. {"exit_thread", (PyCFunction)thread_PyThread_exit_thread,
  13. METH_NOARGS, exit_doc},
  14. {"exit", (PyCFunction)thread_PyThread_exit_thread,
  15. METH_NOARGS, exit_doc},
  16. {"interrupt_main", (PyCFunction)thread_PyThread_interrupt_main,
  17. METH_NOARGS, interrupt_doc},
  18. {"get_ident", (PyCFunction)thread_get_ident,
  19. METH_NOARGS, get_ident_doc},
  20. {"_count", (PyCFunction)thread__count,
  21. METH_NOARGS, _count_doc},
  22. {"stack_size", (PyCFunction)thread_stack_size,
  23. METH_VARARGS,
  24. stack_size_doc},
  25. {NULL, NULL} /* sentinel */
  26. };

在Python虚拟机启动时,多线程机制并没有激活,它只支持单线程,一旦用户调用thread.start_new_thread,明确指示Python虚拟机创建新的线程,
Python就能意识到用户需要多线程的支持,这个时候,Python虚拟机会自动创立多线程机制需要的数据结构、环境以及那个至关重要的GIL


建立多线程环境 PyEval_InitThreads()
  1. typedef void *PyThread_type_lock;
  2. static PyThread_type_lock interpreter_lock = 0; //这就是GIL
  3. static long main_thread = 0;
  4. void
  5. PyEval_InitThreads(void)
  6. {
  7. if (interpreter_lock)
  8. return;
  9. interpreter_lock = PyThread_allocate_lock();
  10. PyThread_acquire_lock(interpreter_lock, 1);
  11. main_thread = PyThread_get_thread_ident();
  12. }


Win32中,GIL 指向了PNRMUTEX类型的一个对象
  1. typedef struct NRMUTEX{
  2. LONG owned; //指示GIL是否可用
  3. DWORD thread_id;//获得GIL的线程id
  4. HANDLE hevent;//Win32平台下Event这个内核对象
  5. }NRMUTEX, *PNRMUTEX;

PyThread_acquire_lock
两种工作方式
1.如果GIL不可用,不等待
GIL被初始化为-1,意味着GIL可用,当有线程要使用GIL时,将其置为0,表示GIL已经被一个线程占用了。
2.如果GIL不可用,通过WaitForSingleObject将自身挂起,直到别的线程释放GIL,然后由操作系统将自己唤醒。
当有一个线程开始等待GIL时,其owned会被增加1,当有线程释放GIL时,owned减1



创建线程 PyThread_start_new_thread
在bootstrap中,子线程完成了三个动作:
1.获得线程id
2.通知obj->done内核对象
3.调用t_bootstrap

在t_bootstrap中,子线程开始与主线程竞争GIL,进行PyEval_AcquireThread申请GIL,
获得GIL后通过PyEval_CallObjectWithKeywords调用 PyEval_EvalFrameEx,即Python的字节码引擎。
在PyEval_CallObjectWithKeyWords之后,子线程释放GIL,并完成销毁线程的收尾工作。

线程状态保护机制
在Python内部,维护着一个全局变量:PyThreadState *_PyThread_State_Current。当前活动线程所对应
的线程状态对象就保存在这个变量里,当Python调度线程时,会将被激活的线程所对应的线程状态对象
赋给 _PyThread_State_Current,使其始终保存着活动线程的状态对象。
Python内部会通过一个单向链表来管理所有的Python线程的状态对象。


  1. static struct key *
  2. find_key(int key, void *value)
  3. {
  4. struct key *p, *prev_p;
  5. //[1]:获取当前线程的线程id,并锁住线程状态对象链表
  6. long id = PyThread_get_thread_ident();
  7. PyThread_acquire_lock(keymutex, 1);
  8. //[2]:遍历线程状态对象链表,寻找key和id都匹配的元素
  9. for (p = keyhead; p != NULL; p = p->next) {
  10. if (p->id == id && p->key == key)
  11. goto Done;
  12. }
  13. //[3]:如果[2]处搜索失败,则创建新的元素,并加入线程状态对象链表
  14. p = (struct key *)malloc(sizeof(struct key));
  15. if (p != NULL) {
  16. p->id = id;
  17. p->key = key;
  18. p->value = value;
  19. p->next = keyhead;
  20. keyhead = p;
  21. }
  22. Done:
  23. //[4]:释放锁住的线程状态对象链表
  24. PyThread_release_lock(keymutex);
  25. return p;
  26. }

在上面的代码的[1]和[4]处,Python通过在 _PyGILState_Init中创建的keymutex来互斥对状态
对象列表的的访问。Python为状态对象列表所提供的接口就是链表的插入、删除和查询操作。
  1. //查询操作
  2. void *PyThread_get_key_value(int key)
  3. //插入操作
  4. int PyThread_set_key_value(int key, void *value)
  5. //删除操作
  6. void PyThread_delete_key(int key)

操作系统级的线程调度和Python级的线程调度是不同的。Python级的线程调度一定意味着GIL
拥有权的易手,而操作系统级的线程设计并不一定意味着GIL的易手。当所有的线程都完成了初始化
动作之后,操作线程调度和Python的线程调度才会统一。


3.Python线程的调度
Python的线程调度机制是内建在Python的解释器核心PyEval_EvalFrameEx中的
  1. /*Interpreter main loop*/
  2. PyObject *PyEval_EvalFrameEx(PyFrameObject *f){
  3. for(;;){
  4. if (--_Py_Ticker < 0) {
  5. //在切换线程之前,重置_Py_Ticker为100,为下一个线程做准备
  6. _Py_Ticker = _Py_CheckInterval;
  7. tstate->tick_counter++;
  8. if (interpreter_lock) {
  9. //[1]:撤销当前线程状态对象,释放GIL,给别的线程一个机会
  10. PyThreadState_Swap(NULL)
  11. PyThread_release_lock(interpreter_lock);
  12. //[2]:别的线程现在已经开始执行了,咱们重新再申请GIL,等待下一次被调度
  13. PyThread_acquire_lock(interpreter_lock, 1);
  14. PyThreadState_Swap(tstate)
  15. }
  16. }
  17. fast_next_opcode:
  18. //...
  19. }
  20. }
PyEval_EvalFrameEx每执行一条字节码指令,_Py_Ticker就将减少1;当执行了_Py_CheckInterval条指令之后,
_Py_Ticker将减少到0,这就将进入线程调度了。


4.Python线程的用户级互斥与同步
内核级通过GIL实现的互斥保护了内核的共享资源,用户级互斥保护了用户程序中的共享资源
  1. import thread
  2. import time
  3. input = None
  4. lock = thread.allocate_lock() #创建一个Lock对象
  5. def threadProc():
  6. while True:
  7. print 'sub thread id: ', thread.get_ident()
  8. print 'sub thread %d wait lock...' % thread.get_ident()
  9. lock.acquire()
  10. print 'sub thread %d get lock...' % thread.get_ident()
  11. print 'sub thread %d receive input : %s' % (thread.get_ident(), input)
  12. print 'sub thread %d release lock...' % thread.get_ident()
  13. lock.release()
  14. time.sleep(1)
  15. thread.start_new_thread(threadProc, ())
  16. print 'main thread id : ', thread.get_ident()
  17. while True:
  18. print 'main thread %d wait lock...' % thread.get_ident()
  19. lock.acquire() #线程在用户级需要访问共享资源之前需要先申请用户级的lock
  20. print 'main thread %d get lock...' % thread.get_ident()
  21. input = raw_input()
  22. print 'main thread %d release lock...' % thread.get_ident()
  23. lock.release() #释放lock
  24. time.sleep(1)

在Win32平台下的Python实现中,用户级线程的互斥与同步机制是通过Event来完成的。



5.高级线程库——threading

  1. import threading
  2. import time
  3. class MyThread(threading.Thread):
  4. def run(self):
  5. while True:
  6. print 'sub thread : ', threading._get_ident()
  7. time.sleep(1)
  8. mythread = MyThread()
  9. mythread.start()
  10. while True:
  11. print 'main thread : ', threading._get_ident()
  12. time.sleep(1)


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/199248
推荐阅读
相关标签
  

闽ICP备14008679号