赞
踩
RabbitMQ 官网:https://www.rabbitmq.com/
RabbitMQ 英文文档:https://www.rabbitmq.com/documentation.html
RabbitMQ 中文文档:http://rabbitmq.mr-ping.com/description.html
RabbitMQ 知识点总结合集:https://blog.csdn.net/CherryChenieth/article/details/124767155
RabbitMQ使用教程(超详细):https://blog.csdn.net/lzx1991610/article/details/102970854
RabbitMQ 消息队列的小伙伴: ProtoBuf、redis 等
这些组成部分是如何协同工作的呢,大概的流程如下:
消息队列的使用过程大概如下:
从工作流程可以看出,实际上有个关键的组件 Exchange,因为 消息发送到RabbitMQ后首先要经过Exchange路由才能找到对应的Queue。
实际上 Exchange 类型有四种,根据不同的类型工作的方式也有所不同。
AMQP协议 (Advanced Message Queuing Protocol (AMQP)) 是一个高级抽象层消息通信协议,RabbitMQ 是基于 AMQP 协议的实现。主要包括以下组件:虚拟主机(virtual host),交换机(exchange),队列(queue)和绑定(binding)
Binding ( 绑定 )
Binding 联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由 Exchange Type决定。
Exchange ( 交换机 )
接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为。
也可以理解成具有路由表的路由程序。每个消息都有一个路由键(routing key),就是一个简单的字符串。交换机中有一系列的绑定(binding),即路由规则(routes)。交换机可以有多个。多个队列可以和同一个交换机绑定,同时多个交换机也可以和同一个队列绑定。(多对多的关系)。例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不同类型的Exchange路由的行为是不一样的。
Virtual Host ( 虚拟主机 )
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host。一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。
Channel ( 信道 )
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。
Command:AMQP 的命令。
客户端通过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端可以通过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。
AMQP 的通信
在 AMQP 中,Client 通过与 Broker 之间建立 Connection 来通信,而 Connection 建立在 Client 与 Virtual Host 之间。而在每个 Connection 上可以运行多个 Channel,每个 Channel 执行与Broker 的通信,Session 依附于 Channel 之上。Channel 是 Client 与 Broker 之间传输 Message 的实体。在通信的时候,会为每个 Command 分配一个唯一的标示符即 UUID,用于 Command 做校验和重传。Client 默认使用 guest/guest 访问权限和访问虚拟主机的根目录,这些默认项也是 RabbitMQ 的默认安装选项。
是 RabbitMQ 对外提供的 API中 最基本的对象
在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果 RabbitMQ 没有收到回执并检测到消费者的 RabbitMQ 连接断开,则 RabbitMQ 会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在timeout 概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ 连接断开。
这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的bug——Queue中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑。
另外 pub message 是没有 ack 的。
如果希望在 RabbitMQ 服务重启的情况下也不会丢失消息,可以将 Queue 与 Message 都设置为可持久化的(durable),这样可以保证绝大部分情况下 RabbitMQ 消息不会丢失。但依然解决不了小概率丢失事件的发生( 比如 RabbitMQ 服务器已经接收到生产者的消息,但还没来得及持久化该消息时 RabbitMQ 服务器就断电了),如果需要对这种小概率事件也要管理起来,那么要用到事务。
持久化:Exchange、queue 有个标志 durable,直译叫做坚固的。durable 的唯一含义就是具有这个标志的 Exchange、queue 会在重启之后重新建立,它不表示说在队列当中的消息会在重启后恢复。那么如何才能做到不只是队列和交换机,还有消息都是持久的呢?
但是首先一个问题是,你真的需要消息是持久的吗?对于一个需要在重启之后回复的消息来说,它需要被写入到磁盘上,而即使是最简单的磁盘操作也是要消耗时间的。如果和消息的内容相比,你更看重的是消息处理的速度,那么不要使用持久化的消息。
当你将消息发布到交换机的时候,可以指定一个标志 "Delivery Mode"(投递模式)。根据你使用的 AMQP 的库不同,指定这个标志的方法可能不太一样。简单的说,就是将 Delivery Mode设置成 2 也就是持久。一般的 AMQP 库都是将 Delivery Mode 设置成 1 也就是非持久的。
所以要持久化消息的步骤如下:
1. 将交换机设成:durable
2. 将队列设成:durable
3. 将消息的:Delivery Mode 设置成2
绑定(Bindings)如何持久化?如果绑定了一个 durable 的 queue 和 一个 durable 的 Exchange,RabbitMQ 会自动保留这个绑定。类似的,如果删除了某个队列或交换机(无论是不是durable),依赖它的绑定都会自动删除。
注意两点:
前面我们讲到如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。
生产者在将消息发送给 Exchange 的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。
在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。
RabbitMQ为routing key设定的长度限制为255 bytes。
RabbitMQ 中通过 Binding 将 Exchange 与 Queue 关联起来,这样 RabbitMQ 就知道如何正确地将消息路由到指定的 Queue 了。
在绑定(Binding)Exchange 与 Queue 的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。这个将在Exchange Types章节会列举实际的例子加以说明。
在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。
binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
该类型的Exchange没有用到过(不过也应该很有用武之地),所以不做介绍。
MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。
但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持 RPC。
RabbitMQ 中实现 RPC 的机制是:
官网安装步骤:https://www.rabbitmq.com/download.html
Ubuntu 安装 RabbitMQ:
sudo apt install rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl status rabbitmq-server
启用插件(启用后才能web访问): sudo rabbitmq-plugins enable rabbitmq_management
停止、启动、查看状态
sudo systemctl stop rabbitmq-server.service
sudo systemctl start rabbitmq-server.service
sudo systemctl status rabbitmq-server.service
浏览器访问 http://127.0.0.1:15672 默认账号:guest 密码:guest
rabbitmq支持各种插件,开启插件可以使用rabbitmq-plugins命令,插件的开启和关闭方法
rabbitmq-plugins <command> [<command options>]
Commands:
- list [-v] [-m] [-E] [-e] [<pattern>] 显示所有的的插件。
-v 显示版本
-m 显示名称
-E 显示明确已经开启的
-e 显示明确的和暗中开启的- enable <plugin> ... 开启一个插件
- disable <plugin> ... 关闭一个插件
rabbitmq_management 插件的用法:
web http://127.0.0.1:15672/
api http://127.0.0.1:15672/api/
cli http://127.0.0.1:15672/cli
点击 Admin 进入管理设置
选择Users进行用户设置
添加用户名、密码
选择角色
点击Add user保存用户
注:角色说明
administrator(超级管理员):可以登录控制台,查看所有信息可以对用户和策略进行操作
Monitoring(监控者):可以登录控制台,并查看节点相关信息,比如进程数,内存使用情况等
Policymaker(策略制定者):可以登录控制台,制定策略,但是无法查看节点信息
Management(普通管理员):仅能登录控制台
其他:无法登录控制台,一般指的是提供者和消费者
点击 Admin 进入管理设置
选择 virtual hosts 进行相应设置
输入 virtual host 名称 (virtual host 的名称一般以 "/" 开头)
点击 Add virtual host 保存
注:如果要给用户配置多个 virtual host,多次操作即可
添加用户:rabbitmqctl add_user test test
为用户设置角色:rabbitmqctl set_user_tags test administrator
添加virtual host:rabbitmqctl add_vhost /test
为用户设置virtual host:rabbitmqctl set_permissions -p /test test '.*' '.*' '.*'
注:其他常见命令( test为用户,/test 为 virtual host)
1、查看用户列表:rabbitmqctl list_users
2、查看用户权限:rabbitmqctl list_user_permissions test
3、查看哪个用户拥有virtual host权限:rabbitmqctl list_permissions -p /test
4、清除权限:rabbitmqctl clear_permissions -p /test test
5、删除用户:rabbitmqctl delete_user test
6、删除virtual host:rabbitmqctl delete_vhost /test
rabbitmq-plugins: plugin management:https://www.rabbitmq.com/rabbitmq-plugins.8.html
开启某个插件:rabbitmq-plugins enable xxx
关闭某个插件:rabbitmq-plugins disable xxx
查看 rabbitmq 已经安装的插件信息:/usr/rabbitmq/sbin/rabbitmq-plugins list
注意:开启某个插件后,重启服务器后生效。
rabbitmq_management 插件启动后可以在 Web页面 对 rabbitmq 进行管理
/usr/rabbitmq/sbin/rabbitmq-plugins enable rabbitmq_management
/usr/rabbitmq/sbin/rabbitmq-plugins disable rabbitmq_management
- rabbitmqctl --help
-
- 用法: rabbitmqctl [--node <node>] [--timeout <timeout>]
- [--longnames] [--quiet] <command> [<command options>]
- 使用名 'rabbitmqctl help <command>' 了解指定命令的用法
-
- 可用命令:
- Help:
- help 显示帮助
- version 显示 CLI tools 版本
-
- Nodes(节点):
- await_startup Waits for the RabbitMQ application to start on the target node
- reset Instructs a RabbitMQ node to leave the cluster and eturn to its virgin state
- rotate_logs Instructs the RabbitMQ node to perform internal log rotation
- shutdown Stops RabbitMQ and its runtime (Erlang VM). Monitors progress for local nodes. Does not require a PID file path.
- start_app Starts the RabbitMQ application but leaves the runtime (Erlang VM) running
- stop Stops RabbitMQ and its runtime (Erlang VM). Requires a local node pid file path to monitor progress.
- stop_app Stops the RabbitMQ application, leaving the runtime (Erlang VM) running
- wait Waits for RabbitMQ node startup by monitoring a local PID file. See also 'rabbitmqctl await_online_nodes'
-
- Cluster(集群):
- await_online_nodes Waits for <count> nodes to join the cluster
- change_cluster_node_type Changes the type of the cluster node
- cluster_status Displays all the nodes in the cluster grouped by node type, together with the currently running nodes
- force_boot Forces node to start even if it cannot contact or rejoin any of its previously known peers
- force_reset Forcefully returns a RabbitMQ node to its virgin state
- forget_cluster_node Removes a node from the cluster
- join_cluster Instructs the node to become a member of the cluster that the specified node is in
- rename_cluster_node Renames cluster nodes in the local database
- update_cluster_nodes Instructs a cluster member node to sync the list of known cluster members from <seed_node>
-
- Replication(复制;回答;反响):
- cancel_sync_queue Instructs a synchronising mirrored queue to stop synchronising itself
- sync_queue Instructs a mirrored queue with unsynchronised mirrors (follower replicas) to synchronise them
-
- Users(用户):
-
- add_user Creates a new user in the internal database
- authenticate_user Attempts to authenticate a user. Exits with a non-zero code if authentication fails.
- change_password Changes the user password
- clear_password Clears (resets) password and disables password login for a user
- delete_user Removes a user from the internal database. Has no effect on users provided by external backends such as LDAP
- list_users List user names and tags
- set_user_tags Sets user tags
-
- Access Control(访问控制):
-
- clear_permissions Revokes user permissions for a vhost
- clear_topic_permissions Clears user topic permissions for a vhost or exchange
- list_permissions Lists user permissions in a virtual host
- list_topic_permissions Lists topic permissions in a virtual host
- list_user_permissions Lists permissions of a user across all virtual hosts
- list_user_topic_permissions Lists user topic permissions
- list_vhosts Lists virtual hosts
- set_permissions Sets user permissions for a vhost
- set_topic_permissions Sets user topic permissions for an exchange
-
- Monitoring, observability and health checks(监视、观察、运行状况检查):
- environment Displays the name and value of each variable in the application environment for each running application
- list_bindings Lists all bindings on a vhost
- list_channels Lists all channels in the node
- list_ciphers Lists cipher suites supported by encoding commands
- list_connections Lists AMQP 0.9.1 connections for the node
- list_consumers Lists all consumers for a vhost
- list_exchanges Lists exchanges
- list_hashes Lists hash functions supported by encoding commands
- list_queues Lists queues and their properties
- list_unresponsive_queues Tests queues to respond within timeout. Lists those which did not respond
- node_health_check Performs several opinionated health checks of the target node
- ping Checks that the node OS process is up, registered with EPMD and CLI tools can authenticate with it
- report Generate a server status report containing a concatenation of all server status information for support purposes
- schema_info Lists schema database tables and their properties
- status Displays status of a node
-
- Parameters(参数):
- clear_global_parameter Clears a global runtime parameter
- clear_parameter Clears a runtime parameter.
- list_global_parameters Lists global runtime parameters
- list_parameters Lists runtime parameters for a virtual host
- set_global_parameter Sets a runtime parameter.
- set_parameter Sets a runtime parameter.
-
- Policies(策略):
-
- clear_operator_policy Clears an operator policy
- clear_policy Clears (removes) a policy
- list_operator_policies Lists operator policy overrides for a virtual host
- list_policies Lists all policies in a virtual host
- set_operator_policy Sets an operator policy that overrides a subset of arguments in user policies
- set_policy Sets or updates a policy
-
- Virtual hosts:
-
- add_vhost Creates a virtual host
- clear_vhost_limits Clears virtual host limits
- delete_vhost Deletes a virtual host
- list_vhost_limits Displays configured virtual host limits
- restart_vhost Restarts a failed vhost data stores and queues
- set_vhost_limits Sets virtual host limits
- trace_off
- trace_on
-
- Node configuration:
-
- decode Decrypts an encrypted configuration value
- encode Encrypts a sensitive configuration value
- set_cluster_name Sets the cluster name
- set_disk_free_limit Sets the disk_free_limit setting
- set_log_level Sets log level in the running node
- set_vm_memory_high_watermark Sets the vm_memory_high_watermark setting
-
- Definitions:
-
- export_definitions Exports definitions in JSON or compressed Erlang Term Format.
- import_definitions Imports definitions in JSON or compressed Erlang Term Format.
-
- Feature flags:
-
- enable_feature_flag Enables a feature flag on target node
- list_feature_flags Lists feature flags
-
- Operations:
-
- close_all_connections Instructs the broker to close all connections for the specified vhost or entire RabbitMQ node
- close_connection Instructs the broker to close the connection associated with the Erlang process id
- eval Evaluates a snippet of Erlang code on the target node
- exec Evaluates a snippet of Elixir code on the CLI node
- force_gc Makes all Erlang processes on the target node perform/schedule a full sweep garbage collection
- hipe_compile Only exists for backwards compatibility. HiPE support has been dropped starting with Erlang 22. Do not use
-
- Queues:
-
- delete_queue Deletes a queue
- purge_queue Purges a queue (removes all messages in it)
- quorum_status Displays quorum status of a quorum queue

