赞
踩
下载
二进制下载地址:http://rocketmq.apache.org/release_notes/release-notes-4.8.0/
docker 拉取地址:docker pull foxiswho/rocketmq:server-4.3.2
安装
配置环境变量
找到安装包,启动nameserver
,进入 bin 目录双击 mqnamesrv.cmd
启动。启动 broker 命令 .\mqbroker -n 127.0.0.1:9876
,指定配置开启自动创建 topic 启动 bin/mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true -c conf/broker.conf &
可视化插件下载
下载地址:https://github.com/apache/rocketmq-externals.git
修改 rocketmq-externals\rocketmq-console\src\main\resources
下的 application.properties
,修改 修改port
和 namesrvAddr
,配置如下
server.contextPath= # 修改端口号 server.port=8088 #spring.application.index=true spring.application.name=rocketmq-console spring.http.encoding.charset=UTF-8 spring.http.encoding.enabled=true spring.http.encoding.force=true logging.config=classpath:logback.xml #if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876 # 添加 rocketmq.config.namesrvAddr rocketmq.config.namesrvAddr=127.0.0.1:9876 #if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true rocketmq.config.isVIPChannel= #rocketmq-console's data path:dashboard/monitor rocketmq.config.dataPath=/tmp/rocketmq-console/data #set it false if you don't want use dashboard.default true rocketmq.config.enableDashBoardCollect=true
编译启动,用CMD进入\rocketmq-externals\rocketmq-console
文件夹,执行mvn clean package -Dmaven.test.skip=true
,编译生成。编译成功之后,cmd进入target
文件夹,执行java -jar rocketmq-console-ng-1.0.0.jar
,启动rocketmq-console-ng-1.0.0.jar
。浏览器中输入‘127.0.0.1:8088’,成功后即可查看。
启动 nameserv
start mqnamesrv.cmd
启动 broker
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
官网:https://github.com/apache/rocketmq-spring
依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
compile 'org.apache.rocketmq:rocketmq-spring-boot-starter:2.2.0'
配置
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: leyang
# 超时时间 5 分钟
send-message-timeout: 300000
# 重试次数 3 次
retry-times-when-send-failed: 3
consumer:
group: leyang
rocketMQTemplate.send("test-topic-1", new GenericMessage(demo));
@Service
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
public class TestConsumer implements RocketMQListener<Demo> {
@Override
public void onMessage(Demo demo) {
System.out.print("------- OrderPaidEventConsumer received:"+ JSON.toJSONString(demo));
}
}
同步发送 sync
发送消息采用同步模式,这种方式只有在消息完全发送完成之后才返回结果,此方式存在需要同步等待发送结果的时间代价。
这种方式具有内部重试机制,即在主动声明本次消息发送失败之前,内部实现将重试一定次数,默认为2次(DefaultMQProducer#getRetryTimesWhenSendFailed)。 发送的结果存在同一个消息可能被多次发送给给broker,这里需要应用的开发者自己在消费端处理幂等性问题。示例如下
public void sync() { rocketMQTemplate.syncSend("topic-name", "send sync message !"); }
- 1
- 2
- 3
- 4
异步发送sync
发送消息采用异步发送模式,消息发送后立刻返回,当消息完全完成发送后,会调用回调函数sendCallback来告知发送者本次发送是成功或者失败。异步模式通常用于响应时间敏感业务场景,即承受不了同步发送消息时等待返回的耗时代价。
同同步发送一样,异步模式也在内部实现了重试机制,默认次数为2次(DefaultMQProducer#getRetryTimesWhenSendAsyncFailed})。发送的结果同样存在同一个消息可能被多次发送给给broker,需要应用的开发者自己在消费端处理幂等性问题。
public void async() { rocketMQTemplate.asyncSend("topic-name", "send async message!", new SendCallback() { @Override public void onSuccess(SendResult sendResult
- 1
- 2
- 3
- 4
- 5
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。