当前位置:   article > 正文

python 64式: 第32式、多进程框架cotyledon探究与性能测试_python forkedpdb

python forkedpdb

1 编写cotyledonServer.py

该文件用于在cotyledon多进程框架中监听消息队列并处理消息

  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # @Time : 11/28/17 3:07 PM
  4. # @Author : Aries
  5. # @Site :
  6. # @File : main.py
  7. # @Software: PyCharm
  8. import abc
  9. from datetime import datetime
  10. import multiprocessing
  11. import itertools
  12. import time
  13. import cotyledon
  14. from oslo_config import cfg
  15. from oslo_log import log
  16. import oslo_messaging
  17. import six
  18. '''
  19. 关键:
  20. 1
  21. cotyledon.Service(worker_id)
  22. 作用: 创建一个新的服务
  23. 参数: worker_id (int) – service实例的标示符
  24. 2 ServiceManager()
  25. 2.1 作用
  26. 类似于主进程,管理服务的生命周期。
  27. 控制子进程的生命周期,如果子进程意外死亡就重启他们。
  28. 每一个子进程ServiceWorker运行在一个服务的实例上。
  29. 一个应用必须创建一个ServiceManager类并且使用
  30. ServiceManager.run()做为和应用的主循环
  31. 样例:
  32. class cotyledon.ServiceManager(wait_interval=0.01, graceful_shutdown_timeout=60)
  33. 2.2 cotyledon.ServiceManager.add
  34. cotyledon.ServiceManager.add(service, workers=1, args=None, kwargs=None)
  35. 作用: 创建一个子进程来运行AgentService服务
  36. 参数:
  37. service (callable) – callable that return an instance of Service
  38. workers (int) – number of processes/workers for this service
  39. args (tuple) – additional positional arguments for this service
  40. kwargs (dict) – additional keywoard arguments for this service
  41. Returns:
  42. a service id
  43. 2.3 cotyledon.ServiceManager.run()
  44. 开启并监督服务工作者
  45. 这个方法将会开启和监督所有子进程,直到主进程被关闭了
  46. 参考:
  47. http://cotyledon.readthedocs.io/en/latest/api.html
  48. '''
  49. LOG = log.getLogger(__name__)
  50. # import sys
  51. # import pdb
  52. #
  53. # class ForkedPdb(pdb.Pdb):
  54. # def interaction(self, *args, **kwargs):
  55. # _stdin = sys.stdin
  56. # try:
  57. # sys.stdin = open('/dev/stdin')
  58. # pdb.Pdb.interaction(self, *args, **kwargs)
  59. # finally:
  60. # sys.stdin = _stdin
  61. def timeHelper(func):
  62. def wrapper(*args, **kwargs):
  63. try:
  64. start = datetime.now()
  65. info = "############ Begin task, start: %s ##########" % (
  66. str(start))
  67. LOG.info(info)
  68. func(*args, **kwargs)
  69. end = datetime.now()
  70. diff = end - start
  71. info = "##### End task, end: %s, cost time: %s #####" % (
  72. str(end), str(diff))
  73. LOG.info(info)
  74. except Exception as ex:
  75. info = "Exception type is %s, message is %s" % (ex.__class__.__name__, ex)
  76. LOG.error(info)
  77. return wrapper
  78. @six.add_metaclass(abc.ABCMeta)
  79. class BaseNotificationEndpoint(object):
  80. @abc.abstractmethod
  81. def sample(self, notifications):
  82. """"""
  83. class HyperHorseEndpoint(BaseNotificationEndpoint):
  84. def __init__(self):
  85. pass
  86. @timeHelper
  87. def sample(self, notifications):
  88. # 下面的停留的时间用于模拟处理消息的耗时,用于测试cotyledon是否真正做到并发
  89. time.sleep(1)
  90. # TODO(), add real process notification code
  91. info = "process ends at: {curTime}, notifications: {notifications}".format(
  92. notifications=notifications,
  93. curTime=datetime.now()
  94. )
  95. LOG.info(info)
  96. class AgentService(cotyledon.Service):
  97. def __init__(self, worker_id, conf):
  98. super(AgentService, self).__init__(worker_id)
  99. self.conf = conf
  100. self.transport = None
  101. self.targets = None
  102. self.endpoints = None
  103. self.listener = None
  104. self.rpc_server = None
  105. def getTransport(self):
  106. if not self.transport:
  107. try:
  108. self.transport = oslo_messaging.get_transport(
  109. self.conf
  110. )
  111. except Exception as ex:
  112. info = "get transport exception: {exception}, " \
  113. "message: {message}".format(exception=ex.__class__.__name__,
  114. message=ex)
  115. LOG.error(info)
  116. return self.transport
  117. def getEndpoints(self):
  118. if not self.endpoints:
  119. try:
  120. # TODO(), use entry_points to get real endpoints
  121. self.endpoints = [HyperHorseEndpoint()]
  122. except Exception as ex:
  123. info = "get endpoints exception: {exception}, " \
  124. "message: {message}".format(exception=ex.__class__.__name__,
  125. message=ex)
  126. LOG.error(info)
  127. return self.endpoints
  128. def getTargets(self):
  129. if not self.targets:
  130. try:
  131. # ForkedPdb().set_trace()
  132. # topics = self.conf.oslo_messaging_notifications.topics
  133. topics = self.conf.DEFAULT.topics
  134. exchange = self.conf.DEFAULT.exchange
  135. targets = []
  136. for topic in topics:
  137. target = oslo_messaging.Target(
  138. topic=topic,
  139. exchange=exchange
  140. )
  141. targets.append(target)
  142. self.targets = targets
  143. except Exception as ex:
  144. info = "get targets exception: {exception}, " \
  145. "message: {message}".format(exception=ex.__class__.__name__,
  146. message=ex)
  147. LOG.error(info)
  148. return self.targets
  149. '''
  150. 初始化监听器:
  151. 1 初始化transport,入参是消息队列连接串的url或者oslo.ConfigOpt对象
  152. 2 初始化endpoints,即最终处理消息的对象(如果是oslo_messaging.Notifier.sample调用,则
  153. 该处理消息的类中必须包含上sample(notifications)方法 )
  154. 3 初始化targets,即监听何种消息,每个target(包含topic,exchange)
  155. 4 根据上述信息获取批量监听器
  156. '''
  157. def startListener(self):
  158. transport = self.getTransport()
  159. endpoints = self.getEndpoints()
  160. targets = self.getTargets()
  161. # 可以设置批量处理大小
  162. self.listener = oslo_messaging.get_batch_notification_listener(
  163. transport,
  164. targets,
  165. endpoints,
  166. executor="threading",
  167. batch_size=1,
  168. batch_timeout=None
  169. )
  170. self.listener.start(override_pool_size=1)
  171. # 等同于开启了一个线程,所以以线程的形式去做,while
  172. def run(self):
  173. try:
  174. self.startListener()
  175. LOG.info("start listener finished")
  176. except Exception as ex:
  177. info = "run exception: {exception}, " \
  178. "message: {message}".format(exception=ex.__class__.__name__,
  179. message=ex)
  180. LOG.error(info)
  181. DEFAULT_OPTS = [
  182. cfg.StrOpt('exchange', default='abc', help='exchange name'),
  183. cfg.ListOpt('topics', default=['hyper'], help='topic list')
  184. ]
  185. def getOpts():
  186. return [
  187. ('DEFAULT', DEFAULT_OPTS),
  188. ]
  189. def prepareService(configFile=None):
  190. conf= cfg.ConfigOpts()
  191. log.register_options(conf)
  192. log.set_defaults(default_log_levels=conf.default_log_levels)
  193. for group, options in getOpts():
  194. conf.register_opts(
  195. group=group if group else 'DEFAULT', opts=list(options))
  196. conf(None,
  197. project='hyperhorse',
  198. validate_default_values=False,
  199. default_config_files=[configFile])
  200. log.setup(conf, 'hyperhorse')
  201. return conf
  202. def main():
  203. conf = prepareService(configFile="hyperhorse.conf")
  204. sm = cotyledon.ServiceManager()
  205. # import pdb;pdb.set_trace()
  206. cpuNum = multiprocessing.cpu_count() or 1
  207. # 可以根据自己的实际需要来调整worker个数
  208. workerNum = min(16, cpuNum)
  209. sm.add(AgentService, workers=workerNum, args=(conf,))
  210. sm.run()
  211. if __name__ == "__main__":
  212. main()
  213. print "finish"

 

2 编写cotyledonClient.py

用于发送消息给cotyledonServer

  1. # -*- encoding: utf-8 -*-
  2. import argparse
  3. from datetime import datetime
  4. import sys
  5. from oslo_config import cfg
  6. from oslo_log import log
  7. import oslo_messaging
  8. LOG = log.getLogger(__name__)
  9. class Producer(object):
  10. def __init__(self, conf=None, url=None, topics=None, driver="messagingv2"):
  11. self.conf = conf
  12. self.url = url
  13. self.topics = topics or self.conf.DEFAULT.topics
  14. self.transport = oslo_messaging.get_transport(self.conf)
  15. self.notifier = oslo_messaging.Notifier(
  16. self.transport,
  17. topics=self.topics,
  18. driver=driver
  19. )
  20. def publish(self, payloads):
  21. try:
  22. self.notifier.sample(
  23. {},
  24. 'hyperhorse.message',
  25. {"samples": payloads}
  26. )
  27. info = "send m
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/236094
推荐阅读
相关标签
  

闽ICP备14008679号