赞
踩
介绍:
# The GIL may be released between bytecodes at unpredictable points, and
# `total += 1` is a separate read and write, so updates from the two threads
# interleave and the final value is nondeterministic.
import threading

total = 0


def add():
    """Increment the shared global ``total`` one million times, without locking."""
    global total
    for _ in range(1000000):
        total += 1


def desc():
    """Decrement the shared global ``total`` one million times, without locking."""
    global total
    for _ in range(1000000):
        total -= 1


thread1, thread2 = (threading.Thread(target=fn) for fn in (add, desc))
for t in (thread1, thread2):
    t.start()
for t in (thread1, thread2):
    t.join()
print(total)
数值不确定
threading
# Implementing multithreading.
# For I/O-bound work, multithreading and multiprocessing perform about the same.
import time
import threading


def get_detail_html(url):
    """Simulate fetching a detail page: announce, block for 2 s, announce completion."""
    print('我获取详情内容了')
    time.sleep(2)
    print('我获取内容完了')


def get_detail_url(url):
    """Simulate fetching the URL list: announce, block for 2 s, announce completion."""
    print('我获取url了')
    time.sleep(2)
    print('我获取url完了')


if __name__ == '__main__':
    thread1, thread2 = (
        threading.Thread(target=target, args=('',))
        for target in (get_detail_html, get_detail_url)
    )
    start_time = time.time()
    for worker in (thread1, thread2):
        worker.start()
    # The printed time is tiny (just the cost of launching the threads), not 2 s.
    # Three threads are running here: the main thread plus the two children.
    # After starting the children the main thread keeps going without waiting.
    # (Daemon threads, not used here, are killed when the main thread exits.)
    print('last time:{}'.format(time.time() - start_time))
我获取详情内容了
我获取url了
last time:0.0009658336639404297
我获取内容完了
我获取url完了
[Finished in 2.1s] # 运行时间,输出内容先后存在不确定性
import time
import threading


def get_detail_html(url):
    """Simulate a slow detail-page fetch (4 s), printing before and after."""
    print('我获取详情内容了')
    time.sleep(4)
    print('我获取内容完了')


def get_detail_url(url):
    """Simulate a URL-list fetch (2 s), printing before and after."""
    print('我获取url了')
    time.sleep(2)
    print('我获取url完了')


if __name__ == '__main__':
    thread1 = threading.Thread(target=get_detail_html, args=('',))
    thread2 = threading.Thread(target=get_detail_url, args=('',))
    # Make thread1 a daemon thread: it is killed when the interpreter exits.
    # The interpreter still waits for the non-daemon thread2, so thread2
    # finishes while thread1's second message is never printed.
    # Thread.setDaemon() is deprecated since Python 3.10; assign the `daemon`
    # attribute instead (it must be set before start()).
    thread1.daemon = True
    start_time = time.time()
    thread1.start()
    thread2.start()
    print('last time:{}'.format(time.time() - start_time))
我获取详情内容了
我获取url了
last time:0.0009963512420654297
我获取url完了
[Finished in 2.1s]
等待子线程执行完(join)再继续执行主线程代码:
import time
import threading


def get_detail_html(url):
    """Pretend to download a detail page: print, block 4 seconds, print again."""
    print('我获取详情内容了')
    time.sleep(4)
    print('我获取内容完了')


def get_detail_url(url):
    """Pretend to download the URL list: print, block 2 seconds, print again."""
    print('我获取url了')
    time.sleep(2)
    print('我获取url完了')


if __name__ == '__main__':
    workers = (
        threading.Thread(target=get_detail_html, args=('',)),
        threading.Thread(target=get_detail_url, args=('',)),
    )
    start_time = time.time()
    for worker in workers:
        worker.start()
    # Block until both children are done, so the elapsed time is that of the
    # slowest thread (about 4 s) rather than just the launch overhead.
    for worker in workers:
        worker.join()
    print('last time:{}'.format(time.time() - start_time))
我获取详情内容了
我获取url了
我获取url完了
我获取内容完了
last time:4.0015480518341064
更多方法:
import time
import threading


class GetDetailHtml(threading.Thread):
    """Thread subclass whose run() simulates a 4-second detail-page fetch."""

    def __init__(self, name):
        # Forward the thread name to threading.Thread.
        super().__init__(name=name)

    def run(self):
        """Executed in the new thread when start() is called."""
        print('我获取详情内容了')
        time.sleep(4)
        print('我获取内容完了')


class GetDetailUrl(threading.Thread):
    """Thread subclass whose run() simulates a 2-second URL-list fetch."""

    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        """Executed in the new thread when start() is called."""
        print('我获取url了')
        time.sleep(2)
        print('我获取url完了')


