赞
踩
本文内容:
工作中有个常用的场景,比如现在需要下载10W张图片,我们不可能写个for循环一张一张的下载吧,又或者是我们做个简单的HTTP压力测试肯定是要使用多个,进程或者线程去做(每个请求handler,会有一个参数(所有的参数生成一个队列))然后把handler和队列map到Pool里面。肯定要用多线程或者是多进程,然后把这100W的队列丢给线程池或者进程池去处理在python中multiprocessing Pool进程池,以及multiprocessing.dummy非常好用,一般:
前者是多个进程,后者使用的是线程,之所以dummy(中文意思“假的”)
下面给出一个简单的http压力测试示例:
# _*_ coding:utf-8 _*_
"""
This file a sample demo to do http stress test
"""
import requests
import time
from multiprocessing.dummy import Pool as ThreadPool
import urllib
def get_ret_from_http(url):
"""cited from https://stackoverflow.com/questions/645312/what-is-the-quickest-way-to-http-get-in-python
"""
ret = requests.get(url)
print ret.content
# eg. result: {"error":false,"resultMap":{"check_ret":1},"success":true}
def multi_process_stress_test():
"""
start up 4 thread to issue 1000 http requests to server
and test consume time
:return:
"""
start = time.time()
# 实际中url带参数的一般使用下面的make_url函数生成,这里示例就不用(前面写的现在懒得改了)
url = """http://127.0.0.1:9325/shortvideo/checkBlack?url=http%3A%2F%2Fzbasecapture.bs2.yy.com%2F42269159_1499248536403_3.jpg&serial=abcdddddddd"""
# generate task queue list
lst_url = [url, url1]*50
# use 5 threads
pool = ThreadPool(5)
# task and handles to pool
ret = pool.map(get_ret_from_http, lst_url)
pool.close()
pool.join()
print 'time consume %s' % (time.time() - start)
def make_url():
"""
generate url with parameter
https://xy.com/index.php?
url=http%3A//xy.xxx.com/22.jpg&SecretId=xy_123_move
cited from
https://stackoverflow.com/questions/2506379/add-params-to-given-url-in-python
https://github.com/gruns/furl a good util for url operator
:return:
"""
para = {"SecretId": "xy_123_move", "url": "http://xy.xxx.com/22.jpg"}
print urllib.urlencode(para)
#url=http%3A%2F%2Fxy.xxx.com%2F22.jpg&SecretId=xy_123_move
base_url = 'xy.com/index.php'
return 'https://%s?%s' % (base_url, '&'.join('%s=%s' % (k, urllib.quote(str(v))) for k, v in para.iteritems()))
if __name__ == '__main__':
# get_ret_from_http()
multi_process_stress_test()
# print make_url()
pass
下面在给出另一个简单的示例,handler函数每次睡眠随机的秒数(根据指定的参数),我们可以选择使用进程或者是线程来完成队列中所有的任务(一般CPU密集型的选择用多进程,IO密集型的可以选择多线程)
# _*_ coding:utf-8 _*_
"""
This file is about thread(dummy)/process pool
"""
from multiprocessing import Pool as ProcessPool
from multiprocessing.dummy import Pool as ThreadPool
import logging
from time import sleep, time
from random import randrange
logging.basicConfig(level=logging.DEBUG,
format='%(levelname)s %(asctime)s %(processName)s %(message)s',
datefmt='%Y-%m-%d %I:%M:%S')
def handler(sec):
logging.debug('now I will sleep %s S', sec)
sleep(sec)
def get_pool(b_dummy=True, num=4):
"""
if b_dummy is True then get ThreadPool, or get process pool
:param b_dummy: dummy thread Pool or Process pool
:param num: thread or process num
:return: pool object
"""
if b_dummy:
pool = ThreadPool(num)
else:
pool = ProcessPool(num)
return pool
def test_dummy_thread_pool():
start_time = time()
# generate task queue parameters lists
lst_sleep_sec = [randrange(3, 10) for i in xrange(10)]
pool = get_pool(b_dummy=False)
results = pool.map(handler, lst_sleep_sec)
logging.debug(results)
pool.close()
pool.join()
logging.debug('time consume %s', time() - start_time)
pass
if __name__ == '__main__':
test_dummy_thread_pool()
pass
工作中使用的语言比较多写过C++,java, 部分html+js, python的.由于用到语言的间歇性,比如还几个月没有使用python了许多技巧就忘记了,于是我把一些常用的python代码分类项目在本人的github中,当实际中用到某一方法的时候就把常用的方法放到一个文件中方便查询。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。