当前位置:   article > 正文

rabbitmq工作模式以及python版demo_python rabbitmq demo

python rabbitmq demo

1. 基本概念

AMQP协议:  高级消息队列协议, 进程间传递异步消息的一个网络协议, rabbitmq是基于AMQP协议开发的.

大致工作流程: 生产者(Publisher) ---> 交换机(Exchange) ---> 队列(Queue) ---> 消费者(Consumer)

Broker: 代理, 由Exchange和Queue组成. 连接生产者消费者, 实现AMPQ协议中消息队列和路由功能的进程.

Virtual Host: 虚拟主机, 一个虚拟主机里可以有多个Exchange和Queue, 用于权限控制.

Exchange: 交换机, 接收生产者发送的消息, 并且根据routing_key把消息路由到指定的Queue中去.

Queue: 消息队列, 存储待消费的消息. 由headers和body组成, headers包含生产者添加的消息的各种属性参数, body是真正发送的数据内容.

Binding: 通过routing_key将Exchange与Queue绑定. routing_key不能使任意的字符串,一般用“.”分割开, 例如 “register.shanghai”, “register.beijing”, “register.#”正则时 *(星号)代表任意一个单词; #(hash)代表0个或者多个单词

Channel: 信道, 消费者与Broker通信的渠道, 建立在TCP连接上的虚拟连接. 一个TCP连接上可以建立好多信道, 减少系统开销提高性能.

2. 工作模式

概括:

1. 简单模式: 最简单的一对一.

2. 工作队列模式: 一对多. 一个生产者对应多个消费者, 但每条消息只能被其中的一个消费者消费.

轮询分发: 将消息轮流发给每个消费者, 一个消费者处理完才会发送下一个. 例: A消费第1,4,7...条消息, B消费第2,5,8...条消息, C消费第3,6,9...条消息.

公平分发: 只要有空闲的消费者就给发待处理的消息. 相对于轮询分发提高效率.

3. 发布/订阅模式: 一个生产者产生的消息, 可以同时被多个消费者消费. 生产者将消息发送给broker,由Exchange将消息转发到绑定在此交换机的每一个Queue中, 消费者监听自己的Queue进行消费.

4. 路由模式: 生产者将消息发送给broker, 由Exchange根据routing_key分发到不同的Queue中, 消费者也根据routing_key找到对应的Queue进行消费.

5. 主题模式: 在路由模式的基础上, routing_key支持正则匹配

6. RPC模式: 通过消息队列实现RPC功能, 客户端发送消息到消费队列, 服务端消费消息执行程序将结果再返回给客户端, 就是把结果发送消息到回调队列.

总结: 

简单模式, 工作队列模式 为一类: 不需要声明交换机就可以用, 更不需要指定交换机类型. 实际上是由消息代理事先声明好的空字符串的直连交换机. 新建的队列都会自动绑定到此交换机, routing_key与队列名相等.

发布/订阅模式, 路由模式, 主题模式 为一类: 需要声明交换机, 并且指定交换机类型. 

RPC模式 为一类: 双向生产消费模式.

3. python版demo

