赞
踩
启动命令:
cd /usr/local/rocketmq/ # 1.先启动mqnamesrv #启动命令 nohup sh bin/mqnamesrv & # 或者加上输出日志 nohup sh bin/mqnamesrv > ~/logs/rocketmqlogs/namesrv.log & # 指定启动ip加上输出日志 nohup sh bin/mqnamesrv -n "172.18.2.97:9876" > ./logs/namesrv.log & # 停止命令 nohup sh bin/mqshutdown namesrv # 2.启动broker #startBroker.sh #注意,下面的ip地址以及端口请填写你安装的rocketmq机器的ip地址及端口。 nohup sh bin/mqbroker -c conf/broker.conf -n xxx.xx.xx.xx:9876 > ~/logs/rocketmqlogs/broker.log & # 查看日志 tail -f ~/logs/rocketmqlogs/broker.log #stopBroker.sh nohup sh bin/mqshutdown broker
ps :rocketmq在多网卡、虚拟网络、docker中,获取到错误ip,可以在启动是增加指定ip,后续在console中,可以查看获取到的ip.
指定broker内存大小:修改配置文件broker.conf
vi rocketmq-all-4.7.0-bin-release/bin/runbroker.sh
3. 端口说明:
rocke有9876
非vip通道端口:10911
vip通道端口:10909
10909是VIP通道对应的端口,在JAVA中的消费者对象或者是生产者对象中关闭VIP通道即可无需开放10909端口
如果是broker集群的话,还要开放10912,否则master的消息将无法复制到slave节点
RocketMQ-Console是RocketMQ项目的扩展插件,是一个图形化管理控制台,提供Broker集群状态查看,Topic管理,Producer、Consumer状态展示,消息查询等常用功能,这个功能在安装好RocketMQ后需要额外单独安装、运行。
下载地址:https://github.com/apache/rocketmq-externals
1、进入项目,修改配置文件application.properties中mq地址和端口
2、编译打包
$ mvn clean package -Dmaven.test.skip=true
启动
$ java -jar target/rocketmq-console-ng-1.0.0.jar
#如果配置文件没有填写Name Server的话,可以在启动项目时指定namesrvAddr
$ java -jar target/rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr='localhost:9876'
#因为本文在打包时配置了namesrvAddr,故而执行如下命令
$ java -jar target/rocketmq-console-ng-1.0.0.jar
访问http://127.0.0.1:8080/ 端口可在上述配置文件中修改
至此,rocketmq和console安装完毕。
消息发没发成功,默认情况下是3次重试。
通过设置retryTimesWhenSendFailed定义重试次数,通过设置sendMsgTimeout来定义超时时间
生产者 向消息队列里写入消息,不 同的业务场景需要生产者采用不同的写入策略 。 比如同步发送、异步发送、 延迟发送、 发送事务消息等。
消息的发送有同步和异步两种方式,上面的代码使用的是异步方式 。消息发送的返回状态有如下四种 : FLUSH_DISK_TIMEOUT 、 FLUSH_SLAVE_TIMEOUT 、SLAVE_NOT_AVAILABLE 、SEND_OK,不同状态在不同的刷盘策略和同步策略的配置下含义是不同的 。
FLUSH_DISK_TIMEOUT : 表示没有在规定时间内完成刷盘(需要 Broker 的刷盘策被设置成 SYNC_FLUSH 才会报这个错误) 。
FLUSH_SLAVE_TIMEOUT :表示在主备方式下,并且 Broker被设 置 成 SYNC_MASTER 方式,没有在设定时间内完成 主从同步 。
SLAVE_NOT_AVAILABLE : 这个状态 产生的场景和 FLUSH_SLAVE_TIMEOUT 类似, 表示在主备 方式下,并且 Broker被设置成 SYNC_MASTER,但是没有找到被配置成 Slave 的 Broker。
SEN_ OK :表示发送成功,发送成功的具体含义,比如消息是否已经 被存储到融盘?消息是否被同步到了 Slave上?消息在 Slave上是否被 写人磁盘?需要结合所配置的刷盘策略、主从策略来定 。 这个状态还可 以简单理解为,没有发生上面列出的 三个问题状态就是 SEND OK。
生产者发送的三种方式
同步方式
指消息在发送到mq后,等待接受mq响应结果,在收到响应后才会在发送下一个数据包。
应用场景
此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
调用DefaultMQProducer的send方法
try {
SendResult sendResult = producer.send(msg);
// 同步发送消息,只要不抛异常就是成功
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
}
catch (Exception e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
异步方式
异步发送是指发送方发出数据后,不等接收方mq响应,接着发送下个数据包的通讯方式。当消息发送之后,不需要等待服务器响应即可返回,进行下一条消息发送。发送方通过回调接口接收服务器响应,并对响应结果进行处理。
调用DefaultMQProducer的send方法需用户实现异步发送回调接口(SendCallback)
// 异步发送消息, 发送结果通过 callback 返回给客户端。 producer.sendAsync(msg, new SendCallback() { @Override public void onSuccess(final SendResult sendResult) { // 消息发送成功 System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId()); } @Override public void onException(OnExceptionContext context) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理 System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId()); } }); // 在 callback 返回之前即可取得 msgId System.out.println("send message async. topic=" + msg.getTopic() + ", msgId=" + msg.getMsgID());
单向发送
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
调用sendOneway(msg)方法
应用场景
适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
// 由于在 oneway 方式发送消息时没有请求应答处理,一旦出现消息发送失败,则会因为没有重试而导致数据丢失。若数据不可丢,建议选用可靠同步或可靠异步发送方式
producer.sendOneway(msg);
参考:https://help.aliyun.com/document_detail/29547.html
注:如果是异步发送,那么重试次数只有1次,对于同步而言,超时异常也是不会再去重试。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>4.7.0</version>
</dependency>
public class JmsConfig {
/**
* Name Server 地址,因为是集群部署 所以有多个用 分号 隔开
*/
public static final String NAME_SERVER = "172.18.2.97:9876";
/**
* 主题名称 主题一般是服务器设置好 而不能在代码里去新建topic( 如果没有创建好,生产者往该主题发送消息 会报找不到topic错误)
*/
public static final String TOPIC = "topic_family";
}
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.springframework.stereotype.Component; @Component public class Producer { private String producerGroup = "test_producer"; private DefaultMQProducer producer; public Producer(){ //示例生产者 producer = new DefaultMQProducer(producerGroup); //不开启vip通道 开通口端口会减2 producer.setVipChannelEnabled(false); //绑定name server producer.setNamesrvAddr(JmsConfig.NAME_SERVER); // 设置实例名称 producer.setInstanceName("quick_start_producer"); // 设置重试次数,默认2 producer.setRetryTimesWhenSendFailed(3); //设置发送超时时间,默认是3000 producer.setSendMsgTimeout(6000); // 开启生产者 start(); } /** * 对象在使用之前必须要调用一次,只能初始化一次 */ public void start(){ try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } public DefaultMQProducer getProducer(){ return this.producer; } /** * 一般在应用上下文,使用上下文监听器,进行关闭 */ public void shutdown(){ this.producer.shutdown(); } }
import java.util.ArrayList; import java.util.List; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class SendMsgController { @Autowired private Producer producer; private List<String> mesList; /** * 初始化消息 */ public SendMsgController() { mesList = new ArrayList<String>(); mesList.add("11111"); mesList.add("22222"); mesList.add("33333"); mesList.add("44444"); mesList.add("55555"); } @RequestMapping("/text/rocketmq") public Object callback() throws Exception { //总共发送五次消息 for (String s : mesList) { //创建生产信息 Message message = new Message(JmsConfig.TOPIC, "testtag", ("get meg:" + s).getBytes()); //发送 SendResult sendResult = producer.getProducer().send(message); System.out.println("输出生产者信息={"+sendResult+"}"); } return "成功"; } }
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RocketMqSendApp {
public static void main( String[] args )
{
System.out.println( "Hello World!" );
SpringApplication.run(RocketMqSendApp.class, args);
}
}
输出生产者信息={SendResult [sendStatus=SEND_OK, msgId=0A81CCE06D0073D16E937884A5FB0000, offsetMsgId=AC12026100002A9F00000000009486AA, messageQueue=MessageQueue [topic=topic_family, brokerName=broker-a, queueId=0], queueOffset=11]}
输出生产者信息={SendResult [sendStatus=SEND_OK, msgId=0A81CCE06D0073D16E937884A6240001, offsetMsgId=AC12026100002A9F000000000094875F, messageQueue=MessageQueue [topic=topic_family, brokerName=broker-a, queueId=1], queueOffset=10]}
输出生产者信息={SendResult [sendStatus=SEND_OK, msgId=0A81CCE06D0073D16E937884A62D0002, offsetMsgId=AC12026100002A9F0000000000948814, messageQueue=MessageQueue [topic=topic_family, brokerName=broker-a, queueId=2], queueOffset=13]}
输出生产者信息={SendResult [sendStatus=SEND_OK, msgId=0A81CCE06D0073D16E937884A6370003, offsetMsgId=AC12026100002A9F00000000009488C9, messageQueue=MessageQueue [topic=topic_family, brokerName=broker-a, queueId=3], queueOffset=11]}
输出生产者信息={SendResult [sendStatus=SEND_OK, msgId=0A81CCE06D0073D16E937884A6400004, offsetMsgId=AC12026100002A9F000000000094897E, messageQueue=MessageQueue [topic=topic_family, brokerName=broker-a, queueId=0], queueOffset=12]}
发送成功,该发送发送为同步发送。
总结:
同步发送:SendResult sendResult = producer.send(msg);
异步发送:producer.sendAsync(msg, new SendCallback() {});
单向发送: producer.sendOneway(msg);
*RocketMQ有两种消费模式:BROADCASTING广播模式,CLUSTERING集群模式,默认的是 集群消费模式。如果需要切换消费模式,需在消费者端进行如下设置。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setMessageModel(MessageModel.BROADCASTING);
消费端重试机制,当消费者消费消息后,需要给Broker返回消费状态。Consumer消费完成后需要返回ConsumeConcurrentlyStatus并返回消费状态。ConsumeConcurrentlyStatus是一个枚举,共有两种状态:
public enum ConsumeConcurrentlyStatus {
//消费成功
ConsumeConcurrentlyStatus,
//消费失败,一段时间后重试
RECONSUME_LATER;
}
注:只有在消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重试,广播消息是不会重试的。
import java.io.UnsupportedEncodingException; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.springframework.stereotype.Component; @Component public class Consumer { /** * 消费者实体对象 */ private DefaultMQPushConsumer consumer; /** * 消费者组 */ public static final String CONSUMER_GROUP = "test_consumer"; /** * 通过构造函数 实例化对象 */ public Consumer() throws MQClientException { consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); //消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //订阅主题和 标签( * 代表所有标签)下信息 consumer.subscribe(JmsConfig.TOPIC, "*"); // 批量消费,每次拉取10条 consumer.setConsumeMessageBatchMaxSize(10); // //注册消费的监听 并在此监听中消费信息,并返回消费的状态信息 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { // msgs中只收集同一个topic,同一个tag,并且key相同的message // 会把不同的消息分别放置到不同的队列中 try { for (Message msg : msgs) { //消费者获取消息 这里只输出 不做后面逻辑处理 String body = new String(msg.getBody(), "utf-8"); System.out.println("Consumer-获取消息-主题topic为={"+msg.getTopic()+"}, 消费消息为={"+body+"}"); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.println("消费者 启动成功======="); } }
import java.io.UnsupportedEncodingException; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.springframework.stereotype.Component; @Component public class Consumer { /** * 消费者实体对象 */ private DefaultMQPushConsumer consumer; /** * 消费者组 */ public static final String CONSUMER_GROUP = "test_consumer"; /** * 通过构造函数 实例化对象 */ public Consumer() throws MQClientException { consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); //消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //订阅主题和 标签( * 代表所有标签)下信息 consumer.subscribe(JmsConfig.TOPIC, "*"); // 批量消费,每次拉取10条 consumer.setConsumeMessageBatchMaxSize(10); // //注册消费的监听 并在此监听中消费信息,并返回消费的状态信息 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { // msgs中只收集同一个topic,同一个tag,并且key相同的message // 会把不同的消息分别放置到不同的队列中 try { for (Message msg : msgs) { //消费者获取消息 这里只输出 不做后面逻辑处理 String body = new String(msg.getBody(), "utf-8"); System.out.println("Consumer-获取消息-主题topic为={"+msg.getTopic()+"}, 消费消息为={"+body+"}"); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.println("消费者 启动成功======="); } }
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RocketMqReceiveApp {
public static void main(String[] args) {
System.out.println( "Hello World!" );
SpringApplication.run(RocketMqReceiveApp.class, args);
}
}
server:
port: ${PORT:8081}
消费者 启动成功=======
Consumer-获取消息-主题topic为={topic_family}, 消费消息为={get meg:11111}
Consumer-获取消息-主题topic为={topic_family}, 消费消息为={get meg:55555}
Consumer-获取消息-主题topic为={topic_family}, 消费消息为={get meg:44444}
Consumer-获取消息-主题topic为={topic_family}, 消费消息为={get meg:22222}
Consumer-获取消息-主题topic为={topic_family}, 消费消息为={get meg:33333}
至此,rocket集成完成,现在回顾一下要点:
一. 生产者发送模式
1、同步模式
2、异步模式
3、单向模式
4、消息发送失败重试配置
5、消息发送超时时间配置
6、消息包大小配置
二.消费者接受消失
1、集群模式
2、广播模式
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。