赞
踩
1 编写cotyledonServer.py
该文件用于在cotyledon多进程框架中监听消息队列并处理消息
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # @Time : 11/28/17 3:07 PM
- # @Author : Aries
- # @Site :
- # @File : main.py
- # @Software: PyCharm
-
- import abc
- from datetime import datetime
- import multiprocessing
- import itertools
- import time
-
- import cotyledon
- from oslo_config import cfg
- from oslo_log import log
- import oslo_messaging
- import six
-
- '''
- 关键:
- 1
- cotyledon.Service(worker_id)
- 作用: 创建一个新的服务
- 参数: worker_id (int) – service实例的标示符
- 2 ServiceManager()
- 2.1 作用
- 类似于主进程,管理服务的生命周期。
- 控制子进程的生命周期,如果子进程意外死亡就重启他们。
- 每一个子进程ServiceWorker运行在一个服务的实例上。
- 一个应用必须创建一个ServiceManager类并且使用
- ServiceManager.run()做为和应用的主循环
- 样例:
- class cotyledon.ServiceManager(wait_interval=0.01, graceful_shutdown_timeout=60)
-
- 2.2 cotyledon.ServiceManager.add
- cotyledon.ServiceManager.add(service, workers=1, args=None, kwargs=None)
- 作用: 创建一个子进程来运行AgentService服务
- 参数:
- service (callable) – callable that return an instance of Service
- workers (int) – number of processes/workers for this service
- args (tuple) – additional positional arguments for this service
- kwargs (dict) – additional keywoard arguments for this service
- Returns:
- a service id
-
- 2.3 cotyledon.ServiceManager.run()
- 开启并监督服务工作者
- 这个方法将会开启和监督所有子进程,直到主进程被关闭了
-
-
- 参考:
- http://cotyledon.readthedocs.io/en/latest/api.html
- '''
-
-
- LOG = log.getLogger(__name__)
-
-
- # import sys
- # import pdb
- #
- # class ForkedPdb(pdb.Pdb):
- # def interaction(self, *args, **kwargs):
- # _stdin = sys.stdin
- # try:
- # sys.stdin = open('/dev/stdin')
- # pdb.Pdb.interaction(self, *args, **kwargs)
- # finally:
- # sys.stdin = _stdin
-
-
- def timeHelper(func):
- def wrapper(*args, **kwargs):
- try:
- start = datetime.now()
- info = "############ Begin task, start: %s ##########" % (
- str(start))
- LOG.info(info)
- func(*args, **kwargs)
- end = datetime.now()
- diff = end - start
- info = "##### End task, end: %s, cost time: %s #####" % (
- str(end), str(diff))
- LOG.info(info)
- except Exception as ex:
- info = "Exception type is %s, message is %s" % (ex.__class__.__name__, ex)
- LOG.error(info)
-
- return wrapper
-
-
- @six.add_metaclass(abc.ABCMeta)
- class BaseNotificationEndpoint(object):
-
- @abc.abstractmethod
- def sample(self, notifications):
- """"""
-
-
- class HyperHorseEndpoint(BaseNotificationEndpoint):
-
- def __init__(self):
- pass
-
- @timeHelper
- def sample(self, notifications):
- # 下面的停留的时间用于模拟处理消息的耗时,用于测试cotyledon是否真正做到并发
- time.sleep(1)
- # TODO(), add real process notification code
- info = "process ends at: {curTime}, notifications: {notifications}".format(
- notifications=notifications,
- curTime=datetime.now()
- )
- LOG.info(info)
-
-
-
- class AgentService(cotyledon.Service):
-
- def __init__(self, worker_id, conf):
- super(AgentService, self).__init__(worker_id)
- self.conf = conf
- self.transport = None
- self.targets = None
- self.endpoints = None
- self.listener = None
- self.rpc_server = None
-
- def getTransport(self):
- if not self.transport:
- try:
- self.transport = oslo_messaging.get_transport(
- self.conf
- )
- except Exception as ex:
- info = "get transport exception: {exception}, " \
- "message: {message}".format(exception=ex.__class__.__name__,
- message=ex)
- LOG.error(info)
- return self.transport
-
- def getEndpoints(self):
- if not self.endpoints:
- try:
- # TODO(), use entry_points to get real endpoints
- self.endpoints = [HyperHorseEndpoint()]
- except Exception as ex:
- info = "get endpoints exception: {exception}, " \
- "message: {message}".format(exception=ex.__class__.__name__,
- message=ex)
- LOG.error(info)
- return self.endpoints
-
- def getTargets(self):
- if not self.targets:
- try:
- # ForkedPdb().set_trace()
- # topics = self.conf.oslo_messaging_notifications.topics
- topics = self.conf.DEFAULT.topics
- exchange = self.conf.DEFAULT.exchange
- targets = []
- for topic in topics:
- target = oslo_messaging.Target(
- topic=topic,
- exchange=exchange
- )
- targets.append(target)
- self.targets = targets
- except Exception as ex:
- info = "get targets exception: {exception}, " \
- "message: {message}".format(exception=ex.__class__.__name__,
- message=ex)
- LOG.error(info)
- return self.targets
-
-
-
- '''
- 初始化监听器:
- 1 初始化transport,入参是消息队列连接串的url或者oslo.ConfigOpt对象
- 2 初始化endpoints,即最终处理消息的对象(如果是oslo_messaging.Notifier.sample调用,则
- 该处理消息的类中必须包含上sample(notifications)方法 )
- 3 初始化targets,即监听何种消息,每个target(包含topic,exchange)
- 4 根据上述信息获取批量监听器
- '''
- def startListener(self):
- transport = self.getTransport()
- endpoints = self.getEndpoints()
- targets = self.getTargets()
- # 可以设置批量处理大小
- self.listener = oslo_messaging.get_batch_notification_listener(
- transport,
- targets,
- endpoints,
- executor="threading",
- batch_size=1,
- batch_timeout=None
- )
- self.listener.start(override_pool_size=1)
-
- # 等同于开启了一个线程,所以以线程的形式去做,while
- def run(self):
- try:
- self.startListener()
- LOG.info("start listener finished")
- except Exception as ex:
- info = "run exception: {exception}, " \
- "message: {message}".format(exception=ex.__class__.__name__,
- message=ex)
- LOG.error(info)
-
- DEFAULT_OPTS = [
- cfg.StrOpt('exchange', default='abc', help='exchange name'),
- cfg.ListOpt('topics', default=['hyper'], help='topic list')
- ]
-
- def getOpts():
- return [
- ('DEFAULT', DEFAULT_OPTS),
- ]
-
-
- def prepareService(configFile=None):
- conf= cfg.ConfigOpts()
- log.register_options(conf)
- log.set_defaults(default_log_levels=conf.default_log_levels)
-
- for group, options in getOpts():
- conf.register_opts(
- group=group if group else 'DEFAULT', opts=list(options))
- conf(None,
- project='hyperhorse',
- validate_default_values=False,
- default_config_files=[configFile])
- log.setup(conf, 'hyperhorse')
- return conf
-
-
- def main():
- conf = prepareService(configFile="hyperhorse.conf")
- sm = cotyledon.ServiceManager()
- # import pdb;pdb.set_trace()
- cpuNum = multiprocessing.cpu_count() or 1
- # 可以根据自己的实际需要来调整worker个数
- workerNum = min(16, cpuNum)
- sm.add(AgentService, workers=workerNum, args=(conf,))
- sm.run()
-
-
- if __name__ == "__main__":
- main()
- print "finish"
2 编写cotyledonClient.py
用于发送消息给cotyledonServer
- # -*- encoding: utf-8 -*-
-
- import argparse
- from datetime import datetime
- import sys
-
- from oslo_config import cfg
- from oslo_log import log
- import oslo_messaging
-
-
- LOG = log.getLogger(__name__)
-
-
- class Producer(object):
- def __init__(self, conf=None, url=None, topics=None, driver="messagingv2"):
- self.conf = conf
- self.url = url
- self.topics = topics or self.conf.DEFAULT.topics
- self.transport = oslo_messaging.get_transport(self.conf)
- self.notifier = oslo_messaging.Notifier(
- self.transport,
- topics=self.topics,
- driver=driver
- )
-
- def publish(self, payloads):
- try:
- self.notifier.sample(
- {},
- 'hyperhorse.message',
- {"samples": payloads}
- )
- info = "send m
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。