赞
踩
import threading
import time


def job():
    """Print thread diagnostics for the current interpreter, then sleep 5s."""
    print("这是一个需要执行的任务")
    # Number of Thread objects currently alive
    print("当前线程的个数:", threading.active_count())
    # The Thread object executing this function (here: the main thread)
    print("当前线程信息:", threading.current_thread())
    time.sleep(5)


if __name__ == '__main__':
    job()
import time
import _thread


def job(name):
    """Print a message plus the caller-supplied name and a timestamp."""
    print("这是一个需要执行的任务")
    print(name, time.ctime())
    time.sleep(5)


if __name__ == '__main__':
    # _thread.start_new_thread starts each task immediately in a new thread.
    _thread.start_new_thread(job, ('thread1',))
    _thread.start_new_thread(job, ('thread2',))
    # The low-level _thread module has no join(); keep the main thread alive
    # so the daemonic workers can finish.  Sleep instead of `while True: pass`
    # so the wait does not spin a CPU core at 100%.
    while True:
        time.sleep(1)
import threading
import time


def job(name):
    """Print thread diagnostics, sleep 5s, then report the name and time."""
    print("这是一个需要执行的任务")
    print("当前线程的个数:", threading.active_count())
    print("当前线程信息:", threading.current_thread())
    time.sleep(5)
    print(name, time.ctime())


if __name__ == '__main__':
    job('job0')
    # Create two worker threads
    t1 = threading.Thread(target=job, name='job1', args=("job1-name",))
    t2 = threading.Thread(target=job, name='job2', args=("job2-name",))
    t1.start()
    t2.start()
    # Wait for both workers to finish before the main thread continues
    t1.join()
    t2.join()
    print('hello')
import threading
import time


class Job(threading.Thread):
    """Thread subclass: task parameters are bound to self in the constructor."""

    def __init__(self, jobname):
        super(Job, self).__init__()
        self.jobname = jobname

    def run(self):
        # The work the thread performs is written in run()
        print("这是一个需要执行的任务")
        print("当前线程的个数:", threading.active_count())
        print("当前线程信息:", threading.current_thread())
        time.sleep(5)
        print(self.jobname, time.ctime())


if __name__ == '__main__':
    t1 = Job(jobname='job1')
    t2 = Job(jobname='job2')
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('Success')  # fixed typo: was 'Suceess'
import threading
import time
from mytimeit import timeit


def music(name):
    """Pretend to listen to music: two rounds of 1s each."""
    for i in range(2):
        print("正在听音乐%s" % (name))
        time.sleep(1)


def code(name):
    """Pretend to write code: two rounds of 2s each."""
    for i in range(2):
        print("正在编写代码%s" % (name))
        time.sleep(2)


@timeit
def use_thread():
    # Run both tasks concurrently: wall time ~= max(2s, 4s)
    t1 = threading.Thread(target=music, args=("去年夏天",))
    t2 = threading.Thread(target=code, args=("爬虫",))
    t1.start()
    t2.start()
    t1.join()
    t2.join()


@timeit
def not_use_thread():
    # Run the same tasks sequentially: wall time ~= 2s + 4s
    music("去年夏天")
    code("爬虫")


if __name__ == '__main__':
    use_thread()
    not_use_thread()
join方法: 会等待, 直到t1线程执行结束;阻塞正在调用的线程
import threading
import time


def job(name):
    """Sleep one second, then print the given name."""
    time.sleep(1)
    print(name)


t1 = threading.Thread(target=job, args=('job1',))
t1.start()
t1.join()  # blocks the calling (main) thread until t1 finishes
t2 = threading.Thread(target=job, args=('job2',))
t2.start()
t2.join()  # blocks the calling (main) thread until t2 finishes
print("main thread end")
将t1线程声明为守护线程, 如果设置为True, 子线程启动, 当主线程执行结束, 子线程也结束
t1.setDaemon(True)
# Daemon threads are forcibly ended as soon as the main thread exits.
import threading
import time


