赞
踩
Message Queue,是一种提供消息队列服务的中间件。提供了消息生产、存储、消费全过程API的软件系统。
JMS:Java Messaging Service。Java平台上有关MOM(Message Orientated Middleware)的技术规范。他便于Java应用程序的消息交换,提供标准的接口简化开发。ActiveMQ时典型实现
STOMP:Streaming Text Orientated Message Protocol。是一种MOM的简单文本协议。STOMP提供一个可互操作的连接格式,允许 客户端与任意STOMP消息代理进行交互。ActiveMQ时典型实现
AMQP:Advanced Message Queuing Protocol。一个提供统一消息服务的应用层标准,是应用层协议的一个开放标准。RabbitMQ是典型实现
MQTT:Message Queueing Telemetry Transport。IBM开发的一个即时通讯协议(二进制协议),主要用于服务器和低功耗IoT设备之间的通信
主题(Topic):表示一类消息的集合(可以理解为消息的类型),每个消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。一个生产者可以同时发送多种Topic消息,而一个消费者只能接收一种Topic消息
标签(Tag):用于快速过滤消息
1、在官网下载编译好的二进制压缩包,版本5.0.0即可,上传到Linux中
2、进行解压
3、配置环境变量ROCKETMQ_HOME和NAMESRV_ADDR
4、配置bin目录下的runserver.sh,根据实际情况修改JVM的内存参数
5、配置bin目录下的runbroker.sh,根据实际情况修改JVM的内存参数
6、执行nohup命令后台运行RocketMQ服务(nameserver必须先启动,broker需要再nameserver上注册)
# 启动nameserver nohup bin/mqnamesrv & # 启动broker nohup bin/mqbroker -c [confFile] & # -c可指定加载的配置文件,默认为conf/broker.conf # 查看日志rocketmq是否成功启动 tail nohup.out # 查看进程 jps # 停止broker sh bin/mqshutdown broker # 停止namesrv sh bin/mqshutdown namesrv
7、执行命令测试(rocketmq提供的测试样例,生产者会发送一千条消息)
bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
8、执行命令测试(rocketmq提供的测试样例,消费者会接受一千条消息)
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
public void test_SyncProducer() throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("producer_group_name"); //设置注册服务的ip地址的端口 producer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR); //启动生产者 producer.start(); for(int i=0; i<3; i++){ try { // 封装消息,设置topic,tag(用于消息快速过滤),消息数据 Message message = new Message( "TopicTest", "TagA", "ID04287777", ("Hello, RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); //同步发送消息并获取发送结果,producer从broker获取发送结果 SendResult sendResult = producer.send(message); System.out.println(sendResult); Thread.sleep(1500); } catch (Exception e) { throw new RuntimeException(e); } } producer.shutdown(); }
public void test_AsyncProducer() throws Exception{ DefaultMQProducer producer = new DefaultMQProducer(RocketMQConstant.PRODUCER_GROUP_NAME); producer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR); producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); int messageCount = 10; final CountDownLatch countDownLatch = new CountDownLatch(messageCount); for(int i=0; i<messageCount; i++){ final int index = i; // 封装消息,设置topic,tag(用于消息快速过滤),消息数据 Message message = new Message( "TopicTest", "TagA", "ID04287777", ("Hello, RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); // 异步发送消息,若broker有响应会调用SendCallback中的方法 producer.send(message, new SendCallback() { public void onSuccess(SendResult sendResult) { countDownLatch.countDown(); System.out.println(" Send Message "+ index +" OK: "+sendResult); } public void onException(Throwable throwable) { countDownLatch.countDown(); System.out.println(" Send Message "+ index +" Exception: "+throwable); } }); //单向发送 producer.sendOneway(message); System.out.println("Message "+index+" send done"); } //在100条消息发送完后关闭 countDownLatch.await(5, TimeUnit.SECONDS); producer.shutdown(); }
public void test_OneWayProducer() throws Exception{ DefaultMQProducer producer = new DefaultMQProducer(RocketMQConstant.PRODUCER_GROUP_NAME); producer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR); producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); int messageCount = 10; final CountDownLatch countDownLatch = new CountDownLatch(messageCount); for(int i=0; i<messageCount; i++){ final int index = i; // 封装消息,设置topic,tag(用于消息快速过滤),消息数据 Message message = new Message( "TopicTest", "TagA", "ID04287777", ("Hello, RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); //单向发送 producer.sendOneway(message); System.out.println("Message "+index+" send done"); } //在100条消息发送完后关闭 countDownLatch.await(5, TimeUnit.SECONDS); producer.shutdown(); }
public static void test_PushConsumer() throws Exception{ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_name"); consumer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //消费者订阅的消息topic和tag(subExpression,*表示任意) consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { System.out.println("Receive New Message : "+list); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Start..."); }
不同于推模式消费者,拉模式下需要手动管理消息队列MessageQueue和偏移量offset的映射关系。但是最新的LitePullConsumer底层源码已经实现对mq和offset的管理,比较方便。
//拉模式消费者 public static void test_LitePullConsumer() throws Exception{ DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(RocketMQConstant.CONSUMER_GROUP_NAME); litePullConsumer.setNamesrvAddr(RocketMQConstant.NAME_SRV_ADDR); litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); litePullConsumer.subscribe("TopicTest", "*"); litePullConsumer.start(); try { while(true){ List<MessageExt> messageExts = litePullConsumer.poll(); System.out.printf("%s%n", messageExts); } }finally { litePullConsumer.shutdown(); } }
RocketMQ传递对象,对象所属类需要实现序列化接口,并且将对象转换为字节数组存入消息体中。
保证消息的局部有序(其中几条消息的有序,不一定是全部消息都要有序),以防止受到网络传输的影响。
实现原理
生产者将一组有序的消息一次发到同一个MessageQueue中(依靠队列的特点保证局部有序性)。消费者消费完一个MessageQueue的消息后才会去消费下一个MessageQueue的消息。
public class OrderProducer { public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer(WanfengConstant.PRODUCER_GROUP_NAME); try { producer.setNamesrvAddr(WanfengConstant.NAMESRV_ADDR); producer.start(); for(int i=0; i<5; i++){ //用于指定顺序的id int orderId = i; for(int j=0; j<5; j++){ Message message = new Message( WanfengConstant.ORDER_TOPIC, "order_"+orderId, "KEY"+orderId, ("order_"+orderId+" step "+j).getBytes(RemotingHelper.DEFAULT_CHARSET) ); //实现消息队列选择器对象,使同一个orderId的消息发送到同一个消息队列 SendResult sendResult = producer.send( message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId ); System.out.printf("%s%n", sendResult); } } }catch(Exception e){ e.printStackTrace(); producer.shutdown(); } } }
public class OrderConsumer { public static void main(String[] args) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(WanfengConstant.CONSUMER_GROUP_NAME); consumer.setNamesrvAddr(WanfengConstant.NAMESRV_ADDR); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); try { consumer.subscribe(WanfengConstant.ORDER_TOPIC, "*"); //实现顺序消息监听者接口 consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); for(MessageExt messageExt : msgs){ System.out.println("Receive Message: " + new String(messageExt.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Start..."); } catch (Exception e) { e.printStackTrace(); consumer.shutdown(); } } }
生产者发送的消息推送给所有group的消费者
实现原理:将消费者设置MessageModel为广播模式。
public class BroadcastConsumer { public static void main(String[] args) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(WanfengConstant.CONSUMER_GROUP_NAME); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //设定消息模式为广播 consumer.setMessageModel(MessageModel.BROADCASTING); try { consumer.subscribe(WanfengConstant.ARCHIVE_TOPIC, "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { msgs.forEach(messageExt -> { Archive archive = (Archive) WanfengObjectUtil.bytesToObject(messageExt.getBody()); System.out.println("Receive Message : "+archive.getId()); }); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Broadcast Consumer Start..."); }catch (Exception e){ e.printStackTrace(); consumer.shutdown(); } } }
若指定MessageModel为CLUSTERING,则生产者发送的消息会随机指定消费者消费。
顾名思义就是消息发送到broker时延迟指定的时间后再发送给消费者。常用于定时发送
过滤消息通过tag实现,在消费者端指定过滤的tag即可。
//消费者订阅tag1或tag2的消息
consumer.subscribe("TopicTest", "tag1 || tag2");
在RocketMQ中,消费者指定过滤条件后,将其上推到Broker中,在Broker中进行tag过滤,以减少网络IO,但同时也增加了Broker的繁忙。
public class TransactionProducer { public static void main(String[] args) { TransactionMQProducer producer = new TransactionMQProducer(WanfengConstant.PRODUCER_GROUP_NAME); TransactionListener transactionListener = new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { System.out.println("[WANFENG-INFO] TransactionProducer.executeLocalTransaction(): 执行成功..."); String tags = msg.getTags(); if (StringUtils.contains(tags, "TagA")) { //消息提交(发送出去) return LocalTransactionState.COMMIT_MESSAGE; } else if (StringUtils.contains(tags, "TagB")) { //消息回滚(丢掉消息) return LocalTransactionState.ROLLBACK_MESSAGE; } else { return LocalTransactionState.UNKNOW; } } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println("[WANFENG-INFO] TransactionProducer.checkLocalTransaction(): 执行成功..."); String tags = msg.getTags(); if (StringUtils.contains(tags, "TagC")) { return LocalTransactionState.COMMIT_MESSAGE; } else { return LocalTransactionState.UNKNOW; } } }; ExecutorService executorService = new ThreadPoolExecutor( 2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3) ); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); try { producer.start(); } catch (Exception e) { e.printStackTrace(); } String[] tags = new String[]{"TagA", "TagB", "TagC"}; CountDownLatch countDownLatch = new CountDownLatch(9); for (int i = 0; i < 9; i++) { try { Message message = new Message("TopicTest", tags[i % tags.length], "Key" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(message, null); System.out.println(sendResult); Thread.sleep(1000); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } try { countDownLatch.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { try { Thread.sleep(100000); } catch (InterruptedException e) { throw new RuntimeException(e); } producer.shutdown(); } } }
ACL对用户对Topic资源的访问权限进行控制
在pom依赖中引入acl的依赖包
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>5.0.0</version>
</dependency>
在服务端的conf/broker.conf文件,添加配置,开启acl
aclEnable=true
在服务端的conf/plain_acl.yml文件,配置具体权限规则(热加载,不需要重启mq)
accounts: - accessKey: RocketMQ #用户名 secretKey: 12345678 #密码 whiteRemoteAddress: #访问地址白名单 admin: false #是否为管理员(管理员可以访问所有Topic) defaultTopicPerm: DENY #默认Topic访问权限 defaultGroupPerm: SUB #默认组权限 topicPerms: #Topic对应的权限,若这里找不到则采用defaultTopicPerm - topicA=DENY - topicB=PUB|SUB - topicC=SUB groupPerms: # the group should convert to retry topic - groupA=DENY - groupB=PUB|SUB - groupC=SUB
在创建生产者对象时需加入RPCHook(acl的用户信息)
public class AclProducer { private static final String ACL_ACCESS_KEY = "RocketMQ"; private static final String ACL_SECRET_KEY = "12345678"; /** * 通过用户名和密码获取RPCHook * @return */ public static RPCHook getAclRPCHook(){ return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY, ACL_SECRET_KEY)); } public static void main(String[] args) throws MQClientException, InterruptedException { //创建生产者时加入用户信息,即RPCHook DefaultMQProducer producer = new DefaultMQProducer(WanfengConstant.PRODUCER_GROUP_NAME, getAclRPCHook()); producer.setNamesrvAddr(WanfengConstant.NAMESRV_ADDR); producer.start(); for (int i = 0; i < 20; i++) { try { Message message = new Message( "TopicTest", WanfengConstant.TAGS_NAME, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /*消息体转换成二进制数组*/ ); SendResult sendResult = producer.send(message); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } } }
Producer,Consumer,Broker处理消息的相关信息
消息轨迹的实现原理是MQ把消息轨迹都往RMQ_SYS_TRACE_TOPIC的Topic中放
在Broker端配置文件开启消息轨迹
traceTopicEnable=true
创建生产者时指定enableMsgTrace参数为true,开启消息轨迹。也可以指定customizedTraceTopic参数来自定义消息轨迹的Topic。
public class TraceProducer { public static void main(String[] args) throws MQClientException { //指定enableMsgTrace参数为true,开启消息轨迹 DefaultMQProducer producer = new DefaultMQProducer(WanfengConstant.PRODUCER_GROUP_NAME, true); producer.setNamesrvAddr(WanfengConstant.NAMESRV_ADDR); producer.start(); for (int i = 0; i < 20; i++) { try { Message message = new Message( "TopicTest", WanfengConstant.TAGS_NAME, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /*消息体转换成二进制数组*/ ); SendResult sendResult = producer.send(message); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); } } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。