生产者

  1. import pika
  2. import json
  3. import datetime
  4. import time
  5. import random
  6. class TestPublisher(object):
  7. def __init__(self):
  8. self.username = 'sanford'
  9. self.password = '123456'
  10. self.host = 'localhost'
  11. self.port = 5672
  12. self.virtual_host = 'sanford_host'
  13. def publisher_00(self, msg_type, data):
  14. """
  15. 简单模式/工作队列模式
  16. """
  17. # 创建连接时的登录凭证
  18. credentials = pika.PlainCredentials(username=self.username, password=self.password)
  19. # 参数设置
  20. params = pika.ConnectionParameters(host=self.host,
  21. port=self.port,
  22. virtual_host=self.virtual_host,
  23. credentials=credentials)
  24. # 创建阻塞式连接
  25. connection = pika.BlockingConnection(params)
  26. # 创建信道
  27. channel = connection.channel()
  28. # 声明队列 队列持久化: durable=True, 服务重启后队列依然存在
  29. channel.queue_declare(queue='queue_00', durable=True)
  30. # 消息属性设置 消息持久化: delivery_mode=2
  31. properties = pika.BasicProperties(headers={'msg-type': msg_type},
  32. delivery_mode=2)
  33. # exchange为默认''时routing_key为queue
  34. channel.basic_publish(exchange='',
  35. routing_key='queue_00',
  36. body=json.dumps(data),
  37. properties=properties)
  38. connection.close()
  39. def publisher_01(self, msg_type, data):
  40. """
  41. 发布/订阅模式(fanout)
  42. """
  43. credentials = pika.PlainCredentials(username=self.username, password=self.password)
  44. params = pika.ConnectionParameters(host=self.host,
  45. port=self.port,
  46. virtual_host=self.virtual_host,
  47. credentials=credentials)
  48. connection = pika.BlockingConnection(params)
  49. channel = connection.channel()
  50. # 声明交换机指定类型 交换机持久化: durable=True, 服务重启后交换机依然存在
  51. channel.exchange_declare(exchange='exchange_01', exchange_type='fanout', durable=True)
  52. properties = pika.BasicProperties(headers={'msg-type': msg_type},
  53. delivery_mode=2)
  54. # 此模式不需要routing_key进行消费 可以设置为''
  55. channel.basic_publish(exchange='exchange_01',
  56. routing_key='',
  57. body=json.dumps(data),
  58. properties=properties)
  59. connection.close()
  60. def publisher_02(self, msg_type, data):
  61. """
  62. 路由模式(direct)
  63. """
  64. credentials = pika.PlainCredentials(username=self.username, password=self.password)
  65. params = pika.ConnectionParameters(host=self.host,
  66. port=self.port,
  67. virtual_host=self.virtual_host,
  68. credentials=credentials)
  69. connection = pika.BlockingConnection(params)
  70. channel = connection.channel()
  71. # 声明交换机指定类型 交换机持久化: durable=True, 服务重启后交换机依然存在
  72. channel.exchange_declare(exchange='exchange_02', exchange_type='direct', durable=True)
  73. properties = pika.BasicProperties(headers={'msg-type': msg_type},
  74. delivery_mode=2)
  75. # 指定routing_key
  76. channel.basic_publish(exchange='exchange_02',
  77. routing_key='routing_key_02',
  78. body=json.dumps(data),
  79. properties=properties)
  80. connection.close()
  81. def publisher_03(self, msg_type, data):
  82. """
  83. 主题模式(topic),实现分发,
  84. """
  85. credentials = pika.PlainCredentials(username=self.username, password=self.password)
  86. params = pika.ConnectionParameters(host=self.host,
  87. port=self.port,
  88. virtual_host=self.virtual_host,
  89. credentials=credentials)
  90. connection = pika.BlockingConnection(params)
  91. channel = connection.channel()
  92. # 声明交换机指定类型 交换机持久化: durable=True, 服务重启后交换机依然存在
  93. channel.exchange_declare(exchange='exchange_03', exchange_type='topic', durable=True)
  94. properties = pika.BasicProperties(headers={'msg-type': msg_type},
  95. delivery_mode=2)
  96. # 拼接routing_key
  97. channel.basic_publish(exchange='exchange_03',
  98. routing_key='routing_key.{0}'.format(msg_type),
  99. body=json.dumps(data),
  100. properties=properties)
  101. connection.close()
  102. if __name__ == '__main__':
  103. test_publisher = TestPublisher()
  104. # 简单模式/工作队列模式
  105. # msg_type_00 = 'msg_type_00'
  106. # for i in range(20):
  107. # time.sleep(1)
  108. # data_00 = {'id': i, 'name': 'jay', 'send_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
  109. # test_publisher.publisher_00(msg_type_00, data_00)
  110. # 发布/订阅模式(fanout)
  111. # msg_type_01 = 'msg_type_01'
  112. # for i in range(20):
  113. # time.sleep(1)
  114. # data_01 = {'id': i, 'name': 'jay', 'send_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
  115. # test_publisher.publisher_01(msg_type_01, data_01)
  116. # 路由模式(direct)
  117. # msg_type_02 = 'msg_type_02'
  118. # for i in range(20):
  119. # time.sleep(1)
  120. # data_02 = {'id': i, 'name': 'jay', 'send_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
  121. # test_publisher.publisher_02(msg_type_02, data_02)
  122. # 主题模式(topic),实现分发
  123. msg_type_list = ['msg_type_03.0', 'msg_type_03.1', 'msg_type_03.2']
  124. for i in range(20):
  125. msg_type_03 = random.choice(msg_type_list)
  126. time.sleep(1)
  127. data_03 = {'id': i, 'name': 'jay', 'send_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
  128. test_publisher.publisher_03(msg_type_03, data_03)

消费者

  1. import pika
  2. import json
  3. import time
  4. import random
  5. import datetime
  6. import sys
  7. class TestConsumer(object):
  8. def __init__(self):
  9. self.username = 'sanford'
  10. self.password = '123456'
  11. self.host = 'localhost'
  12. self.port = 5672
  13. self.virtual_host = 'sanford_host'
  14. @staticmethod
  15. def callback_00(channel, method, properties, body):
  16. """
  17. 回调函数
  18. """
  19. time.sleep(random.randint(1, 10))
  20. headers = properties.headers
  21. data = json.loads(body)
  22. data['end_time'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  23. print(headers, data)
  24. # 手动确认已经消费成功 当auto_ack=False时
  25. channel.basic_ack(delivery_tag=method.delivery_tag)
  26. def consumer_00(self):
  27. """
  28. 简单模式/工作队列模式
  29. """
  30. # 创建连接时的登录凭证
  31. credentials = pika.PlainCredentials(username=self.username, password=self.password)
  32. # 参数设置
  33. params = pika.ConnectionParameters(host=self.host,
  34. port=self.port,
  35. virtual_host=self.virtual_host,
  36. credentials=credentials)
  37. # 创建阻塞式连接
  38. connection = pika.BlockingConnection(params)
  39. # 创建信道
  40. channel = connection.channel()
  41. # 声明队列 队列持久化: durable=True, 服务重启后队列依然存在
  42. channel.queue_declare(queue='queue_00', durable=True)
  43. # 公平分发(没有这行时为轮询分发) prefetch_count=1如果消费者中有一条消息没处理完就不会继续给这个消费者继续发消息
  44. channel.basic_qos(prefetch_count=1)
  45. # auto_ack=True 自动确认已经消费成功
  46. channel.basic_consume(queue='queue_00',
  47. on_message_callback=self.callback_00)
  48. channel.start_consuming()
  49. def consumer_01(self):
  50. """
  51. 发布/订阅模式(fanout)
  52. """
  53. credentials = pika.PlainCredentials(username=self.username, password=self.password)
  54. params = pika.ConnectionParameters(host=self.host,
  55. port=self.port,
  56. virtual_host=self.virtual_host,
  57. credentials=credentials)
  58. connection = pika.BlockingConnection(params)
  59. channel = connection.channel()
  60. # 声明交换机指定类型 交换机持久化: durable=True, 服务重启后交换机依然存在
  61. channel.exchange_declare(exchange='exchange_01', exchange_type='fanout', durable=True)
  62. # 声明队列, queue为空字符串时会创建唯一的队列名. exclusive=True, 仅允许当前的连接访问
  63. result = channel.queue_declare(queue='', exclusive=True)
  64. queue_name = result.method.queue
  65. # 通过路由键将队列和交换器绑定, 此模式不需要routing_key进行消费 可以默认为None
  66. channel.queue_bind(exchange='exchange_01', queue=queue_name)
  67. channel.basic_consume(queue=queue_name,
  68. on_message_callback=self.callback_00)
  69. channel.start_consuming()
  70. def consumer_02(self):
  71. """
  72. 路由模式(direct)
  73. """
  74. credentials = pika.PlainCredentials(username=self.username, password=self.password)
  75. params = pika.ConnectionParameters(host=self.host,
  76. port=self.port,
  77. virtual_host=self.virtual_host,
  78. credentials=credentials)
  79. connection = pika.BlockingConnection(params)
  80. channel = connection.channel()
  81. # 声明交换机指定类型 交换机持久化: durable=True, 服务重启后交换机依然存在
  82. channel.exchange_declare(exchange='exchange_02', exchange_type='direct', durable=True)
  83. # 声明队列, queue为空字符串时会创建唯一的队列名. exclusive=True, 仅允许当前的连接访问
  84. result = channel.queue_declare(queue='', exclusive=True)
  85. queue_name = result.method.queue
  86. # 通过路由键将队列和交换器绑定, 此模式需要routing_key进行消费
  87. channel.queue_bind(exchange='exchange_02', queue=queue_name, routing_key='routing_key_02')
  88. channel.basic_consume(queue=queue_name,
  89. on_message_callback=self.callback_00)
  90. channel.start_consuming()
  91. def consumer_03_0(self):
  92. """
  93. 主题模式(topic),实现分发 routing_key.msg_type_03.0
  94. """
  95. credentials = pika.PlainCredentials(username=self.username, password=self.password)
  96. params = pika.ConnectionParameters(host=self.host,
  97. port=self.port,
  98. virtual_host=self.virtual_host,
  99. credentials=credentials)
  100. connection = pika.BlockingConnection(params)
  101. channel = connection.channel()
  102. # 声明交换机指定类型 交换机持久化: durable=True, 服务重启后交换机依然存在
  103. channel.exchange_declare(exchange='exchange_03', exchange_type='topic', durable=True)
  104. # 声明队列, queue为空字符串时会创建唯一的队列名. exclusive=True, 仅允许当前的连接访问
  105. result = channel.queue_declare(queue='', exclusive=True)
  106. queue_name = result.method.queue
  107. # 通过路由键将队列和交换器绑定, 此模式需要routing_key进行消费
  108. channel.queue_bind(exchange='exchange_03', queue=queue_name, routing_key='routing_key.msg_type_03.0')
  109. channel.basic_consume(queue=queue_name,
  110. on_message_callback=self.callback_00)
  111. channel.start_consuming()
  112. def consumer_03_1(self):
  113. """
  114. 主题模式(topic),实现分发 routing_key.msg_type_03.1
  115. """
  116. credentials = pika.PlainCredentials(username=self.username, password=self.password)
  117. params = pika.ConnectionParameters(host=self.host,
  118. port=self.port,
  119. virtual_host=self.virtual_host,
  120. credentials=credentials)
  121. connection = pika.BlockingConnection(params)
  122. channel = connection.channel()
  123. # 声明交换机指定类型 交换机持久化: durable=True, 服务重启后交换机依然存在
  124. channel.exchange_declare(exchange='exchange_03', exchange_type='topic', durable=True)
  125. # 声明队列, queue为空字符串时会创建唯一的队列名. exclusive=True, 仅允许当前的连接访问
  126. result = channel.queue_declare(queue='', exclusive=True)
  127. queue_name = result.method.queue
  128. # 通过路由键将队列和交换器绑定, 此模式需要routing_key进行消费
  129. channel.queue_bind(exchange='exchange_03', queue=queue_name, routing_key='routing_key.msg_type_03.1')
  130. channel.basic_consume(queue=queue_name,
  131. on_message_callback=self.callback_00)
  132. channel.start_consuming()
  133. def consumer_03_2(self):
  134. """
  135. 主题模式(topic),实现分发 routing_key.msg_type_03.#
  136. * (星号) 代表任意 一个单词
  137. # (hash) 0个或者多个单词
  138. """
  139. credentials = pika.PlainCredentials(username=self.username, password=self.password)
  140. params = pika.ConnectionParameters(host=self.host,
  141. port=self.port,
  142. virtual_host=self.virtual_host,
  143. credentials=credentials)
  144. connection = pika.BlockingConnection(params)
  145. channel = connection.channel()
  146. # 声明交换机指定类型 交换机持久化: durable=True, 服务重启后交换机依然存在
  147. channel.exchange_declare(exchange='exchange_03', exchange_type='topic', durable=True)
  148. # 声明队列, queue为空字符串时会创建唯一的队列名. exclusive=True, 仅允许当前的连接访问
  149. result = channel.queue_declare(queue='', exclusive=True)
  150. queue_name = result.method.queue
  151. # 通过路由键将队列和交换器绑定, 此模式需要routing_key进行消费
  152. channel.queue_bind(exchange='exchange_03', queue=queue_name, routing_key='routing_key.msg_type_03.#')
  153. channel.basic_consume(queue=queue_name,
  154. on_message_callback=self.callback_00)
  155. channel.start_consuming()
  156. if __name__ == '__main__':
  157. test_consumer = TestConsumer()
  158. # 简单模式/工作队列模式
  159. # test_consumer.consumer_00()
  160. # 发布/订阅模式(fanout)
  161. # test_consumer.consumer_01()
  162. # 路由模式(direct)
  163. # test_consumer.consumer_02()
  164. # 主题模式(topic),实现分发
  165. if sys.argv[1] == '0':
  166. test_consumer.consumer_03_0()
  167. elif sys.argv[1] == '1':
  168. test_consumer.consumer_03_1()
  169. elif sys.argv[1] == '2':
  170. test_consumer.consumer_03_2()

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/480968
推荐阅读
相关标签
  

闽ICP备14008679号