赞
踩
安装
# 由于rabbitMq需要erlang语言的支持,在安装rabbitMq之前需要安装erlang
apt install erlang-nox
# 安装RabbitMQ
apt install rabbitmq-server
添加远程访问的用户
# 1. 添加用户
rabbitmqctl add_user ant 123456
# 2. 设置ant的角色信息,这里设置为超级管理员管理角色(可登陆web管理界面,可查看所有的信息,并且可以对用户,策略(policy)进行操作)
rabbitmqctl set_user_tags ant administrator
开启远程访问
如果要是用web管理界面需要执行以下命令
rabbitmq-plugins enable rabbitmq_management
开启远程访问
rabbitmqctl set_permissions -p "/" ant "." "." ".*"
web管理界面连接地址
http://192.168.1.42:15672
RabbitMQ默认开启的端口号port:5672
RabbitMQ常用命令
rabbitmq-server start
rabbitmq-server stop ## 不行就是试试下面的stop
rabbitmq-server restart
rabbitmqctl status
rabbitmqctl stop
python连接到RabbitMQ
import pika
# 连接rabbitmq
auth = pika.PlainCredentials('ant','123456')
connect = pika.BlockingConnection(pika.ConnectionParameters("ip", port=5672, virtual_host='/', credentials=auth))
channel = connect.channel()
# 创建队列
channel.queue_declare(queue="hello")
channel.basic_publish(exchange='', routing_key="hello", body='hello')
# 向指定队列插入数据
channel.basic_publish(exchange='', # 简单模式
routing_key="hello", # 指定队列
body='hello'
)
生产者消费者模式
简单模式
生产者
import pika
# 连接rabbitmq
auth = pika.PlainCredentials(username,password)
connect = pika.BlockingConnection(pika.ConnectionParameters("ip", port=5672, virtual_host='/', credentials=auth))
channel = connect.channel()
# 创建队列
channel.queue_declare(queue="hello",durable=False) # durable持久化参数, 默认为False,如果需要持久化存储改为True即可
# 向指定队列插入数据
channel.basic_publish(exchange='', # 简单模式
routing_key="hello", # 指定队列
body='tt', # 插入的数据
# 持久化存储,需要创建的队列为持久化队列 , 即durable=True时,需要带以下代码
#properties=pika.BasicProperties(delivery_mode=2)
)
消费者
import pika
auth = pika.PlainCredentials('ant','123456')
connect = pika.BlockingConnection(pika.ConnectionParameters("ip", port=5672, virtual_host='/', credentials=auth))
channel = connect.channel()
channel.queue_declare(queue='hello', durable=False)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
#True默认应答取数据的时候就把数据删除了
#False取数据之后不会删除数据
#如果要删除数据需要在回调函数处理完成之后添加 ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='hello',
auto_ack=True,
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
交换机模式
发布订阅
生产者
import pika
auth = pika.PlainCredentials('ant','123456')
connect = pika.BlockingConnection(pika.ConnectionParameters("ip", port=5672, virtual_host='/', credentials=auth))
channel = connect.channel()
# print(channel)
# 创建交换机, exchange_type类型fanout:发布订阅模式, direct:关键字模式, topic : 通配符模式
change = channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 插入数据
channel.basic_publish(exchange='logs',
routing_key='',
body=b'fkoepwkp'
)
print("生产数据完成")
connect.close()
消费者
import pika
auth = pika.PlainCredentials('ant','123456')
connect = pika.BlockingConnection(pika.ConnectionParameters("ip", port=5672, virtual_host='/', credentials=auth))
channel = connect.channel()
# print(channel)
# 创建交换机, exchange_type类型fanout:发布订阅模式, direct:关键字模式, topic : 通配符模式
change = channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 创建队列
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
# 绑定队列 , routing_key 绑定关键字参数
channel.queue_bind(exchange='logs', queue=queue_name)
# 插入数据
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=callback)
channel.start_consuming()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。