if __name__ == '__main__':
    thread1 = GetDetailHtml('get_detail_html')
    thread2 = GetDetailUrl('get_detail_url')
    start_time = time.time()
    for worker in (thread1, thread2):
        worker.start()
    # Wait for both threads to finish before measuring.
    for worker in (thread1, thread2):
        worker.join()
    print('last time:{}'.format(time.time() - start_time))
我获取详情内容了
我获取url了
我获取url完了
我获取内容完了
last time:4.015072822570801
注:共享变量的方式是线程不安全的操作(不推荐)
import time
import threading

# Shared, unsynchronized work list. Sharing a global like this is discouraged
# (queue.Queue is the thread-safe alternative). The list could also live in a
# separate module, in which case it must be referenced through that module.
url_lists = []


def get_detail_html():
    """Consume URLs from the shared ``url_lists`` forever (main starts ten of these)."""
    while True:
        try:
            # Pop first and handle emptiness afterwards (EAFP). A separate
            # len() check followed by pop() lets another worker empty the list
            # in between; pop() would then raise IndexError and kill the thread.
            url_lists.pop()
        except IndexError:
            # Nothing queued yet: pause briefly instead of busy-spinning a CPU core.
            time.sleep(0.1)
            continue
        print('我获取详情内容了')
        time.sleep(4)
        print('我获取内容完了')


def get_detail_url(url_lists):
    """Append 20 fake URLs to *url_lists* every ~2 seconds, forever."""
    while True:
        print('我获取url了')
        time.sleep(2)
        for i in range(20):
            url_lists.append('url' + str(i))
        print('我获取url完了')


if __name__ == '__main__':
    thread_url = threading.Thread(target=get_detail_url, args=(url_lists,))
    thread_url.start()
    # Start ten threads that crawl detail pages.
    for i in range(10):
        thread_html = threading.Thread(target=get_detail_html)
        thread_html.start()
我获取url了
我获取url完了
我获取详情内容了
我获取详情内容了
我获取详情内容了
我获取url了
我获取url完了
我获取url了
.......
注:是线程安全的(Queue本身就是线程安全的【使用了线程锁的机制】,使用了双端队列,deque)
Queue中的方法:
import time
import threading
from queue import Queue


def get_detail_html(queue):
    """Consume URLs from *queue* forever, simulating a 4-second fetch per item.

    Every ``get()`` is paired with ``task_done()``. ``Queue.join()`` only
    returns once ``task_done()`` has been called for every item that was put.
    """
    while True:
        queue.get()  # blocks until an item is available
        try:
            print('我获取详情内容了')
            time.sleep(4)
            print('我获取内容完了')
        finally:
            # Mark the item processed even if the work above fails, so a
            # producer waiting in join() is not blocked forever.
            queue.task_done()


def get_detail_url(queue):
    """Put 20 fake URLs on *queue*, wait until all are processed, and repeat."""
    while True:
        print('我获取url了')
        time.sleep(2)
        for i in range(20):
            queue.put('url' + str(i))
        print('我获取url完了')
        # join() blocks until consumers have called task_done() for every item
        # put so far. task_done() belongs in the consumer, not here.
        queue.join()


if __name__ == '__main__':
    # Cap the queue at 1000 items; an oversized queue can use a lot of memory.
    urls_queue = Queue(maxsize=1000)
    thread_url = threading.Thread(target=get_detail_url, args=(urls_queue,))
    thread_url.start()
    # Start ten threads that crawl detail pages.
    for i in range(10):
        thread_html = threading.Thread(target=get_detail_html, args=(urls_queue,))
        thread_html.start()
我获取url了
我获取url完了
我获取详情内容了
我获取详情内容了
我获取详情内容了
我获取详情内容了
我获取详情内容了
我获取详情内容了
......
被锁住的代码段,同一时刻只能有一个线程在运行。
获取(acquire)和释放(release)锁都需要时间,因此用锁会影响性能;锁还可能引起死锁:一是互相等待(A和B都需要a、b两个资源,A获取了a,B获取了b,A等待b,B等待a);二是在未释放锁的情况下再次获取同一把(非可重入的)锁。
"""
A(a, b)
acquire(a)
acquire(b)
B(a, b)
acquire(b)
acquire(a)
A想运行必须先拿到a再拿到b才能运行。
B想运行必须先拿到b再拿到a。
如果A拿到a资源的同时B拿到了b资源,双方都在等待对方持有的资源,就形成了死锁。如果B也按相同顺序 acquire(a)、acquire(b) 获取资源,就不会形成死锁。
"""
import threading
from threading import Lock