def music(name):
    """Pretend to listen to music: two rounds of 1s each."""
    for i in range(2):
        print("正在听音乐%s" % (name))
        time.sleep(1)


def code(name):
    """Pretend to write code: two rounds of 2s each."""
    for i in range(2):
        print("正在编写代码%s" % (name))
        time.sleep(2)


if __name__ == '__main__':
    start_time = time.time()
    t1 = threading.Thread(target=music, args=('去年夏天',))
    t2 = threading.Thread(target=code, args=('爬虫',))
    # Declare both workers as daemon threads: when the main thread ends,
    # the workers are terminated too.  Must be set BEFORE start().
    # (`t.daemon = True` is the modern spelling; setDaemon() is deprecated
    # since Python 3.10.)
    t1.daemon = True
    t2.daemon = True
    t1.start()
    t2.start()
    print('花费时间:%s' % (time.time() - start_time))
为什么会需要线程锁?
多个线程对同一个数据进行修改时, 可能出现不可预料的情况.
import threading


def add(lock):
    """Increment the shared counter 1,000,000 times while holding the lock."""
    global money
    # `with lock` acquires before the loop and guarantees the release
    # afterwards, even if the body raises.
    with lock:
        for i in range(1000000):
            money += 1


def reduce(lock):
    """Decrement the shared counter 1,000,000 times while holding the lock."""
    global money
    with lock:
        for i in range(1000000):
            money -= 1


if __name__ == '__main__':
    money = 0
    # A single Lock object shared by both worker threads
    lock = threading.Lock()
    t1 = threading.Thread(target=add, args=(lock,))
    t2 = threading.Thread(target=reduce, args=(lock,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("最终金额为:%s" % (money))
若不加锁:
加锁:
python使用多线程 , 是个好主意么? 为什么?
- GIL(全局解释器锁)
- python解释器默认每次只允许一个线程执行
执行过程:
1). 设置GIL
2). 切换到线程去运行对应的任务;
3). 运行
- 执行完了
- time.sleep()
- 获取其他信息才能继续执行, eg: 从网络上获取网页信息等;
3. 把线程设置为睡眠状态
4. 解锁GIL
5.再次重复执行上述内容;
python解释器: CPython解释器, Jython解释器, PyPy解释器
方法的选择:
Python并不支持真正意义上的多线程。Python中提供了多线程包,但是如果你想通过多线程提高代码的速度,
使用多线程包并不是个好主意。Python中有一个被称为Global Interpreter Lock(GIL)的东西,
它会确保任何时候你的多个线程中,只有一个被执行。线程的执行速度非常之快,会让你误以为线程是并行执行的,
但是实际上都是轮流执行。经过GIL这一道关卡处理,会增加执行的开销。这意味着,如果你想提高代码的运行速度,
使用threading包并不是一个很好的方法。
# I/O密集型操作: 多线程操作
# CPU/计算密集型:多进程操作
import threading
from mytimeit import timeit


def job(l):
    """CPU-bound task: sum a sequence (result is discarded)."""
    sum(l)


@timeit
def use_thread():
    # Because of the GIL, five threads running a CPU-bound task give no
    # speed-up over a single thread.
    li = range(1, 10000)
    for i in range(5):
        t = threading.Thread(target=job, args=(li,))
        t.start()


@timeit
def use_no_thread():
    li = range(1, 10000)
    job(li)


if __name__ == '__main__':
    use_thread()
    use_no_thread()
1). 理论上多线程执行任务, 会产生一些数据, 为其他程序执行作铺垫;
2). 多线程是不能返回任务执行结果的, 因此需要一个容器来存储多线程产生的数据
3). 这个容器如何选择? list(栈, 队列)
import threading
from queue import Queue


def job(l, queue):
    """Compute sum(l) and push the result onto the queue."""
    queue.put(sum(l))


