赞
踩
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件,使用erlang语言编写,依赖Erlang环境运行。
Broker:运行消息队列服务进程的节点,包含Exchange、Queue;
Producer:消息生产者;
Consumer:消息消费者;
Server:AMQP实例,接收客户端的连接;
Connection:连接,应用程序与Server的TCP连接;
Channel:信道,轻量级的Connection,消息读写等操作在信道中进行。客户端在一个Connection中建立多个信道,每个信道代表一个会话任务。如果每一次访问 RabbitMQ 都建立一个 Connection,在大量消息时开销会变得巨大;
Message:应用程序和服务器之间传送的数据;
Virtual Host:逻辑隔离,由若干Exchange、Queue组成;
Exchange:交换器,按照路由规则将消息路由到一个或者多个队列;
Queue:队列,用来暂存消息,供consumer消费;
RoutingKey:路由键,producer向Exchange发送消息时会标识路由键,按照路由键的不通发送到指定Queue。格式如“com.rabbitmq”;
Binding:绑定,Echange、Queue之间的虚拟连接,由一个或者多个RoutingKey组成;
Direct Exchange:直连交换器,一对一发送,绑定一个Queue,与一个特定的RoutingKey完全匹配;
Fanout Exchange:扇形交换器,类似子网广播,绑定多个Queue,发送到该类型交换器的消息,都会被转发到与其绑定的所有Queue上;
Topic Exchange:主题交换器,通过通配符(*)(#)匹配RoutingKey,路由到相应的Queue;
Headers Exchange:头交换器,通过匹配请求头中携带的键值进行消息路由,有2种模式:全部匹配、部分匹配;
主从架构,吞吐量受限。
master queue:主队列,读写操作均在master queue上,然后由master同步操作到mirror queue。即使consumer连接到了mirror queue,相关操作也会被路由到master queue上。对于生成队列,原理和消费一样;
mirror queue:镜像队列,作为备份,在master queue所在节点挂掉之后,系统把mirror queue提升为master queue;
P发送消息到队列,C监听消息队列,如果Queue中有消息,就消费,然后自动删除。可以通过启用消息确认机制确保稳健性,消费完后会立即发送ack,否则会造成内存溢出;
P发送消息到队列,C1、C2共同监听Queue,争抢消息,谁先拿到谁就消费;
P发送消息到交换器X,X发布订阅,将消息发送到已绑定的Queue中,相应的C监听自己;
P发送消息到交换器X,X根据路由字符RoutingKey做匹配,发送到对应的Queue,对应的C进行消费;
路由模式的一种,X根据RoutingKey的进行模糊匹配,发送到相应的Queue,对应的C进行消费;
基于Direct交换器,使用MQ实现RPC的异步调用,过程如下:
1、Client即是P也是C,向rpc_queue送调用消息,同时监听reply_queue;
2、Server监听rpc_queue,收到消息后进行处理,返回结果;
3、Server将结果发送到reply_queue;
4、Client接收到RPC调用结果;
systemctl stop firewalld && systemctl disable firewalld
setenforce 0
sed -i 's/SELINUX=enforcing/SELINUX=disabled/' /etc/selinux/config
vi /etc/hosts
ip1 rabbit1
ip2 rabbit2
ip3 rabbit3
yum install -y rabbitmq-server
mkdir -p /etc/systemd/system/rabbitmq-server.service.d
# 修改文件打开数
vi /etc/systemd/system/rabbitmq-server.service.d/limit.conf
[Service]
LimitNOFILE = 100000
# 修改ip与端口
vi /etc/rabbitmq/rabbitmq-env.conf
RABBITMQ_NODE_IP_ADDRESS="本机ip"
RABBITMQ_NODE_PORT=5672
vi /etc/rabbitmq/rabbitmq.config % Template Path: rabbitmq/templates/rabbitmq.config [ {kernel,[ {inet_dist_listen_min, 41055}, {inet_dist_listen_max, 41055}, # epmd端口必须打开,群集才能正常工作、cli才能使用,设置端口范围 {inet_default_connect_options, [{nodelay,true}]} ]}, {rabbit,[ # 在autoheal模式下,当某网络分区故障时,RabbitMQ会自动决定一个获胜分区winning,然后重启不在这个分区中的节点以恢复网络分区 {cluster_partition_handling, autoheal}, # tcp套接字的配置 {tcp_listen_options, [ binary, {packet, raw}, {reuseaddr, true}, # 未接受的TCP连接队列的最大数,达到此数量时,新的连接将被拒绝 {backlog, 4096}, {nodelay, true}, {exit_on_close, false}, # heartbeat: 0,代表关闭心跳服务 # 启用tcp keepalive机制 {keepalive, true} ]} {rabbitmq_management,[ {http_log_dir, "/tmp/rabbit-mgmt"}, # 监控各项资源的速率,none为不监控,默认为basic {rates_mode,none} ]}
# 复制cookie scp /var/lib/rabbitmq/.erlang.cookie root@rabbit2:/var/lib/rabbitmq/.erlang.cookie scp /var/lib/rabbitmq/.erlang.cookie root@rabbit3:/var/lib/rabbitmq/.erlang.cookie chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie chown rabbitmq:rabbitmq /etc/rabbitmq/rabbitmq.config chmod 400 /var/lib/rabbitmq/.erlang.cookie systemctl daemon-reload systemctl enable rabbitmq-server.service systemctl start rabbitmq-server.service # 查看状态 rabbitmqctl cluster_status # 关闭:其他从节点先执行关闭,然后加入集群主节点 rabbitmqctl stop_app rabbitmqctl join_cluster rabbit@rabbit1 rabbitmqctl start_app # 查看 rabbitmqctl cluster_status # 队列的镜像设置,在集群中所有的节点上进行镜像 rabbitmqctl set_policy ha-all '^(?!amq\.).*' '{"ha-mode": "all"} # 创建用户 rabbitmqctl add_user test rabbitmqctl set_permissions -p / test test#123 rabbitmqctl set_user_tags test 角色
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。