赞
踩
写这篇文章的目的:由于被华为的大哥安排了我写关于:
本地搭建mq服务
用rocketmq
服务叫mq-server
服务主要实现对mq的操作,
每个操作提供对外的访问接口,
以后其他服务需要发mq消息,
都直接通过Fegin进行调用此服务的操作接口
===========================
提供消息的收发接口
别的服务想收发,就进行调用
尽量提供多类常用接口
注释要做好,变量,方法名,判断,类作用
这样的需求,都是建立在SpringCloudAlibaba框架下完成的,使用便把学习RocketMQ的内容记录下来,方便复习和以后粘贴的使用。
MQ(Message Queue)是一种跨进程的通信机制,用于传递消息。通俗点说,就是一个先进先出的数据结构。
(1)、异步解耦
最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注册成功。传统的做法如下:
此架构下注册、邮件、短信三个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。
但是对于用户来说,注册功能实际只需要注册系统存储用户的账户信息后,该用户便可以登录,而后续的注册短信和邮件不是即时需要关注的步骤。
所以实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的消息队列 MQ 中然后马上返回用户结果,由消息队列 MQ 异步地进行这些操作。架构图如下:
异步解耦是消息队列 MQ 的主要特点,主要目的是减少请求响应时间和解耦。主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时,由于使用了消息队列MQ,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦合。
(2)、流量削峰
流量削峰也是消息队列 MQ 的常用场景,一般在秒杀或团队抢购(高并发)活动中使用广泛。
在秒杀或团队抢购活动中,由于用户请求量较大,导致流量暴增,秒杀的应用在处理如此大量的访问流量后,下游的通知系统无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况。为解决这些问题,可在应用和下游通知系统之间加入消息队列 MQ。
秒杀处理流程如下所述:
目前业界有很多MQ产品,比较出名的有下面这些:
ZeroMQ
号称最快的消息队列系统,尤其针对大吞吐量的需求场景。扩展性好,开发比较灵活,采用C语言实现,实际上只是一个socket库的重新封装,如果做为消息队列使用,需要开发大量的代码。
ZeroMQ仅提供非持久性的队列,也就是说如果down机,数据将会丢失。
RabbitMQ
使用erlang语言开发,性能较好,适合于企业级的开发。但是不利于做二次开发和维护。
ActiveMQ
历史悠久的Apache开源项目。已经在很多产品中得到应用,实现了JMS1.1规范,可以和spring-jms轻松融合,实现了多种协议,支持持久化到数据库,对队列数较多的情况支持不好。
RocketMQ
阿里巴巴的MQ中间件,由java语言开发,性能非常好,能够撑住双十一的大流量,而且使用起来很简单。
Kafka
Kafka是Apache下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。
RocketMQ是阿里巴巴开源的分布式消息中间件,现在是Apache的一个顶级项目。在阿里内部使用非常广泛,已经经过了"双11"这种万亿级的消息流转。
接下来我们先在linux平台下安装一个RocketMQ的服务
(1)、下载RocketMQ
http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
环境要求
Linux 64位操作系统
64bit JDK 1.8+
(2)、安装RocketMQ
1)、上传文件到Linux系统(本人使用的是VM虚拟机)
我上传到了
/usr/local/src/
解压、重命名的命令:
unzip rocketmq-all-4.4.0-bin-release.zip
mv rocketmq-all-4.4.0-bin-release rocketmq
2)、 解压到安装目录
切换到安装目录:
(3)、启动RocketMQ
注意: 启动前要先修改两个配置文件,不然会启动失败:
vi bin/runbroker.sh
vi bin/runserver.sh
启动NameServer:
nohup ./bin/mqnamesrv &
# 只要进程不报错,就应该是启动成功了,可以查看一下日志
tail -f /root/logs/rocketmqlogs/namesrv.log
启动Broker:
# 编辑bin/runbroker.sh 和 bin/runserver.sh文件,修改里面的
# JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
# 为JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
nohup bin/mqbroker -n localhost:9876 &
tail -f /root/logs/rocketmqlogs/broker.log
(4)、测试RocketMQ
测试消息发送命令:
export NAMESRV_ADDR=localhost:9876
bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
测试消息接收命令:
export NAMESRV_ADDR=localhost:9876
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
(5)、关闭RocketMQ
bin/mqshutdown broker
bin/mqshutdown namesrv
如上图所示,整体可以分成4个角色,分别是:NameServer,Broker,Producer,Consumer。 Broker(邮递员) Broker是RocketMQ的核心,负责消息的接收,存储,投递等功能 NameServer(邮局) 消息队列的协调者,Broker向它注册路由信息,同时Producer和Consumer向其获取路由信息 Producer(寄件人) 消息的生产者,需要从NameServer获取Broker信息,然后与Broker建立连接,向Broker发送消息 Consumer(收件人) 消息的消费者,需要从NameServer获取Broker信息,然后与Broker建立连接,从Broker获取消息 Topic(地区) 用来区分不同类型的消息,发送和接收消息前都需要先创建Topic,针对Topic来发送和接收消息 Message Queue(邮件) 为了提高性能和吞吐量,引入了Message Queue,一个Topic可以设置一个或多个MessageQueue, 这样消息就可以并行往各个Message Queue发送消息,消费者也可以并行的从多个Message Queue读取消息 Message Message 是消息的载体。 Producer Group 生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。 Consumer Group 消费者组,消费同一类消息的多个 consumer 实例组成一个消费者组。
下载:
#在git上下载下面的工程 rocketmq-console-1.0.0
https://github.com/apache/rocketmq-externals/releases
下载好后:
解压rocketmq-console文件夹:
修改配置文件:
# 修改配置文件 rocketmq-console\src\main\resources\application.properties
server.port=17777 #项目启动后的端口号
rocketmq.config.namesrvAddr=192.168.17.128:9876 #nameserv的地址,注意防火墙要开启 9876端口
打包jar包,并启动:
# 进入控制台项目,将工程打成jar包
mvn clean package -Dmaven.test.skip=true
# 启动控制台 ,我是上传到VM虚拟机的Linux系统里运行
java -jar target/rocketmq-console-ng-1.0.0.jar
访问控制台:
注意: 当出现在Linux启动完成后但无法访问时,可以推断出,你没用关防火墙:systemctl stop firewalld
接下来我们使用Java代码来演示消息的发送和接收
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
消息发送步骤:
创建消息生产者, 指定生产者所属的组名
指定Nameserver地址
启动生产者
创建消息对象,指定主题、标签和消息体
发送消息
关闭生产者
public class RocketMqSendMessageTest { /** * 发送消息 * @return */ public static void main(String[] args) throws Exception { // 1、创建消息生产者,并且设置生产组名 DefaultMQProducer producer = new DefaultMQProducer("myproducer-group"); // 2、为生产者设置NameServer的地址 producer.setNamesrvAddr("192.168.17.128:9876"); // 3、启动生产者 producer.start(); // 4、构建消息对象,主要是设置消息的主题 标签 内容 Message message = new Message("myTopic", "myTag", ("Test RocketMQ Message").getBytes()); // 5、发送消息 第二个参数代表超时时间 SendResult result = producer.send(message, 10000); System.out.println(result); // 6、关闭生产者 producer.shutdown(); } }
消息接收步骤:
创建消息消费者, 指定消费者所属的组名
指定Nameserver地址
指定消费者订阅的主题和标签
设置回调函数,编写处理消息的方法
启动消息消费者
public class RocketMqReceiveMessageTest { /** * 接收消息 * @return */ public static void main(String[] args) throws Exception { // 1、创建消费者,并且为其指定消费组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group"); // 2、为消费者设置NameServer的地址 consumer.setNamesrvAddr("192.168.17.128:9876"); // 3、指定消费者订阅的主题和标题 consumer.subscribe("myTopic", "*"); // 4、设置一个回调函数,并在函数中编写接收到消息之后的处理方法 consumer.registerMessageListener(new MessageListenerConcurrently() { // 处理获取到的消息 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { // 消费逻辑 System.out.println("Message===>" + list); // 返回消费成功的状态 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 5、启动消费者 consumer.start(); System.out.println("启动消费成功了"); } }
接下来我们模拟一种场景: 下单成功之后,向下单用户发送短信。设计图如下:
1、在 mq-server 中添加rocketmq的依赖(自建个maven工程)
<!--rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
2、添加配置
server.port=21919 spring.application.name=mq-service spring.cloud.nacos.discovery.namespace=public spring.cloud.nacos.discovery.password=cxlk spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848 spring.cloud.nacos.discovery.username=nacos # 编码格式 server.servlet.encoding.force=true server.servlet.encoding.charset=UTF-8 server.servlet.encoding.enabled=true server.servlet.context-path=/mq # rocketMQ # rocketMQ服务的地址 rocketmq.name-server=192.168.17.128:9876 # 生产者组 rocketmq.producer.group=shop-order
3、编写测试代码
编写个实体类测试一下:
@Data
public class Order {
private Integer uId;
private String userName;
private Integer pId;
private String pName;
private String pPrice;
private Integer numBer;
}
测试用例:
@RestController @Slf4j public class OrderController { @Resource private RocketMQTemplate rocketMQTemplate; @RequestMapping("/order/prod/{pid}") public Order order(@PathVariable("pid") Integer pid) { Order order = new Order(); order.setUId(1); order.setUserName("测试用户"); order.setPId(pid); order.setPName("测试数据"); order.setPPrice("测试数据"); order.setNumBer(1); rocketMQTemplate.convertAndSend("order-topic", order); return order; } }
1、修改mq-client模块配置
依赖配置同上
2、修改主类
@SpringBootApplication
@EnableDiscoveryClient
public class MqClient {
public static void main(String[] args) {
SpringApplication.run(MqClient.class, args);
}
}
3、修改配置文件
server.port=21818 spring.application.name=mq-service-client spring.cloud.nacos.discovery.namespace=public spring.cloud.nacos.discovery.password=cxlk spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848 spring.cloud.nacos.discovery.username=nacos # 编码格式 server.servlet.encoding.force=true server.servlet.encoding.charset=UTF-8 server.servlet.encoding.enabled=true server.servlet.context-path=/mq # rocketMQ # rocketMQ服务的地址 rocketmq.name-server=192.168.17.128:9876 # 生产者组 rocketmq.producer.group=shop-order
4、编写消息接收服务
建立同样的实体类:
@Slf4j @Service /** * consumerGroup-消费者组名 topic-要消费的主题 */ @RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic") public class SmsService implements RocketMQListener<Order> { /** * @Author: 一个爱运动的程序员 * @Description: 消费的逻辑 * @Date: 23:19 2021/11/24 * @Param: * @return: **/ @Override public void onMessage(Order order) { log.info("接收到了一个订单信息{},接下来就可以发送短信通知了", order); } }
5、启动服务,执行操作,查看后台输出:
在RocketMQ的控制台,同样可以观察得到:
RocketMQ提供三种方式来发送普通消息:可靠同步发送、可靠异步发送和单向发送。
添加个测试依赖:
<!-- 单元测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
可靠同步发送
同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方
式。
此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
可靠异步发送
异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。发送
方通过回调接口接收服务器响应,并对响应结果进行处理。
异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知
启动转码服务,转码完成后通知推送转码结果等。
单向发送
单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不
等待应答。
适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
测试代码:
@RunWith(SpringRunner.class) @SpringBootTest(classes = MqServer.class) public class MessageTypeTest { @Resource private RocketMQTemplate rocketMQTemplate; @Resource private MultiTypeMqSend multiTypeMq; /** * @Author: 一个爱运动的程序员 * @Description: 同步消息 * @Date: 18:23 2021/11/25 * @param: * @return: **/ // @Test public void testSyncSend() { // 参数一:top // 参数二:消息体 // 参数三:超时时间 SendResult result = rocketMQTemplate.syncSend("test-topic-1:tag", "这是一条同步消息", 10000); System.out.println(result); } /** * @Author: 一个爱运动的程序员 * @Description: 异步消息 * @Date: 18:32 2021/11/25 * @param: * @return: **/ // @Test public void testAsyncSend() throws InterruptedException { // 参数一:top // 参数二:消息体 // 参数三:回调 rocketMQTemplate.asyncSend("test-topic-1", "这是一条异步消息", new SendCallback() { // 成功响应的回调 @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } // 异常响应的回调 @Override public void onException(Throwable throwable) { System.out.println(throwable); } }); System.out.println("=============================="); Thread.sleep(300000000); } /** * @Author: 一个爱运动的程序员 * @Description: 单向消息 * @Date: 18:37 2021/11/25 * @param: * @return: **/ // @Test public void testOneWay() { for (int i = 0; i < 10; i++) { rocketMQTemplate.sendOneWay("test-topic-1", "这是一条单向消息"); } }
三种发送方式的对比:
顺序消息是消息队列提供的一种严格按照顺序来发布和消费的消息类型。
可以测试一下,首先我们先观察一下,通过控制台:
运行10次单向发送:
@Test
public void testOneWay() {
for (int i = 0; i < 10; i++) {
rocketMQTemplate.sendOneWay("test-topic-1", "这是一条单向消息");
}
}
运行后:
可以发现是随机分布的,接下来便测试一下单向顺序发送消息,其实很简单,只需在sendOneWay方法加多个Orderly就可以啦,示例如下:
@Test
public void testOneWayOrderly() {
for (int i = 0; i < 10; i++) {
rocketMQTemplate.sendOneWayOrderly("test-topic-1", "这是一条单向消息", "xx");
}
}
观察:
而至于同步和异步也同样如此,下面便给你参考一下,我在项目中所写的业务接口吧(同步顺序发送消息接口):
RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致。
事务消息交互流程:
两个概念:
事务消息发送步骤:
事务消息回查步骤:
参考代码如下:
public class InformationTest { @Resource private RocketMQTemplate rocketMQTemplate; public void createOrderBefore(Order order) { String txId = UUID.randomUUID().toString(); // 发送半事务消息 rocketMQTemplate.sendMessageInTransaction( "tx_producer_group", "tx_topic", MessageBuilder.withPayload(order).setHeader("txId", txId).build(), order ); } @Transactional(rollbackFor=Exception.class) public void createOrder(String txId, Order order) { // 这里可以做一个保存订单的业务操作 // orderDao.save(order); TxLog txLog = new TxLog(); txLog.setTxId(txId); txLog.setDate(new Date()); // 记录事务日志到数据库,这里便都略写了,本地事务,同时成功或同时失败 // txLogDao.save(txLog); // 记录事务日志的作用:当进行消息回查的时候,在记录事务里查找有没有刚刚保存的订单,如果有说明保存成功,反正则保存失败 } }
@Service @RocketMQTransactionListener(txProducerGroup = "tx_producer_group") public class InformationTestListener implements RocketMQLocalTransactionListener { @Resource private InformationTest informationTest; /** * @Author: 一个爱运动的程序员 * @Description: 执行本地事务 **/ @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { String txId = (String)msg.getHeaders().get("txId"); try { // 本地事务 Order order = (Order) arg; informationTest.createOrder(txId, order); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { return RocketMQLocalTransactionState.ROLLBACK; } } /** * @Author: 一个爱运动的程序员 * @Description: 消息回查 **/ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { String txId = (String)msg.getHeaders().get("txId"); TxLog txLog = txLogDao.findByid(txId).get(); if (txLog != null) { // 本地事务(订单)成功了 return RocketMQLocalTransactionState.COMMIT; } else { return RocketMQLocalTransactionState.ROLLBACK; } return null; } }
@RocketMQMessageListener(
consumerGroup = "shop",//消费者分组
topic = "order-topic",//要消费的主题
consumeMode = ConsumeMode.CONCURRENTLY, //消费模式(指定是否顺序消费:CONCURRENTLY(同步,默认)、ORDERLY(顺序)):无序和有序
messageModel = MessageModel.CLUSTERING, //消息模式:广播和集群,默认是集群 )
public class SmsService implements RocketMQListener<Order> {}
RocketMQ支持两种消息模式:
大家如果有兴趣了解的话,可以去B站观看教学视频:
https://www.bilibili.com/video/BV1R7411774f?p=51
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。