total = 1
lock = Lock()


def add():
    """Increment the global ``total`` one million times under ``lock``."""
    global total
    for _ in range(1000000):
        # The with-statement acquires the lock and releases it on exit.
        # Only after that release can another thread acquire it.
        with lock:
            total += 1


def decs():
    """Decrement the global ``total`` one million times under ``lock``."""
    global total
    for _ in range(1000000):
        with lock:
            total -= 1


thread1, thread2 = threading.Thread(target=add), threading.Thread(target=decs)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)
1
使用可重入锁 RLock 时,同一个线程中可以连续多次 acquire(获取资源),但一定要注意 acquire 的次数要和 release 的次数一致;如果是普通的 Lock,同一线程第二次 acquire 会一直阻塞,造成死锁。
# Acquire the same lock twice within one thread (re-entrant locking).
# This only works with threading.RLock: a plain threading.Lock blocks forever
# on the second acquire() because it is already held (self-deadlock).
from threading import RLock

lock = RLock()
lock.acquire()
lock.acquire()
total += 1
lock.release()  # every acquire() must be matched by exactly one release()
lock.release()  # the RLock becomes free only after the last matching release()
使用锁实现先后对话时,会发现先启动的线程把话先说完了(第一个线程启动后已经运行完,第二个线程还没有启动,或者还未切换到另一个线程),锁无法保证轮流说话的顺序:
from threading import Condition  # condition variable, for more complex inter-thread synchronization
import threading


class XiaoAi(threading.Thread):
    """Lock-only version of 小爱; a bare Lock cannot enforce turn-taking."""

    def __init__(self, lock):
        super().__init__(name="小爱")
        self.lock = lock

    def run(self):
        self.lock.acquire()
        print("{}:在".format(self.name))
        self.lock.release()

        self.lock.acquire()
        print("{}:好啊".format(self.name))
        self.lock.release()


class TianMao(threading.Thread):
    """Lock-only version of 天猫; a bare Lock cannot enforce turn-taking."""

    def __init__(self, lock):
        super(TianMao, self).__init__(name="天猫")
        self.lock = lock

    def run(self):
        self.lock.acquire()
        print("{}:小爱同学".format(self.name))
        self.lock.release()

        self.lock.acquire()
        print("{}:我们来对古诗吧".format(self.name))
        self.lock.release()


