赞
踩
消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。
消息队列的主要应用场景有:
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。它的运行原理示意图如下所示:
其中涉及的重要概念有:
有关RabbitMQ的Docker部署可见 : Spring Boot整合消息队列RabbitMQ
RocketMQ是一款分布式消息中间件,最初是由阿里巴巴消息中间件团队研发并大规模应用于生产系统,满足线上海量消息堆积的需求, 在2016年底捐赠给Apache开源基金会成为孵化项目,经过不到一年时间正式成为Apache顶级项目。
RocketMQ的架构图如下所示:
根据架构图可知,RocketMQ中包含如下的基本概念:
Producer:消息的生产者,它可以使用Broker Discovery通过NameServer来获取所有注册到NameServer中的Broker路由消息。当它需要发送消息到队列中时,便根据负载均衡策略将消息发送到Broker的一个或多个队列中
NameServer:注册中心,它是一个无状态节点,集群部署的各个节点之间无任何信息同步。它主要负责管理两部分信息:
Broker会将它的配置信息注册到NameServer中共生产者和消费者探测,并实时更新Topic信息到NameServer。生产者和消费者通过NameServer可以获取到Broker的地址信息,然后再决定如何发送消息或者消费消息
Broker:负责接收并存放消息,同时提供Push/Pull接口将消息发送给Consumer消费。Broker为主从(master-slave)架构模式,一个master可以对应多个slave,但一个slave只能对应一个master。master和slave之间的对应关系通过相同的Broker Name来确定,不同的Broker ID来指定不同的角色,0表示slave,非0表示master。消费者可以选择从master或者slave中获取消息,进行消费。Broker同时提供消息查询的功能,可以通过MessageID和MessageKey来查询消息。另外,Broker会与NameServer集群中所有的节点建立长连接,定时注册Topic消息到所有的节点中。NameServer会定时的扫描所有存活的Broker连接,如果超过2分钟NameServer没有扫描到连接,则NameServer就断开连接。但NameServer并不会主动的通知Consumer和Producer断开连接的消息,只能依靠它们下一次发送心跳时进行主动感知
Topic:主题,用于将消息按照设置的Topic进行划分。Producer将消息发往指定的Topic,Consumer订阅该Topic就可以收到这条消息,发送方可以同时往多个Topic投放消息,消费方也可以订阅多个Topic的消息。默认每个Topic自动创建时会设置4个Queue,需要顺序消费的消息发往同一队列
Message:消息,使用MessageId
唯一识别,用户在发送时可以设置messageKey,便于之后查询和跟踪。一个 Message 必须指定 Topic,Message 还可以设置Tag,以便消费端可以基于 Tag 进行过滤消息。也可以添加额外的键值对,方便在开发过程中诊断问题
Producer在消息队列运行过程中会与NameServer集群中中一个可用节点之间建立长连接,并每隔30s从NameServer中获取Topic路由信息。
Producer最多需要30s来获取哪些Broker不可用。
并根据路由信息已与相应的标识为mater的Broker之间建立长连接,且每隔30s向master发送心跳。Broker会每隔10s扫描Producer和Broker之间的连接,如果Broker超过两分钟没有收到Producer的心跳,就选择断开彼此之间的连接。
当Producer需要发送消息时,它会根据从NameServer中获取的Topic路由信息,以及负载均衡策略选择将其发送到哪一个Broker中,然后消息等待被消费。
当Producer确定了发送消息的Broker后,便通过建立的长连接将其发送到对应Topic的Broker中。Broker中包含有多个队列(Queue),最终消息会存储到Queue中。
Topic和Queue之间是一对多关系,主要是为了负载均衡,避免某一个Broker的负担过重。
根据负载均衡策略,一个Topic的消息可能会存放到多个Broker中,Broker中的单个队列也可能存放多个Topic的消息。
RocketMQ的消息的存储是由ConsumeQueue和CommitLog配合来完成的,ConsumeQueue中只存储很少的数据,消息主体都是通过CommitLog来进行读写。 如果某个消息只在CommitLog中有数据,而ConsumeQueue中没有,则消费者无法消费。
一旦消息存储成功,Consumer就可以使用从NameServer中获取的Topic信息与某个Broker建立长连接,默认情况下,消费者每隔30秒从NameServer获取所有Topic的最新队列情况。最后通过长连接从对应的Queue中拿到消息,进行消费。
Docker部署RocketMQ双主双从集群模式
mkdir -p /home/rocketmq/namesvr1/data && mkdir -p /home/rocketmq/namesvr1/log && mkdir -p /home/rocketmq/namesvr2/data && mkdir -p /home/rocketmq/namesvr2/log && mkdir -p /home/rocketmq/broker-m-1/data && mkdir -p /home/rocketmq/broker-m-1/log && mkdir -p /home/rocketmq/broker-m-1/conf && mkdir -p /home/rocketmq/broker-m-2/data && mkdir -p /home/rocketmq/broker-m-2/log && mkdir -p /home/rocketmq/broker-m-2/conf && mkdir -p /home/rocketmq/broker-s-1/data && mkdir -p /home/rocketmq/broker-s-1/log && mkdir -p /home/rocketmq/broker-s-1/conf && mkdir -p /home/rocketmq/broker-s-2/data && mkdir -p /home/rocketmq/broker-s-2/log && mkdir -p /home/rocketmq/broker-s-2/conf
broker-m-1.conf
#集群名称 brokerClusterName=DefaultCluster #broker名称 brokerName=broker1 #brokerId master用0 slave用其他 brokerId=0 #清理时机 deleteWhen=4 #文件保留时长 48小时 fileReservedTime=48 #broker角色 -ASYNC_MASTER异步复制 -SYNC_MASTER同步双写 -SLAVE brokerRole=SYNC_MASTER #刷盘策略 - ASYNC_FLUSH 异步刷盘 - SYNC_FLUSH 同步刷盘 flushDiskType=SYNC_FLUSH #主机ip brokerIP1= xxxx #对外服务的监听接口,同一台机器上部署多个broker,端口号要不相同 listenPort=10911 #namesvr namesrvAddr=xxxx:9876;xxxx:9877 #是否能够自动创建topic autoCreateTopicEnable=true
broker-s-1.conf
#集群名称 brokerClusterName=DefaultCluster #broker名称 brokerName=broker1 #brokerId master用0 slave用其他 brokerId=1 #清理时机 deleteWhen=4 #文件保留时长 48小时 fileReservedTime=48 #broker角色 -ASYNC_MASTER异步复制 -SYNC_MASTER同步双写 -SLAVE brokerRole=SLAVE #刷盘策略 - ASYNC_FLUSH 异步刷盘 - SYNC_FLUSH 同步刷盘 flushDiskType=SYNC_FLUSH #主机ip brokerIP1=xxxx #对外服务的监听接口,同一台机器上部署多个broker,端口号要不相同 listenPort=11911 #namesrv namesrvAddr=xxxx:9876;xxxx:9877 #是否能够自动创建topic autoCreateTopicEnable=true
broker-m-2.conf
#集群名称 brokerClusterName=DefaultCluster #broker名称 brokerName=broker2 #brokerId master用0 slave用其他 brokerId=0 #清理时机 deleteWhen=4 #文件保留时长 48小时 fileReservedTime=48 #broker角色 -ASYNC_MASTER异步复制 -SYNC_MASTER同步双写 -SLAVE brokerRole=SYNC_MASTER #刷盘策略 - ASYNC_FLUSH 异步刷盘 - SYNC_FLUSH 同步刷盘 flushDiskType=SYNC_FLUSH #主机ip brokerIP1= xxxx #对外服务的监听接口,同一台机器上部署多个broker,端口号要不相同 listenPort=12911 #namesvr namesrvAddr=xxxx:9876;xxxx:9877 #是否能够自动创建topic autoCreateTopicEnable=true
broker-s-2.conf
#集群名称 brokerClusterName=DefaultCluster #broker名称 brokerName=broker2 #brokerId master用0 slave用其他 brokerId=1 #清理时机 deleteWhen=4 #文件保留时长 48小时 fileReservedTime=48 #broker角色 -ASYNC_MASTER异步复制 -SYNC_MASTER同步双写 -SLAVE brokerRole=SLAVE #刷盘策略 - ASYNC_FLUSH 异步刷盘 - SYNC_FLUSH 同步刷盘 flushDiskType=SYNC_FLUSH #主机ip brokerIP1=xxxx #对外服务的监听接口,同一台机器上部署多个broker,端口号要不相同 listenPort=13911 #namesrv namesrvAddr=xxxx:9876;xxxx:9877 #是否能够自动创建topic autoCreateTopicEnable=true
version: '3.5' services: namesrv1: image: rocketmqinc/rocketmq:4.3.0 container_name: namesrv1 ports: - 9876:9876 volumes: - /home/rocketmq/namesrv1/log:/opt/logs - /home/rocketmq/namesrv1/data:/opt/store command: sh mqnamesrv networks: rocketmq: aliases: - namesrv1 namesrv2: image: rocketmqinc/rocketmq:4.3.0 container_name: namesrv2 ports: - 9877:9876 volumes: - /home/rocketmq/namesrv2/log:/opt/logs - /home/rocketmq/namesrv2/data:/opt/store command: sh mqnamesrv networks: rocketmq: aliases: - namesrv2 broker-m-1: image: rocketmqinc/rocketmq:4.3.0 container_name: broker-m-1 links: - namesrv1:namesrv1 - namesrv2:namesrv2 ports: - 10909:10909 - 10911:10911 - 10912:10912 environment: TZ: Asia/Shanghai NAMESRV_ADDR: "namesrv1:9876" JAVA_OPTS: "-Duser.home=/opt" JAVA_OPT_EXT: "-server -Xms64m -Xmx64m -Xmn64m" volumes: - /home/rocketmq/broker-m-1/log:/opt/logs - /home/rocketmq/broker-m-1/data:/opt/store - /home/rocketmq/broker-m-1/conf/broker-m-1.conf:/opt/rocketmq-4.3.0/conf/broker-m-1.conf command: sh mqbroker -c /opt/rocketmq-4.3.0/conf/broker-m-1.conf autoCreateTopicEnable=true & networks: rocketmq: aliases: - broker-m-1 broker-s-1: image: rocketmqinc/rocketmq:4.3.0 container_name: broker-s-1 links: - namesrv1:namesrv1 - namesrv2:namesrv2 ports: - 11909:10909 - 11911:11911 - 11912:10912 environment: TZ: Asia/Shanghai NAMESRV_ADDR: "namesrv1:9876" JAVA_OPTS: "-Duser.home=/opt" JAVA_OPT_EXT: "-server -Xms64m -Xmx64m -Xmn64m" volumes: - /home/rocketmq/broker-s-1/log:/opt/logs - /home/rocketmq/broker-s-1/data:/opt/store - /home/rocketmq/broker-s-1/conf/broker-s-1.conf:/opt/rocketmq-4.3.0/conf/broker-s-1.conf command: sh mqbroker -c /opt/rocketmq-4.3.0/conf/broker-s-1.conf autoCreateTopicEnable=true & networks: rocketmq: aliases: - broker-s-1 broker-m-2: image: rocketmqinc/rocketmq:4.3.0 container_name: broker-m-2 links: - namesrv1:namesrv1 - namesrv2:namesrv1 ports: - 12909:10909 - 12911:12911 - 12912:10912 environment: TZ: Asia/Shanghai NAMESRV_ADDR: "namesrv1:9876" JAVA_OPTS: "-Duser.home=/opt" JAVA_OPT_EXT: "-server -Xms64m -Xmx64m -Xmn64m" volumes: - /home/rocketmq/broker-m-2/log:/opt/logs - /home/rocketmq/broker-m-2/data:/opt/store - /home/rocketmq/broker-m-2/conf/broker-m-2.conf:/opt/rocketmq-4.3.0/conf/broker-m-2.conf command: sh mqbroker -c /opt/rocketmq-4.3.0/conf/broker-m-2.conf autoCreateTopicEnable=true & networks: rocketmq: aliases: - broker-m-2 broker-s-2: image: rocketmqinc/rocketmq:4.3.0 container_name: broker-s-2 links: - namesrv1:namesrv1 - namesrv2:namesrv2 ports: - 13909:10909 - 13911:13911 - 13912:10912 environment: TZ: Asia/Shanghai NAMESRV_ADDR: "namesrv1:9876" JAVA_OPTS: "-Duser.home=/opt" JAVA_OPT_EXT: "-server -Xms64m -Xmx64m -Xmn64m" volumes: - /home/rocketmq/broker-s-2/log:/opt/logs - /home/rocketmq/broker-s-2/data:/opt/store - /home/rocketmq/broker-s-2/conf/broker-s-2.conf:/opt/rocketmq-4.3.0/conf/broker-s-2.conf command: sh mqbroker -c /opt/rocketmq-4.3.0/conf/broker-s-2.conf autoCreateTopicEnable=true & networks: rocketmq: aliases: - broker-s-2 rocketmq-console: image: styletang/rocketmq-console-ng container_name: rocketmq-console ports: - 8090:8080 environment: JAVA_OPTS: -Drocketmq.namesrv.addr=namesrv1:9876;namesrv2:9877 -Dcom.rocketmq.sendMessageWithVIPChannel=false networks: rocketmq: aliases: - rocketmq-console networks: rocketmq: name: rocketmq driver: bridge
如果主机配置比较好,可以将JVM的相关参数设置的大一些。
docker-compose -f docker-compose.yml up -d
使用docker ps
查看已经启动的容器。
使用ip:8090
连接RocketMQ-console:
Rocketmq原理&最佳实践
十分钟入门RocketMQ
RocketMQ吐血总结
RocketMQ(1)-架构原理
Apache RocketMQ
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。