def use_thread():
    """Sum several lists concurrently, collecting results via a Queue."""
    # Thread.run cannot return a value, so a Queue carries each result back
    q = Queue()
    li = [[1, 2, 3, 4, 5],
          [2, 3, 4, 5, 6],
          [2, 3, 4, 5, 6, 7, 8],
          [2, 3, 4, 5, 6]]
    threads = []
    for i in li:
        t = threading.Thread(target=job, args=(i, q))
        threads.append(t)
        t.start()
    # Wait until every worker thread has finished
    [thread.join() for thread in threads]
    # Drain the queue: one result per input list
    result = [q.get() for _ in li]
    print(result)


if __name__ == "__main__":
    use_thread()
给定200个ip地址, 可能开放端口为80, 443, 7001, 7002, 8000, 8080, 9000(flask), 9001
以http://ip:port形式访问页面以判断是否正常访问.
1). 任务1:构建所有的url地址;===存储到一个数据结构中
2). 任务2:依次判断url地址是否可以成功访问
用列表储存:
import threading
from urllib.request import urlopen


def creat_data():
    """Write 200 test IP addresses (172.25.254.1-200) to ips.txt."""
    with open('ips.txt', 'w') as f:
        for i in range(200):
            f.write('172.25.254.%d\n' % (i + 1))


def creat_url():
    """Build every http://ip:port URL from ips.txt and the port list."""
    portlist = [80, 443, 7001, 7002, 8000, 8080]
    with open('ips.txt') as f:
        ips = [ip.strip() for ip in f]
    urls = ['http://%s:%s' % (ip, port) for ip in ips for port in portlist]
    return urls


def job(url):
    """Probe one URL and report whether it can be opened."""
    try:
        urlobj = urlopen(url)
    except Exception as e:
        print("%s unknown url" % (url))
    else:
        print("%s is ok" % (url))


if __name__ == '__main__':
    creat_data()
    urls = creat_url()
    # One thread per URL
    threads = []
    for url in urls:
        t = threading.Thread(target=job, args=(url,))
        threads.append(t)
        t.start()
    [thread.join() for thread in threads]
用队列:
import threading
from queue import Queue, Empty
from urllib.request import urlopen


class Producer(threading.Thread):
    """Read IPs from ips.txt and put every http://ip:port URL on the queue."""

    def __init__(self, queue):
        super(Producer, self).__init__()
        self.queue = queue

    def run(self):
        portlist = [80, 443, 7001, 7002, 8000, 8080]
        with open('ips.txt') as f:
            ips = [ip.strip() for ip in f]
        # Each URL produced is pushed onto the shared queue immediately
        for ip in ips:
            for port in portlist:
                url = 'http://%s:%s' % (ip, port)
                self.queue.put(url)


class Consumer(threading.Thread):
    """Take URLs off the queue and probe each one until the queue drains."""

    def __init__(self, queue):
        super(Consumer, self).__init__()
        self.queue = queue

    def run(self):
        # BUG FIX: the original consumed exactly ONE url per consumer thread,
        # so 20 consumers handled only 20 of the ~1200 urls (and a get() on
        # an empty queue blocked forever).  Loop until no work arrives.
        while True:
            try:
                url = self.queue.get(timeout=5)
            except Empty:
                break
            try:
                urlObj = urlopen(url)
            except Exception as e:
                print("%s unknown url" % (url))
            else:
                print("%s is ok" % (url))


if __name__ == '__main__':
    queue = Queue()
    # One producer thread
    p1 = Producer(queue)
    p1.start()
    # Twenty consumer threads
    for i in range(20):
        c1 = Consumer(queue)
        c1.start()
import json
import threading
from urllib.request import urlopen
from mytimeit import timeit


def job(ip):
    """Look up an IP's location via the Taobao IP API and print it."""
    url = "http://ip.taobao.com/service/getIpInfo.php?ip=%s" % (ip)
    # Fetch the page and decode as utf-8 so Chinese text is readable
    text = urlopen(url).read().decode('utf-8')
    # Parse the JSON payload into a dict for easy field access
    d = json.loads(text)['data']
    country = d['country']
    city = d['city']
    print("%s:" % (ip), country, city)