# Cooperative poem dialogue implemented with a Condition.
class XiaoAi1(threading.Thread):
    """Condition-based 小爱: waits to be addressed, then answers in turn.

    ``ready`` is set while this thread holds the condition, just before its
    first ``wait()``. Start the partner only after ``ready`` is set. Otherwise
    the partner's first ``notify()`` can fire while nobody is waiting; it is
    then lost and both threads block forever.
    """

    def __init__(self, cond):
        super().__init__(name="小爱")
        self.cond = cond
        self.ready = threading.Event()

    def run(self):
        with self.cond:
            # Still holding the condition here; it is released only inside
            # wait(), after this thread has been registered as a waiter.
            self.ready.set()
            self.cond.wait()
            print("{}:在".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{}:好啊".format(self.name))


class TianMaoCond(threading.Thread):
    """Condition-based 天猫: speaks first, then alternates turns via notify/wait."""

    def __init__(self, cond):
        super().__init__(name="天猫")
        self.cond = cond

    def run(self):
        with self.cond:  # equivalent to self.cond.acquire() ... self.cond.release()
            print("{}:小爱同学".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{}:我们来对古诗吧".format(self.name))
            self.cond.notify()


if __name__ == '__main__':
    # A plain Lock cannot enforce the alternation:
    # lock = threading.Lock()
    # xiao_ai = XiaoAi(lock)
    # tian_mao = TianMao(lock)
    # tian_mao.start()
    # xiao_ai.start()

    # Condition version. wait() and notify() may only be called while holding
    # the condition, i.e. inside `with cond`.
    condition = threading.Condition()
    xiao_ai1 = XiaoAi1(condition)
    tian_mao1 = TianMaoCond(condition)
    xiao_ai1.start()
    # 小爱 sets `ready` while holding the condition, so 天猫 can only acquire
    # the condition once 小爱 is waiting; 天猫's first notify() is never lost.
    xiao_ai1.ready.wait()
    tian_mao1.start()
通过调用with方法(实际是__enter__魔法函数),Condition有两层锁,一把底层锁,会在线程调用了wait()方法时释放,上面的锁会在每次调用wait()时分配一把锁并放入到condition的等待队列中,等待notify()方法的唤醒。
Semaphores内部实质是用Condition完成的,Queue实质也是;
Semaphore(信号量)是用来控制进入数量的锁(如写文件一般只允许一个线程写,读则可以允许多个线程同时读)。
# A semaphore is a lock that limits how many threads may enter at once.
# Example: file access, where usually only one thread writes but several
# threads may be allowed to read.
import threading
import time


class HtmlSpider(threading.Thread):
    """Worker that simulates a 2-second fetch, then returns its semaphore slot."""

    def __init__(self, sem):
        super(HtmlSpider, self).__init__()
        self.sem = sem

    def run(self):
        try:
            time.sleep(2)
            print("success")
        finally:
            # Return the slot (available count + 1) even if the work raises.
            # Otherwise the producer eventually blocks forever in acquire().
            self.sem.release()


class UrlProducer(threading.Thread):
    """Launch 20 HtmlSpider threads, never more than the semaphore allows at once."""

    def __init__(self, sem):
        super().__init__()
        self.sem = sem

    def run(self):
        for i in range(20):
            self.sem.acquire()  # take a slot (available count - 1); blocks while none are free
            html = HtmlSpider(self.sem)
            try:
                html.start()
            except RuntimeError:
                # The spider never ran, so it will not release the slot itself.
                self.sem.release()
                raise
            # Do not release here: the spider releases its own slot when it finishes.


if __name__ == '__main__':
    sem = threading.Semaphore(3)  # at most 3 spiders run concurrently
    url_producer = UrlProducer(sem)
    url_producer.start()
线程池(concurrent.futures.ThreadPoolExecutor)与 semaphore 相比,更容易实现对线程数量的控制:
# Thread pool via concurrent.futures:
# - the main thread can query the state and return value of any submitted task;
# - the main thread learns as soon as a task finishes;
# - futures gives multithreaded and multiprocess code the same interface.
import time
from concurrent.futures import ThreadPoolExecutor, as_completed, wait  # as_completed is a generator


def get_html(times):
    """Sleep *times* seconds, print a message, and return *times*."""
    time.sleep(times)
    print("html success-{}s".format(times))
    return times


# The with-statement shuts the executor down explicitly, waiting for all
# submitted tasks, instead of leaving cleanup to interpreter exit.
with ThreadPoolExecutor(max_workers=2) as executor:
    # submit() schedules the call on the pool and returns a Future immediately.
    task1 = executor.submit(get_html, 3)
    task2 = executor.submit(get_html, 2)

    # Batch submission.
    urls = [3, 2, 4, 6]
    all_task = [executor.submit(get_html, url) for url in urls]
    # wait() blocks until every future in all_task has finished.
    wait(all_task)
    print("main")

    # Collect the values of finished tasks; as_completed yields futures as they complete.
    for future in as_completed(all_task):
        data = future.result()
        print(data)

    # Alternative: executor.map returns results in submission order.
    # for data in executor.map(get_html, urls):
    #     print(data)

    # done() reports whether a task has finished:
    # print(task1.done())
    # result() returns the task's return value:
    # print(task1.result())

# as_completed source excerpt; this part collects the futures that have
# already finished:
#     if timeout is not None:
#         end_time = timeout + time.monotonic()
#     fs = set(fs)
#     total_futures = len(fs)
#     with _AcquireFutures(fs):
#         finished = set(
#             f for f in fs
#             if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
#         pending = fs - finished
#         waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
#     finished = list(finished)
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time


def fib(n):
    """Return the n-th Fibonacci number (fib(1) == fib(2) == 1) by naive recursion.

    Deliberately exponential: it serves as a CPU-bound workload.
    """
    return 1 if n <= 2 else fib(n - 2) + fib(n - 1)


def _time_fib_pool(pool_cls, label):
    """Run fib(25..39) on a 3-worker *pool_cls*, printing each result and the elapsed time."""
    with pool_cls(3) as pool:
        started = time.time()
        futures = [pool.submit(fib, k) for k in range(25, 40)]
        for done in as_completed(futures):
            print('结果:' + str(done.result()))
        print(label + str(time.time() - started))


if __name__ == '__main__':
    # Keep the pool code under this guard; per the original note, it may
    # raise otherwise (process workers re-import this module).
    _time_fib_pool(ThreadPoolExecutor, '多线程所需时间:')
    # Measured: 多线程所需时间:72.10901117324829
    _time_fib_pool(ProcessPoolExecutor, '多进程所需时间:')
    # Measured: 多进程所需时间:43.14996862411499
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time


def random_sleep(n):
    """Block for *n* seconds and return *n* (a stand-in for I/O-bound work)."""
    time.sleep(n)
    return n


def _time_sleep_pool(pool_cls, label):
    """Run 30 two-second sleeps on a 3-worker *pool_cls*, printing results and elapsed time."""
    with pool_cls(3) as pool:
        started = time.time()
        futures = [pool.submit(random_sleep, secs) for secs in [2] * 30]
        for done in as_completed(futures):
            print('休息:' + str(done.result()) + '秒')
        print(label + str(time.time() - started))


if __name__ == '__main__':
    # Keep the pool code under this guard; per the original note, it may
    # raise otherwise (process workers re-import this module).
    _time_sleep_pool(ThreadPoolExecutor, '多线程所需时间:')
    # Measured: 多线程所需时间:20.010841131210327
    _time_sleep_pool(ProcessPoolExecutor, '多进程所需时间:')
    # Measured: 20.755817651748657 -- for sleep-bound work both take about the same time.
......
pool = multiprocessing.Pool(3)
# Asynchronous submission of a single task:
# result = pool.apply_async(get_html, args=(2,))
# # close() stops the pool from accepting new tasks
# pool.close()
# pool.join()
# print(result.get())

# imap yields results in the same order the inputs were submitted.
for result in pool.imap(get_html, [1, 5, 3]):
    print('{} sleep success'.format(result))

# imap_unordered yields results in the order the tasks finish.
for result in pool.imap_unordered(get_html, [1, 5, 3]):
    print('{} sleep success'.format(result))

# Stop accepting work and wait for the worker processes to exit.
pool.close()
pool.join()
from multiprocessing import Queue, Process
import time


def producer(queue):
    """Put the single item 'a' on *queue*, then linger for 2 seconds."""
    queue.put('a')
    time.sleep(2)


def consumer(queue):
    """Wait 2 seconds, then take one item from *queue* and print it."""
    time.sleep(2)
    print(queue.get())


if __name__ == '__main__':
    queue = Queue(10)  # multiprocessing.Queue holding at most 10 items
    pro_producer, pro_consumer = (
        Process(target=fn, args=(queue,)) for fn in (producer, consumer)
    )
    for proc in (pro_producer, pro_consumer):
        proc.start()
    for proc in (pro_producer, pro_consumer):
        proc.join()
from queue import Queue——>用于多线程
from multiprocessing import Queue——>用于非进程池的多进程通信
from multiprocessing import Manager——>manager.Queue()用于进程池通信
from multiprocessing import Pipe, Process
import time


def producer(pipe):
    """Send 'a' through one end of a Pipe, then linger for 2 seconds."""
    pipe.send('a')
    time.sleep(2)


def consumer(pipe):
    """Wait 2 seconds, then receive one object from the pipe and print it."""
    time.sleep(2)
    data = pipe.recv()
    print(data)


if __name__ == '__main__':
    # Inter-process communication through a Pipe. A Pipe has exactly two
    # endpoints, so it only suits communication between two processes.
    # (For processes in a Pool, use a Manager().Queue() instead.)
    recv_pipe, send_pipe = Pipe()
    pro_producer = Process(target=producer, args=(send_pipe,))
    pro_consumer = Process(target=consumer, args=(recv_pipe,))
    pro_producer.start()
    pro_consumer.start()
    pro_producer.join()
    pro_consumer.join()
from multiprocessing import Process, Queue, Pipe, Pool, Manager
import time

# Shared global variables do not work across processes (each process has its
# own copy), although they do work between threads.
# A multiprocessing.Queue cannot be used with a Pool; use a Manager queue there.


def product(queue):
    """Put the single item "a" on *queue*, then linger for 2 seconds."""
    queue.put("a")
    time.sleep(2)


def consumer(queue):
    """Wait 2 seconds, then take one item from *queue* and print it."""
    time.sleep(2)
    data = queue.get()
    print(data)


def process_dict(dic, key, value):
    """Set ``dic[key] = value``.

    With a Manager dict proxy, the write is visible to the other processes.
    """
    dic[key] = value


if __name__ == '__main__':
    # Share a dict between processes. The with-statement shuts down the
    # manager's server process once we are done with the proxy.
    with Manager() as manager:
        process_dict1 = manager.dict()
        first_process = Process(target=process_dict, args=(process_dict1, "xiaohao1", 22))
        second_process = Process(target=process_dict, args=(process_dict1, "xiaohao2", 23))
        first_process.start()
        second_process.start()
        first_process.join()
        second_process.join()
        # Read the proxy while the manager is still running.
        print(process_dict1)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。