当前位置:   article > 正文

Python 简单使用 RabbitMQ_python rabbitmq

python rabbitmq

一、安装

pip install pika

二、推送消息到队列中

执行pythone方法

  1. import pika
  2. import time
  3. # 用户名和密码
  4. user_info = pika.PlainCredentials('admin','admin')
  5. # 连接服务器上的rabbitMQ服务
  6. connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', 5672, '/', user_info))
  7. # connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  8. # 创建一个channel
  9. channel = connection.channel()
  10. # 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,官方推荐,每次使用时都可以加上这句
  11. channel.queue_declare(queue='pythone.test')
  12. # 推送消息到队列
  13. # exchange:当前是一个简单模式,所以这里设置为空字符串就可以了。
  14. # routing_key:指定消息要发送到哪个queue。
  15. # body:指定要发送的消息。
  16. channel.basic_publish(exchange='',routing_key='pythone.test',body='{}'.format('test xxx'))
  17. # 关闭连接
  18. connection.close()

查看rabbitMQ网页后台

执行后我们进入rabbitMQ网页端后台查看pythone.test 队列已经被创建

并且我们执行了三次,此处产生3条数据未被消费,还被压在队列中。

查看队列内消息列表

我们改造一下,将推送消息 放到方法中。

三、封装成生产者、消费者方法

生产者product:

  1. import pika
  2. import time
  3. # 用户名和密码
  4. user_info = pika.PlainCredentials('admin','admin')
  5. # 连接服务器上的rabbitMQ服务
  6. connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', 5672, '/', user_info))
  7. # connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  8. # 创建一个channel
  9. channel = connection.channel()
  10. # 生产者方法
  11. def product():
  12. print("进入生产者方法!")
  13. # 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,官方推荐,每次使用时都可以加上这句
  14. channel.queue_declare(queue='pythone.test')
  15. # 推送消息到队列
  16. # exchange:当前是一个简单模式,所以这里设置为空字符串就可以了。
  17. # routing_key:指定消息要发送到哪个queue。
  18. # body:指定要发送的消息。
  19. channel.basic_publish(exchange='',routing_key='pythone.test',body='{}'.format('test xxx'))
  20. # 关闭连接
  21. connection.close()
  22. if __name__ == '__main__':
  23. start_time = time.time() # 程序开始时间
  24. print("========start=========|"+str(start_time))
  25. product()
  26. end_time = time.time() # 程序结束时间
  27. print("========end===========|"+str(end_time))

消费者consumer:

  1. # 消费者方法
  2. def consumer():
  3. print("进入消费者方法!")
  4. # 消费队列内的消息
  5. # queue:接收指定queue的消息
  6. # auto_ack:指定为True,表示消息接收到后自动给消息发送方回复确认,已收到消息
  7. # on_message_callback:设置收到消息的回调函数
  8. channel.basic_consume(queue='pythone.test', auto_ack=True, on_message_callback=mq_consumer_callback)
  9. # 一直处于等待接收消息的状态,如果没收到消息就一直处于阻塞状态,收到消息就调用上面的回调函数
  10. channel.start_consuming()
  11. # 消费者收到消息调用的回调函数
  12. # channel: 包含channel的一切属性和方法
  13. # method: 包含 consumer_tag, delivery_tag, exchange, redelivered, routing_key
  14. # properties: basic_publish 通过 properties 传入的参数
  15. # body: basic_publish发送的消息
  16. def mq_consumer_callback(ch, method, properties, body):
  17. print('消费者收到:{}'.format(body))

TestRabbitMQ.py

  1. import pika
  2. import time
  3. # 用户名和密码
  4. user_info = pika.PlainCredentials('admin','admin')
  5. # 连接服务器上的rabbitMQ服务
  6. connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', 5672, '/', user_info))
  7. # connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  8. # 创建一个channel
  9. channel = connection.channel()
  10. # 生产者方法
  11. def product():
  12. print("进入生产者方法!")
  13. # 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,官方推荐,每次使用时都可以加上这句
  14. channel.queue_declare(queue='pythone.test')
  15. # 推送消息到队列
  16. # exchange:当前是一个简单模式,所以这里设置为空字符串就可以了。
  17. # routing_key:指定消息要发送到哪个queue。
  18. # body:指定要发送的消息。
  19. channel.basic_publish(exchange='',routing_key='pythone.test',body='{}'.format('test xxx'))
  20. # 关闭连接
  21. connection.close()
  22. # 消费者方法
  23. def consumer():
  24. print("进入消费者方法!")
  25. # 消费队列内的消息
  26. # queue:接收指定queue的消息
  27. # auto_ack:指定为True,表示消息接收到后自动给消息发送方回复确认,已收到消息
  28. # on_message_callback:设置收到消息的回调函数
  29. channel.basic_consume(queue='pythone.test', auto_ack=True, on_message_callback=mq_consumer_callback)
  30. # 一直处于等待接收消息的状态,如果没收到消息就一直处于阻塞状态,收到消息就调用上面的回调函数
  31. channel.start_consuming()
  32. # 消费者收到消息调用的回调函数
  33. # channel: 包含channel的一切属性和方法
  34. # method: 包含 consumer_tag, delivery_tag, exchange, redelivered, routing_key
  35. # properties: basic_publish 通过 properties 传入的参数
  36. # body: basic_publish发送的消息
  37. def mq_consumer_callback(ch, method, properties, body):
  38. print('消费者收到:{}'.format(body))
  39. if __name__ == '__main__':
  40. start_time = time.time() # 程序开始时间
  41. print("========start=========|"+str(start_time))
  42. # product()
  43. consumer()
  44. end_time = time.time() # 程序结束时间
  45. print("========end===========|"+str(end_time))

四、测试验证

我们执行3次product方法,生产3条数据到 队列。

再执行consumer方法,对队列内数据进行消费。

可以看见控制台打印如下:

再查看rabbitMQ网页后台,发现消息已经被正常消费

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/719059
推荐阅读
相关标签
  

闽ICP备14008679号