@timeit
def has_many_thread():
    # One thread per lookup: the network waits overlap
    threads = []
    ips = ['172.25.254.78', '8.8.8.8', '172.25.254.78',
           '8.8.8.8', '172.25.254.78', '8.8.8.8']
    for ip in ips:
        t = threading.Thread(target=job, args=(ip,))
        threads.append(t)
        t.start()
    [thread.join() for thread in threads]


@timeit
def has_no_thread():
    # Sequential lookups, for timing comparison
    ips = ['172.25.254.250', '8.8.8.8', '172.25.254.250',
           '8.8.8.8', '172.25.254.250', '8.8.8.8']
    for ip in ips:
        job(ip)


if __name__ == '__main__':
    has_many_thread()
    has_no_thread()
注意: python3.2版本以后才可以使用;
from concurrent.futures import ThreadPoolExecutor
import time


def job():
    """Trivial task: print a message and return 'hello'."""
    print("this is a job")
    return 'hello'


if __name__ == '__main__':
    # A pool of 10 threads processes the submitted tasks
    pool = ThreadPoolExecutor(max_workers=10)
    # submit() schedules the task and returns a Future
    f1 = pool.submit(job)
    f2 = pool.submit(job)
    # done() reports whether the task has finished (non-blocking)
    print(f1.done())
    time.sleep(1)
    print(f2.done())
    # result() blocks until the value is available, then returns it
    print(f1.result())
    print(f2.result())
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.error import HTTPError
from urllib.request import urlopen
from mytimeit import timeit

URLS = ['http://httpbin.org', 'http://example.com/',
        'https://api.github.com/'] * 10


def get_page(url, timeout=3):
    """Fetch a URL; return its byte length, or 0 on an HTTP error."""
    try:
        content = urlopen(url).read()
        return {'url': url, 'len': len(content)}
    except HTTPError as e:
        return {'url': url, 'len': 0}


@timeit
def method_1():
    # Method 1: submit each task, then read the futures' results.
    pool = ThreadPoolExecutor(max_workers=20)
    futuresobj = [pool.submit(get_page, url) for url in URLS]
    # as_completed(futures) yields each future as its task finishes,
    # looping until every future is done:
    # for finish_fs in as_completed(futuresobj):
    #     print(finish_fs.result())
    for future in futuresobj:
        print(future.result())


@timeit
def method_2():
    # Method 2: map() runs the tasks and yields results in input order
    pool = ThreadPoolExecutor(max_workers=20)
    for res in pool.map(get_page, URLS):
        print(res)


method_2()
# Use ssh to operate on remote servers: run commands, upload/download files.
import threading
from concurrent.futures import ThreadPoolExecutor

import paramiko
from paramiko.ssh_exception import NoValidConnectionsError, AuthenticationException


def connect(cmd, hostname, port=22, user='root'):
    """Run `cmd` on hostname over ssh (key auth) and print the output."""
    # 1. Create the ssh client object (like: ssh root@172.25.254.250)
    client = paramiko.SSHClient()
    # Private key object used for authentication
    private_key = paramiko.RSAKey.from_private_key_file('id_rsa')
    # 2. Auto-accept unknown host keys so first-time connections do not stop
    #    at "Are you sure you want to continue connecting (yes/no)?"
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        # 3. Connect to the server
        client.connect(hostname=hostname,
                       port=port,
                       username=user,
                       pkey=private_key)
        # 4. Execute the command
        stdin, stdout, stderr = client.exec_command(cmd)
    except NoValidConnectionsError as e:
        print("%s连接失败" % (hostname))
    except AuthenticationException as e:
        print("%s密码错误" % (hostname))
    else:
        # 5. Collect the command's output
        result = stdout.read().decode('utf-8')
        print("%s运行结果:" % (hostname), result)
    finally:
        # 6. Close the connection
        client.close()


# ****** Method 1: one Thread object per host ******
def method_1():
    threads = []
    for count in range(254):
        host = '172.25.254.%s' % (count + 1)
        t = threading.Thread(target=connect, args=('uname', host))
        threads.append(t)
        t.start()
    # Wait for all worker threads to finish
    _ = [thread.join() for thread in threads]
    print("任务执行结束........")