命令行详见:https://www.rabbitmq.com/manpages.html
基本的管理功能
stop [<pid_file>]
#停止在erlang node上运行的rabbitmq,会使rabbitmq停止
stop_app
#停止erlang node上的rabbitmq的应用,但是erlang node还是会继续运行的
start_app
#启动erlan node上的rabbitmq的应用
wait <pid_file>
#等待rabbitmq服务启动
reset
#初始化node状态,会从集群中删除该节点,从管理数据库中删除所有数据,例如vhosts等等。在初始化之前rabbitmq的应用必须先停止
force_reset
#无条件的初始化node状态
rotate_logs <suffix>
#轮转日志文件
cluster管理
join_cluster <clusternode> [--ram]
#clusternode表示node名称,--ram表示node以ram node加入集群中。默认node以disc node加入集群,在一个node加入cluster之前,必须先停止该node的rabbitmq应用,即先执行stop_app。
cluster_status
#显示cluster中的所有node
change_cluster_node_type disc | ram
#改变一个cluster中node的模式,该节点在转换前必须先停止,不能把一个集群中唯一的disk node转化为ram node
forget_cluster_node [--offline]
#远程移除cluster中的一个node,前提是该node必须处于offline状态,如果是online状态,则需要加--offline参数。
update_cluster_nodes clusternode
#
sync_queue queue
#同步镜像队列
cancel_sync_queue queue
用户管理
add_user <username> <password>
#在rabbitmq的内部数据库添加用户
delete_user <username>
#删除一个用户
change_password <username> <newpassword>
#改变用户密码 \\改变web管理登陆密码
clear_password <username>
#清除用户密码,禁止用户登录
set_user_tags <username> <tag> ...
#设置用户tags
list_users
#列出用户
add_vhost <vhostpath>
#创建一个vhosts
delete_vhost <vhostpath>
#删除一个vhosts
list_vhosts [<vhostinfoitem> ...]
#列出vhosts
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
#针对一个vhosts 给用户赋予相关权限
clear_permissions [-p <vhostpath>] <username>
#清除一个用户对vhosts的权限
list_permissions [-p <vhostpath>]
#列出哪些用户可以访问该vhosts
list_user_permissions <username>
#列出该用户的访问权限
set_parameter [-p <vhostpath>] <component_name> <name> <value>
#
clear_parameter [-p <vhostpath>] <component_name> <key>
#
list_parameters [-p <vhostpath>]
policy 管理
策略用来控制和修改queues和exchange在集群中的行为,策略可以应用到vhost
set_policy [-p <vhostpath>] [--priority <priority>] [--apply-to <apply-to>]
<name> <pattern> <definition>
#name 策略名称,pattern 正则表达式,用来匹配资源,符合的就会应用设置的策略,apply-to 表示策略应用到什么类型的地方,一般有queues、exchange和all,默认是all。priority 是个整数优先级,definition 是json格式设置的策略。
clear_policy [-p <vhostpath>] <name>
#清除一个策略
list_policies [-p <vhostpath>]
#列出已有的策略
queues && exchange状态信息
list_queues [-p <vhostpath>] [<queueinfoitem> ...]
#返回queue的信息,如果省略了-p参数,则默认显示的是"/"vhosts的信息。
list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...]
#返回exchange的信息。
list_bindings [-p <vhostpath>] [<bindinginfoitem> ...]
#返回绑定信息。
list_connections [<connectioninfoitem> ...]
#返回链接信息。
list_channels [<channelinfoitem> ...]
#返回目前所有的channels。
list_consumers [-p <vhostpath>]
#返回consumers,
status
#显示broker的状态
environment
#显示环境参数的信息
report
#返回一个服务状态report,
eval <expr>

