赞
踩
pip install pika
- import pika
- import time
-
- # 用户名和密码
- user_info = pika.PlainCredentials('admin','admin')
-
- # 连接服务器上的rabbitMQ服务
- connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', 5672, '/', user_info))
- # connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
-
-
- # 创建一个channel
- channel = connection.channel()
-
- # 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,官方推荐,每次使用时都可以加上这句
- channel.queue_declare(queue='pythone.test')
-
- # 推送消息到队列
- # exchange:当前是一个简单模式,所以这里设置为空字符串就可以了。
- # routing_key:指定消息要发送到哪个queue。
- # body:指定要发送的消息。
- channel.basic_publish(exchange='',routing_key='pythone.test',body='{}'.format('test xxx'))
-
- # 关闭连接
- connection.close()

执行后我们进入rabbitMQ网页端后台查看pythone.test 队列已经被创建
并且我们执行了三次,此处产生3条数据未被消费,还被压在队列中。
查看队列内消息列表
我们改造一下,将推送消息 放到方法中。
- import pika
- import time
-
- # 用户名和密码
- user_info = pika.PlainCredentials('admin','admin')
-
- # 连接服务器上的rabbitMQ服务
- connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', 5672, '/', user_info))
- # connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
-
-
- # 创建一个channel
- channel = connection.channel()
-
-
- # 生产者方法
- def product():
- print("进入生产者方法!")
- # 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,官方推荐,每次使用时都可以加上这句
- channel.queue_declare(queue='pythone.test')
-
- # 推送消息到队列
- # exchange:当前是一个简单模式,所以这里设置为空字符串就可以了。
- # routing_key:指定消息要发送到哪个queue。
- # body:指定要发送的消息。
- channel.basic_publish(exchange='',routing_key='pythone.test',body='{}'.format('test xxx'))
-
- # 关闭连接
- connection.close()
-
- if __name__ == '__main__':
- start_time = time.time() # 程序开始时间
- print("========start=========|"+str(start_time))
-
- product()
-
- end_time = time.time() # 程序结束时间
- print("========end===========|"+str(end_time))

- # 消费者方法
- def consumer():
- print("进入消费者方法!")
- # 消费队列内的消息
- # queue:接收指定queue的消息
- # auto_ack:指定为True,表示消息接收到后自动给消息发送方回复确认,已收到消息
- # on_message_callback:设置收到消息的回调函数
- channel.basic_consume(queue='pythone.test', auto_ack=True, on_message_callback=mq_consumer_callback)
-
- # 一直处于等待接收消息的状态,如果没收到消息就一直处于阻塞状态,收到消息就调用上面的回调函数
- channel.start_consuming()
-
-
- # 消费者收到消息调用的回调函数
- # channel: 包含channel的一切属性和方法
- # method: 包含 consumer_tag, delivery_tag, exchange, redelivered, routing_key
- # properties: basic_publish 通过 properties 传入的参数
- # body: basic_publish发送的消息
- def mq_consumer_callback(ch, method, properties, body):
- print('消费者收到:{}'.format(body))

- import pika
- import time
-
- # 用户名和密码
- user_info = pika.PlainCredentials('admin','admin')
-
- # 连接服务器上的rabbitMQ服务
- connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', 5672, '/', user_info))
- # connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
-
-
- # 创建一个channel
- channel = connection.channel()
-
-
- # 生产者方法
- def product():
- print("进入生产者方法!")
- # 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,官方推荐,每次使用时都可以加上这句
- channel.queue_declare(queue='pythone.test')
-
- # 推送消息到队列
- # exchange:当前是一个简单模式,所以这里设置为空字符串就可以了。
- # routing_key:指定消息要发送到哪个queue。
- # body:指定要发送的消息。
- channel.basic_publish(exchange='',routing_key='pythone.test',body='{}'.format('test xxx'))
-
- # 关闭连接
- connection.close()
-
- # 消费者方法
- def consumer():
- print("进入消费者方法!")
- # 消费队列内的消息
- # queue:接收指定queue的消息
- # auto_ack:指定为True,表示消息接收到后自动给消息发送方回复确认,已收到消息
- # on_message_callback:设置收到消息的回调函数
- channel.basic_consume(queue='pythone.test', auto_ack=True, on_message_callback=mq_consumer_callback)
-
- # 一直处于等待接收消息的状态,如果没收到消息就一直处于阻塞状态,收到消息就调用上面的回调函数
- channel.start_consuming()
-
-
- # 消费者收到消息调用的回调函数
- # channel: 包含channel的一切属性和方法
- # method: 包含 consumer_tag, delivery_tag, exchange, redelivered, routing_key
- # properties: basic_publish 通过 properties 传入的参数
- # body: basic_publish发送的消息
- def mq_consumer_callback(ch, method, properties, body):
- print('消费者收到:{}'.format(body))
-
-
-
-
- if __name__ == '__main__':
- start_time = time.time() # 程序开始时间
- print("========start=========|"+str(start_time))
-
- # product()
- consumer()
-
- end_time = time.time() # 程序结束时间
- print("========end===========|"+str(end_time))

我们执行3次product方法,生产3条数据到 队列。
再执行consumer方法,对队列内数据进行消费。
可以看见控制台打印如下:
再查看rabbitMQ网页后台,发现消息已经被正常消费
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。