赞
踩
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。RabbitMQ可以,多个程序同时使用RabbitMQ ,但是必须队列名称不一样。采用erlang语言,属于爱立信公司开发的。
术语(Jargon)
Ubuntu 上安装
rabbitmq-server
echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get install rabbitmq-server
# 在 rabbitmq 中添加用户
hj@hj:~$ sudo rabbitmqctl add_user username password
Creating user "hj" # 这为设置成功后的提示,同下
# 将用户设置为管理员(只有管理员才能远程登录)
hj@hj:~$ sudo rabbitmqctl set_user_tags username administrator
Setting tags for user "hj" to [administrator]
# 为用户设置读写权限
hj@hj:~$ sudo rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
Setting permissions for user "username" in vhost "/"
Windows 上安装
pika
pip3 install -i http://pypi.douban.com/simple/ pika --trusted-host pypi.douban.com
RabbitMQ 是建立在 Erlang OTP 平台上,所有需要下载 Erlang 和 RabbitMQ,官网上下载安装 Erlang
和 RabbitMQ
将 Erlang 添加到系统环境变量中
新建一个 ERLANG_HOME,值为 ERlang 的安装路径(有些安装时会自动添加):
将 ERLANG_HOME 添加到 path 中(这里以 win10 平台为例,其他平台可能会不一样):
打开 CMD
以管理员身份证运行,输入 erl
检查 ERlang 是否安装成功:
C:\Windows\system32>erl
Eshell V10.3 (abort with ^G) # 版本
1> # 标识符
rabbitmq management
下面我们来使用 RabbitMQ
来实现一个简单的消息收发:
消息不能直接发送到队列,而是需要经过 exchange 转发器转发,只有与转发器绑定了的队列,才能收到消息。在这里我们假设不经过 exchange 转发:
import pika credentials = pika.PlainCredentials('username', 'password') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.21.xxx', credentials=credentials)) channel = connection.channel() # 建立 rabbit 协议通道 # 声明queue channel.queue_declare(queue='hello') # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. # 消息不能直接发送到队列,而是需要经过 exchange 转发器转发,只有与转发器绑定了的队列,才能收到消息 channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
首先需要输入上面第一章中已经注册的 rabbitmq
账户,然后再连接远程端。
其次再声明了一个队列 queue
,名称为 hello
,在这里 exchange 为空,发送的内容 body
必须是 bytes
类型。
接收端也必须指定队列名称:
import pika import time credentials = pika.PlainCredentials('username', 'password') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.21.128', credentials=credentials)) channel = connection.channel() # 建立 rabbit 协议通道 channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(20) print(" [x] msg process done %r" % body) channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
运行结果如下:
我们已经知道即使消费者死亡,消息(队列)也不会丢失(在禁用 no_ack=True的前提下,现在是 auto_ack=True)
但是如果 RabbitMQ
服务器停止,我们的任务一样会丢失,当 RabbitMQ
退出或奔溃时,将会忘记队列和消息,除非我们告诉它不要这样,那么我们就要将队列和消息标记为持久。
RabbitMQ
永远不会丢失我们的队列,需要设置 durable=True
:# 发送端,即消息制造者
channel.queue_declare(queue='task_queue', durable=True)
# 发送端,即消息制造者
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent 使消息持久
)
设置好之后,发送端先发送一条消息,接收端先不要启动。使用以下命令关闭启动 rabbitmq
服务,观察队列和消息会不会真正丢失:
# 若命令运行失败,可以尝试使用 管理员模式 sudo
# 启动rabbitmq
service rabbitmq-server start
# 停止rabbitmq
service rabbitmq-server stop
# 重启rabbitmq
service rabbitmq-server restart
# 查看当前活动的队列
rabbitmqctl list_queues
所谓公平分发即一个生产者,多个消费者,类似于负载均衡。
下面我将设置一个发送端,两个接收端:
import pika import time import sys credentials = pika.PlainCredentials('username', 'password') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.21.128', credentials=credentials)) channel = connection.channel() # 建立 rabbit 协议通道 # 声明queue channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time() channel.basic_publish(exchange='', routing_key='task_queue', body=bytes(message, encoding='utf-8'), properties=pika.BasicProperties( delivery_mode=2, # make message persistent 使消息持久 ) ) print(" [x] Sent %r" % message) connection.close()
import pika import time credentials = pika.PlainCredentials('username', 'password') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.21.128', credentials=credentials)) channel = connection.channel() # 建立 rabbit 协议通道 def callback(ch, method, properties, body): print(" [x] Received %r" % body) # b'Hello World! 1557373639.5839057' time.sleep(20) print(" [x] Done") print("method.delivery_tag", method.delivery_tag) # 1 ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(on_message_callback=callback, queue='task_queue') print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
另外一个接收端代码一致,在此省略,运行结果如下:
事实上服务器之间接收、处理消息的能力是不一样的,受网络、配置等因素影响,因此公平分发消息就会导致以下问题出现:
为此我们可以在接收端设置 prefetch_count=1
,如果前面还有消息未处理,就告诉发送端不要给我发消息,直至处理完毕前一条消息为止:
channel.basic_qos(prefetch_count=1) # 如果前面有消息没处理完,就不要给我再发消息
上面的例子基本上都是一对一发送和接收消息,如果想要将消息发送到所有队列(queue)中,那么就需要用到广播了,而实现广播的一个重要参数就是 exchange
—— 消息转发器。
exchange 在定义时是有类型的,只有符合条件的才能接收消息,大致可分为以下几类:
routingKey
为关键字/组名routingKey
绑定的队列都可以接收消息所有绑定 exchange 的 queue 都能接收到消息。
应用场景:视频直播
import pika import sys credentials = pika.PlainCredentials('username', 'password') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.21.128', credentials=credentials)) channel = connection.channel() # 建立 rabbit 协议通道 # 指定 exchange 类型、名字 channel.exchange_declare(exchange='logs', exchange_type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=bytes(message, encoding='utf-8')) print(" [x] Sent %r" % message) connection.close()
import pika credentials = pika.PlainCredentials('username', 'password') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.21.128', credentials=credentials)) channel = connection.channel() # 建立 rabbit 协议通道 channel.exchange_declare(exchange='logs', exchange_type='fanout') # 不指定queue名字, rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 # 最新源代码需要执行 queue,如果为 '',则 if empty string, the broker will create a unique queue name result = channel.queue_declare('', exclusive=True) queue_name = result.method.queue # result = <METHOD(['channel_number=1', 'frame_type=1', "method=<Queue.DeclareOk(['consumer_count=0', 'message_count=0', 'queue=amq.gen-hRrQ-pwaT9u-32CcIokCxA'])>"])> # queue_name = amq.gen-hRrQ-pwaT9u-32CcIokCxA channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(on_message_callback=callback, queue=queue_name) channel.start_consuming()
打开两个终端,分别运行:
python3 fanout_send.py t1
python3 fanout_send.py t2
运行结果如下:
RabbitMQ
还可以根据关键字发送接收消息,队列绑定关键字,发送端根据关键字发送到 exchange,exchange 再根据关键字判断发给哪个队列。
import pika import sys credentials = pika.PlainCredentials('username', 'password') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.21.128', credentials=credentials)) channel = connection.channel() # 建立 rabbit 协议通道 channel.exchange_declare(exchange='direct_logs', exchange_type='direct') # python3 direct_send.py info severity = sys.argv[1] if len(sys.argv) > 1 else 'info' # 严重程度,级别, info message = ' '.join(sys.argv[2:]) or 'Hello World!' # Hello World! channel.basic_publish(exchange='direct_logs', routing_key=severity, body=bytes(message, encoding='utf-8')) print(" [x] Sent %r:%r" % (severity, message)) # [x] Sent 'info' : 'Hello World!' connection.close()
import pika import sys credentials = pika.PlainCredentials('username', 'password') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.21.128', credentials=credentials)) channel = connection.channel() # 建立 rabbit 协议通道 channel.exchange_declare(exchange='direct_logs', exchange_type='direct') result = channel.queue_declare('', exclusive=True) queue_name = result.method.queue # python3 direct_recv.py info warning error # python3 direct_recv.py info # python3 direct_recv.py error severities = sys.argv[1:] # ['direct_recv.py', 'info', 'warning', 'error']、['direct_recv.py', 'error']、['direct_recv.py', 'info'] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) # 循环绑定关键字 for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(on_message_callback=callback, queue=queue_name) channel.start_consuming()
接收端执行打开三个终端,分别执行:
python3 direct_recv.py info warning error
python3 direct_recv.py info
python3 direct_recv.py error
然后循环关键字,绑定队列(queue),发送端执行相应关键字,接收端这边就能根据关键字接收消息。
运行结果如下:
import pika import sys credentials = pika.PlainCredentials('username', 'password') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.21.128', credentials=credentials)) channel = connection.channel() # 建立 rabbit 协议通道 channel.exchange_declare(exchange='topic_logs', exchange_type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=bytes(message, encoding='utf-8')) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
import pika import sys credentials = pika.PlainCredentials('username', 'password') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.21.128', credentials=credentials)) channel = connection.channel() # 建立 rabbit 协议通道 channel.exchange_declare(exchange='topic_logs', exchange_type='topic') result = channel.queue_declare('', exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(on_message_callback=callback, queue=queue_name) channel.start_consuming()
接收端开启四个终端,发送端开启一个:
# 接收端
python3 topic_recv.py *.django.* # 消息两端可以是任意,中间只要是 django 即可
python3 topic_recv.py # # 可以接收任意消息
python3 topic_recv.py mysql.* # 以 mysql 开头,结尾可以是任意
python3 topic_recv.py mysql.error.* # mysql.error 开头,结尾任意
# 发送端
python3 topic_send.py mysql.error.info
python3 topic_send.py ss.django.123
python3 topic_send.py mysql.error err happend
python3 topic_send.py python.error test
运行结果如下:
总结
#
号能匹配任意消息,相当于广播*
号也可以匹配任意,但是必须和其他一起使用上面收发消息都是单向的,即一个发一个接收,接收的不能够发送。而 RPC 是双向的,既能够发送也能接收。
应用场景:RPC 服务功能
import pika import uuid class FibonacciRpcClient(object): def __init__(self): credentials = pika.PlainCredentials('username', 'password') self.connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.21.128', credentials=credentials)) channel = self.connection.channel() # 建立 rabbit 协议通道 self.channel = self.connection.channel() result = self.channel.queue_declare('', exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(on_message_callback=self.on_response, queue=self.callback_queue, auto_ack=True) #准备接受命令结果 def on_response(self, ch, method, props, body): """"callback方法""" if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) #唯一标识符 self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, ), body=str(n)) count = 0 while self.response is None: self.connection.process_data_events() #检查队列里有没有新消息,但不会阻塞 count += 1 print("check...", count) return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(5) print(" [.] Got %r" % response)
import pika import time credentials = pika.PlainCredentials('username', 'password') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.21.128', credentials=credentials)) channel = connection.channel() # 建立 rabbit 协议通道 channel.queue_declare(queue='rpc_queue') def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id=props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_message_callback=on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") channel.start_consuming()
运行结果如下:
#创建用户 rabbitmqctl add_user rabbitadmin 123456 rabbitmqctl set_user_tags rabbitadmin administrator # 给用户授权 rabbitmqctl set_permissions -p / rabbitadmin ".*" ".*" ".*" # 开启插件管理页面 rabbitmq-plugins enable rabbitmq_management rabbitmq-server start # 启动服务 rabbitmq-server stop # 关闭服务 rabbitmq-server restart # 重启服务 rabbitmq-server status # 查看服务状态 ps -ef|grep rabbitmq # 查看端口 rabbitmqctl list_queues # 查看队列消息 ./rabbitmqctl list_users # 查看用户列表命令 rabbitmqctl delete_user Username # 删除用户命令 whereis rabbitmq #查看rabbitmq安装目录
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。