RabbitMQ 服务
rabbitmq 启动:rabbitmq-server start
rabbitmq 关闭:rabbitmqctl stop
后台运行 rabbitmq:rabbitmq-server -detached
查看状态:rabbitmqctl -q status -q 表示安静模式。
rabbitmqctl list_queues 查看所有队列信息
rabbitmqctl stop_app 关闭 RabbitMQ 应用。若单机有多个实例,则在 rabbitmqctlh 后加 –n 指定名称
rabbitmqctl start_app 启动 RabbitMQ 应用。启动 和 关闭 配合使用,可以达到清空队列的目的。
用户管理
rabbitmqctl add_user username password 添加用户
rabbitmqctl delete_user username 删除用户
rabbitmqctl change_password username newpassword 修改密码
rabbitmqctl list_users 列出所有用户
rabbitmqctl set_user_tags {username} {tag ...} 设置用户角色Tag 可以为 administrator、monitoring、policymaker、management
示例:rabbitmqctl set_user_tags test_user monitoring policymaker 也可以给同一用户设置多个角色(1) 超级管理员(administrator)
可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。(2) 监控者(monitoring)
可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)(3) 策略制定者(policymaker)
可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。但无法查看节点的相关信息。
与administrator的对比,administrator能看到这些内容(4) 普通管理者(management)
仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。(5) 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
添加用户
# rabbitmqctl add_user testuser testpwd 添加用户 testuser
# rabbitmqctl set_permissions -p "/" testuser ".*" ".*" ".*" 为 testuser 添加权限
# rabbitmqctl delete_user testuser 删除 testuser 用户
权限设置
# rabbitmqctl set_permissions [-p vhostpath] {user} {conf_reg} {write_reg} {read_reg}
vhostpath:Vhost路径
user:用户名
conf_reg: 配置权限的正则。表示哪些配置资源能够被该用户访问。
write_reg:写权限的正则。表示哪些配置资源能够被该用户写入。
read_reg: 读权限的正则。表示哪些配置资源能够被该用户读取。
用户名为test的用户就可以访问vitrual host为test_host的资源了,并且具备读写的权限。
rabbitmqctl set_permissions -p test_host test "test-*" ".*" ".*"
用户名为weijh的用户就可以访问root默认虚拟主机所有的资源了,并且具备读写的权限。
rabbitmqctl set_permissions -p "/" weijh ".*" ".*" ".*"
虚拟主机管理
# rabbitmqctl add_vhost vhostpath 创建虚拟主机
# rabbitmqctl delete_vhost vhostpath 删除虚拟主机
# rabbitmqctl list_vhosts 列出所有虚拟主机
权限控制
# rabbitmqctl set_permissions [-p vhostpath] username regexp regexp regexp 设置用户权限
# rabbitmqctl clear_permissions [-p vhostpath] username 清除用户权限
# rabbitmqctl list_permissions [-p vhostpath] 列出虚拟主机上的所有权限
# rabbitmqctl list_user_permissions username 列出用户权限
查看 所有 ( queues、exchanges、bindings、connections )
# rabbitmqctl list_queues [-p <VHostPath>] [<QueueInfoItem> ...] 查看所有队列信息
# rabbitmqctl list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...]
# rabbitmqctl list_bindings [-p <VHostPath>]
# rabbitmqctl list_connections [<ConnectionInfoItem> ...]
# rabbitmqctl rotate_logs[suffix] 循环日志文件
# rabbitmqctl cluster clusternode… 集群管理
重置 RabbitMQ 节点
# rabbitmqctl reset
# rabbitmqctl force_reset
从它属于的任何集群中移除,从管理数据库中移除所有数据,例如配置过的用户和虚拟宿主, 删除所有持久化的消息。
force_reset 命令 和 reset 的区别是无条件重置节点,不管当前管理数据库状态以及集群的配置。
如果数据库或者集群配置发生错误才使用这个最后的手段。
注意:只有在停止RabbitMQ应用后,reset和force_reset才能成功。
重置 RabbitMQ 节点 步骤
(1)关闭rabbitmq: rabbitmqctl stop_app
(2)还原: rabbitmqctl reset
(3)启动: rabbitmqctl start_app
(4)添加用户: rabbitmqctl add_user root root
(5)设置权限:rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
(6)查看用户: rabbitmqctl list_users
修改 监听ip、监听端口
出于一些需要,比如我们有多个ip,我们希望rabbitmq仅运行在指定的ip上.
或者考虑到安全问题,我们希望修改一下rabbitmq的监听端口.
默认安装完成以后,在/etc下面会有一个rabbitmq的空目录,这时候我们需要手工创建rabbitmq.conf,并写入相关内容.
vi /etc/rabbitmq/rabbitmq.conf
RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
RABBITMQ_NODE_PORT=2222
保存以后重启服务就生效了.
rabbitmq.conf :https://www.rabbitmq.com/configure.html#config-file
一般情况下,RabbitMQ的默认配置就足够了。如果希望特殊设置的话,有两个途径:
1. 环境变量的配置文件 rabbitmq-env.conf 。
2. 配置信息的配置文件 rabbitmq.config;
注意:这两个文件默认是没有的,如果需要必须自己创建。
示例: rabbitmq-env.conf: 这个文件的位置是确定和不能改变的,位于:/etc/rabbitmq目录下(这个目录需要自己创建)。
#RABBITMQ_NODE_PORT= //端口号
#HOSTNAME=
RABBITMQ_NODENAME=mq
RABBITMQ_CONFIG_FILE= //配置文件的路径
RABBITMQ_MNESIA_BASE=/rabbitmq/data //需要使用的MNESIA数据库的路径
RABBITMQ_LOG_BASE=/rabbitmq/log //log的路径
RABBITMQ_PLUGINS_DIR=/rabbitmq/plugins //插件的路径
这是一个标准的erlang配置文件。它必须符合 erlang 配置文件的标准。它既有默认的目录,也可以在 rabbitmq-env.conf文件中配置。
文件的内容详见:https://www.rabbitmq.com/configure.html#config-items
rabbitmq 添加远程访问功能:http://blog.haohtml.com/archives/15249
Python 操作 RabbitMQ:https://www.cnblogs.com/phennry/p/5713274.html
- import pika
-
-
- def func_product():
- credentials = pika.PlainCredentials('test', 'test')
- parameters = pika.ConnectionParameters('192.168.16.53', 5672, 'test', credentials)
- # 创建一个链接对象,对象中绑定rabbitmq的IP地址
- connection = pika.BlockingConnection(parameters)
- # 创建一个 channel(频道)
- channel = connection.channel()
- # 通过 channel(频道) 来创建队列,如果MQ中队列存在忽略,没有则创建
- channel.queue_declare(
- queue='name1', # 指定队列名
- durable=True # 指定队列持久化
- )
-
- for i in range(100):
- channel.basic_publish(
- exchange='',
- routing_key='name1', # 指定队列名称
- body=f'Hello World_{i}'.encode('utf-8'),
- # 指定消息属性
- properties=pika.BasicProperties(
- delivery_mode=2, # 指定消息持久化
- )
- ) # 往队列中发送一个消息
-
- print("向队列发送 Hello World")
- connection.close() # 发送完关闭链接
-
-
- def func_consume():
- credentials = pika.PlainCredentials('test', 'test')
- parameters = pika.ConnectionParameters('192.168.16.53', 5672, 'test', credentials)
- # 创建一个链接对象,对象中绑定rabbitmq的IP地址
- connection = pika.BlockingConnection(parameters)
- # 创建一个 channel(频道)
- channel = connection.channel()
- # 通过 channel(频道) 来创建队列,如果MQ中队列存在忽略,没有则创建
- channel.queue_declare(queue='name1', durable=True)
-
- def callback_func(ch, method, properties, body): # callback函数负责接收队列里的消息
- print(f"接收消息 ---> {body}")
-
- channel.basic_consume(
- queue='name1', # 指定队列名
- on_message_callback=callback_func, # 从队列里去消息
- auto_ack=True
- )
- print('等待队列中消息')
- channel.start_consuming()
- pass
-
-
- if __name__ == '__main__':
- func_product()
- func_consume()
- pass

默认消息队列里的数据是按照顺序被消费者拿走的,例如:消费者1去队列中获取奇数序列任务,消费者2去队列中获取偶数序列的任务,消费者1处理的比较快而消费者2处理的比较慢,那么消费者1就会一直处于繁忙的状态,为了解决这个问题在需要加入下面代码:
channel.basic_qos(prefetch_count=1) :表示谁来获取,不再按照奇偶数 排列
:https://www.rabbitmq.com/getstarted.html
"简单的消息队列"、"发布订阅" 区别:
在RabbitMQ中,所有生产者提交的消息都由 Exchange 来接收,然后Exchange按照特定的策略转发到 Queue 进行存储,RabbitMQ 提供了四种 Exchange:fanout、direct、topic、header。由于header 模式在实际工作中用的比较少,下面主要对前三种进行比较。
exchange type = fanout :任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上
上面这张图,可以清晰的看到相互之间的关系,当我们设置成fanout模式时,
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。