# ****** Method 2: a pool of 50 threads handles all the hosts ******
def method_2():
    pool = ThreadPoolExecutor(max_workers=50)
    for count in range(254):
        host = '172.25.254.%s' % (count + 1)
        pool.submit(connect, 'uname', host)


method_2()
继承的方式实现:
import threading

import paramiko
from paramiko.ssh_exception import NoValidConnectionsError, AuthenticationException


class IpThread(threading.Thread):
    """Thread that runs one ssh command on one host (key authentication)."""

    def __init__(self, cmd, hostname, port=22, user='root'):
        super(IpThread, self).__init__()
        self.cmd = cmd
        self.hostname = hostname
        self.port = port
        self.user = user

    def run(self):
        client = paramiko.SSHClient()
        private_key = paramiko.RSAKey.from_private_key_file('id_rsa')
        # Auto-accept unknown host keys
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            client.connect(hostname=self.hostname,
                           port=self.port,
                           username=self.user,
                           pkey=private_key)
            stdin, stdout, stderr = client.exec_command(self.cmd)
        except NoValidConnectionsError as e:
            print("%s连接失败" % (self.hostname))
        except AuthenticationException as e:
            print("%s密码错误" % (self.hostname))
        except TimeoutError as e:
            print("%s连接超时" % (self.hostname))
        else:
            result = stdout.read().decode('utf-8')
            print('%s' % (self.hostname), result)
        finally:
            client.close()


def use_thread():
    """Probe 172.25.254.1-253 with one IpThread per host."""
    threads = []
    ips = ['172.25.254.%s' % (i) for i in range(1, 254)]
    for ip in ips:
        t = IpThread(cmd='uname', hostname=ip)
        threads.append(t)
        t.start()
    [thread.join() for thread in threads]


if __name__ == '__main__':
    use_thread()
import threading
from queue import Queue
import time

import openpyxl


def readwb(queue, sheetlist):
    """Read rows, patch prices for Garlic/Lemon/Celery, queue each row."""
    for row in sheetlist:
        child = [cell.value for cell in row]
        for i, k in enumerate(child):
            # The cell after the product name holds its price
            if k == 'Garlic':
                child[i + 1] = 3.07
            elif k == 'Lemon':
                child[i + 1] = 1.27
            elif k == 'Celery':
                child[i + 1] = 1.19
            else:
                continue
        queue.put(child)


def save_to_excel(wbname, queue, sheetname='sheet1'):
    """Drain the queue and write the rows into a new workbook."""
    print("写入Excel[%s]中......." % (wbname))
    # A fresh Workbook is created; saving it overwrites wbname
    wb = openpyxl.Workbook()
    sheet = wb.active
    sheet.title = sheetname
    # NOTE(review): qsize() is sampled once while reader threads may still be
    # putting rows -- rows queued after this point are lost; verify intent.
    data = [queue.get() for i in range(queue.qsize())]
    for row, item in enumerate(data):
        for column, cellValue in enumerate(item):
            # openpyxl cells are 1-indexed
            sheet.cell(row=row + 1, column=column + 1, value=cellValue)
    wb.save(filename=wbname)
    print("写入成功!")


if __name__ == '__main__':
    start_time = time.time()
    q = Queue()
    threades = []
    wb = openpyxl.load_workbook('produceSales.xlsx')
    sheet = wb.active
    sheetlist = list(sheet.rows)
    # One reader thread + one writer thread per 5000-row slice
    for i in range(0, len(sheetlist), 5000):
        t1 = threading.Thread(target=readwb, args=(q, sheetlist[i:i + 5000]))
        t2 = threading.Thread(target=save_to_excel, args=('produceSales.xlsx', q))
        threades.append(t1)
        threades.append(t2)
        t1.start()
        t2.start()
    [thread.join() for thread in threades]
    print("Success, 运行时间为%s" % (time.time() - start_time))
多线程运行效果:
不使用多线程:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。