当前位置:   article > 正文

Python-RabbitMQ-RPC(非阻塞版)

python rabbitmq 不阻塞

服务器端:rpc_server.py

 

  1. import pika,time
  2. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  3. channel = connection.channel()
  4. channel.queue_declare(queue='rpc_queue')
  5. def fib(n):
  6. if n == 0:
  7. return 0
  8. elif n == 1:
  9. return 1
  10. else:
  11. return fib(n-1) + fib(n -2)
  12. def on_request(ch, method, props, body):
  13. n = int(body)
  14. print("[.] fib(%s)" % n)
  15. response = fib(n)#斐波那契的执行结果赋值给reponse
  16. #再把得到的消息发回给客户端
  17. ch.basic_publish(exchange='',
  18. routing_key=props.reply_to,
  19. properties = pika.BasicProperties(
  20. correlation_id= \
  21. props.correlation_id
  22. ),
  23. body =str(response))
  24. ch.basic_ack(delivery_tag=method.delivery_tag)#确保消息被消费,代表任务完成
  25. #channel.basic_qos(prefetch_count=1)
  26. channel.basic_consume(on_request,
  27. queue='rpc_queue')
  28. print(" [x] Awaiting RPC request")
  29. channel.start_consuming()

 

客户端:rpc_client.py

  1. import pika,sys,uuid
  2. import time
  3. class FibonacciRpcClient(object):
  4. def __init__(self):
  5. self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  6. self.channel = self.connection.channel()
  7. result = self.channel.queue_declare(exclusive=True)
  8. self.callback_queue = result.method.queue#获取queue名字
  9. self.channel.basic_consume(self.on_response,#只要收到就调用on_response()
  10. no_ack=True,
  11. queue=self.callback_queue
  12. )
  13. def on_response(self, ch, method, props, body):
  14. if self.corr_id == props.correlation_id:#判断服务器端corr_id和本地corr_id相等,才往下走
  15. self.response = body#response收到body的消息表示response不为空
  16. def call(self, n):
  17. self.response = None
  18. self.corr_id = str(uuid.uuid4())
  19. self.channel.basic_publish(exchange='',
  20. routing_key='rpc_queue',
  21. properties=pika.BasicProperties(
  22. reply_to=self.callback_queue,#指定返回到那个queue
  23. correlation_id=self.corr_id,
  24. ),
  25. body=str(n))#传字符串,把30传进来
  26. while self.response is None:
  27. #收到消息,就会触发on_response(),没消息就继续往下走循环
  28. self.connection.process_data_events()#非阻塞版的start_consuming
  29. print("no msg...")#只要走到这,就相当于没消息
  30. time.sleep(0.5)
  31. return int(self.response)
  32. fibonacci_rpc = FibonacciRpcClient()
  33. print(" [x] Requesting fib(30)")
  34. response = fibonacci_rpc.call(8)
  35. print(" [.] Got %r" % response)

 

转载于:https://www.cnblogs.com/fuyuteng/p/9263764.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/603471
推荐阅读
相关标签
  

闽ICP备14008679号