
Reliable Message Delivery with RabbitMQ (Java Implementation)

This post is a sub-post of "RabbitMQ Base Component Encapsulation: Overall Structure".

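I. Overall architecture

Step 1: Persist the message. When the business data is written to the database, the message record is written as well, and the two writes must be atomic (typically by doing both in one local database transaction).

Step 2: The Producer sends the message to the MQ Broker.

Step 3: The Producer receives the confirmation returned by the Broker.

Step 4: Update the status of the message record. Three statuses are defined: 0 = pending confirmation, 1 = confirmed, 2 = confirmation failed.

Step 5: A scheduled task fetches messages that have stayed in the pending state for too long.

Step 6: The Producer resends those messages.

Step 7: Once a message has been retried 3 times (the code checks tryCount >= 3), its status is set to confirmation failed. Failed messages are then handled according to the specific business requirements.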
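The three statuses from Step 4 can be sketched as a small enum. The constant names SENDING, SEND_OK and SEND_FAIL and the getCode() accessor are the ones used later in this post; the string codes "0", "1", "2" are an assumption based on Step 4 (the status column is a varchar), and fromCode and canTransitionTo are hypothetical helpers added only for illustration.

```java
import java.util.Arrays;

/** Hypothetical status enum; the codes follow the 0/1/2 convention from Step 4. */
enum BrokerMessageStatus {
    SENDING("0"),   // pending confirmation
    SEND_OK("1"),   // confirmed by the broker
    SEND_FAIL("2"); // given up after the retry limit

    private final String code;

    BrokerMessageStatus(String code) {
        this.code = code;
    }

    String getCode() {
        return code;
    }

    /** Looks up a status by the code stored in the status column. */
    static BrokerMessageStatus fromCode(String code) {
        return Arrays.stream(values())
                .filter(s -> s.code.equals(code))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("unknown status code: " + code));
    }

    /** Only a pending message may move on; SEND_OK and SEND_FAIL are terminal. */
    boolean canTransitionTo(BrokerMessageStatus next) {
        return this == SENDING && next != SENDING;
    }
}
```

The SQL shown later does not enforce these transitions; the check is only a way to make the intended lifecycle explicit.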

II. CRUD for message records

1. The current project is named rabbit-core-producer. Persisting message records requires database access, so first add these dependencies:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.1.10</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>

2. DDL for the message-record table: rabbit-producer-message-schema.sql

    -- Structure of table broker_message.broker_message
    CREATE TABLE IF NOT EXISTS `broker_message` (
      `message_id` varchar(128) NOT NULL,
      `message` varchar(4000),
      `try_count` int(4) DEFAULT 0,
      `status` varchar(10) DEFAULT '',
      `next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
      `create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
      `update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
      PRIMARY KEY (`message_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Place rabbit-producer-message-schema.sql in the rabbit-core-producer project at /src/main/resources/rabbit-producer-message-schema.sql. The rabbit-core-producer project is described in detail in "RabbitMQ Base Component Encapsulation: Overall Structure" (this post is one chapter of that series).

3. Data source configuration file: rabbit-producer-message.properties

    rabbit.producer.druid.type=com.alibaba.druid.pool.DruidDataSource
    rabbit.producer.druid.jdbc.url=jdbc:mysql://localhost:3306/broker_message?characterEncoding=UTF-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useUnicode=true&serverTimezone=GMT
    rabbit.producer.druid.jdbc.driver-class-name=com.mysql.jdbc.Driver
    rabbit.producer.druid.jdbc.username=root
    rabbit.producer.druid.jdbc.password=root
    rabbit.producer.druid.jdbc.initialSize=5
    rabbit.producer.druid.jdbc.minIdle=1
    rabbit.producer.druid.jdbc.maxActive=100
    rabbit.producer.druid.jdbc.maxWait=60000
    rabbit.producer.druid.jdbc.timeBetweenEvictionRunsMillis=60000
    rabbit.producer.druid.jdbc.minEvictableIdleTimeMillis=300000
    rabbit.producer.druid.jdbc.validationQuery=SELECT 1 FROM DUAL
    rabbit.producer.druid.jdbc.testWhileIdle=true
    rabbit.producer.druid.jdbc.testOnBorrow=false
    rabbit.producer.druid.jdbc.testOnReturn=false
    rabbit.producer.druid.jdbc.poolPreparedStatements=true
    rabbit.producer.druid.jdbc.maxPoolPreparedStatementPerConnectionSize=20
    rabbit.producer.druid.jdbc.filters=stat,wall,log4j
    rabbit.producer.druid.jdbc.useGlobalDataSourceStat=true

This file also goes in the rabbit-core-producer project, at /src/main/resources/rabbit-producer-message.properties.

The JDBC URL above points at a database named broker_message, so that database has to be created in advance.

4. BrokerMessage.java

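    package com.didiok.rabbit.producer.entity;

    import java.io.Serializable;
    import java.util.Date;
    // Message is the project's own message type; its import is omitted because
    // its package is not shown in this post.

    public class BrokerMessage implements Serializable {

        private static final long serialVersionUID = 7447792462810110841L;

        private String messageId;
        private Message message;
        private Integer tryCount = 0;
        private String status;
        private Date nextRetry;
        private Date createTime;
        private Date updateTime;

        // getters and setters omitted
    }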

5. BrokerMessageMapper.xml

    <?xml version="1.0" encoding="UTF-8" ?>
    <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
    <mapper namespace="com.didiok.rabbit.producer.mapper.BrokerMessageMapper" >
      <resultMap id="BaseResultMap" type="com.didiok.rabbit.producer.entity.BrokerMessage" >
        <id column="message_id" property="messageId" jdbcType="VARCHAR" />
        <result column="message" property="message" jdbcType="VARCHAR" typeHandler="com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler" />
        <result column="try_count" property="tryCount" jdbcType="INTEGER" />
        <result column="status" property="status" jdbcType="VARCHAR" />
        <result column="next_retry" property="nextRetry" jdbcType="TIMESTAMP" />
        <result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
        <result column="update_time" property="updateTime" jdbcType="TIMESTAMP" />
      </resultMap>
      <sql id="Base_Column_List" >
        message_id, message, try_count, status, next_retry, create_time, update_time
      </sql>
      <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
        select
        <include refid="Base_Column_List" />
        from broker_message
        where message_id = #{messageId,jdbcType=VARCHAR}
      </select>
      <delete id="deleteByPrimaryKey" parameterType="java.lang.String" >
        delete from broker_message
        where message_id = #{messageId,jdbcType=VARCHAR}
      </delete>
      <insert id="insert" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
        insert into broker_message (message_id, message, try_count,
          status, next_retry, create_time,
          update_time)
        values (#{messageId,jdbcType=VARCHAR}, #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler}, #{tryCount,jdbcType=INTEGER},
          #{status,jdbcType=VARCHAR}, #{nextRetry,jdbcType=TIMESTAMP}, #{createTime,jdbcType=TIMESTAMP},
          #{updateTime,jdbcType=TIMESTAMP})
      </insert>
      <insert id="insertSelective" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
        insert into broker_message
        <trim prefix="(" suffix=")" suffixOverrides="," >
          <if test="messageId != null" >
            message_id,
          </if>
          <if test="message != null" >
            message,
          </if>
          <if test="tryCount != null" >
            try_count,
          </if>
          <if test="status != null" >
            status,
          </if>
          <if test="nextRetry != null" >
            next_retry,
          </if>
          <if test="createTime != null" >
            create_time,
          </if>
          <if test="updateTime != null" >
            update_time,
          </if>
        </trim>
        <trim prefix="values (" suffix=")" suffixOverrides="," >
          <if test="messageId != null" >
            #{messageId,jdbcType=VARCHAR},
          </if>
          <if test="message != null" >
            #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler},
          </if>
          <if test="tryCount != null" >
            #{tryCount,jdbcType=INTEGER},
          </if>
          <if test="status != null" >
            #{status,jdbcType=VARCHAR},
          </if>
          <if test="nextRetry != null" >
            #{nextRetry,jdbcType=TIMESTAMP},
          </if>
          <if test="createTime != null" >
            #{createTime,jdbcType=TIMESTAMP},
          </if>
          <if test="updateTime != null" >
            #{updateTime,jdbcType=TIMESTAMP},
          </if>
        </trim>
      </insert>
      <update id="updateByPrimaryKeySelective" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
        update broker_message
        <set >
          <if test="message != null" >
            message = #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler},
          </if>
          <if test="tryCount != null" >
            try_count = #{tryCount,jdbcType=INTEGER},
          </if>
          <if test="status != null" >
            status = #{status,jdbcType=VARCHAR},
          </if>
          <if test="nextRetry != null" >
            next_retry = #{nextRetry,jdbcType=TIMESTAMP},
          </if>
          <if test="createTime != null" >
            create_time = #{createTime,jdbcType=TIMESTAMP},
          </if>
          <if test="updateTime != null" >
            update_time = #{updateTime,jdbcType=TIMESTAMP},
          </if>
        </set>
        where message_id = #{messageId,jdbcType=VARCHAR}
      </update>
      <update id="updateByPrimaryKey" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
        update broker_message
        set message = #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler},
          try_count = #{tryCount,jdbcType=INTEGER},
          status = #{status,jdbcType=VARCHAR},
          next_retry = #{nextRetry,jdbcType=TIMESTAMP},
          create_time = #{createTime,jdbcType=TIMESTAMP},
          update_time = #{updateTime,jdbcType=TIMESTAMP}
        where message_id = #{messageId,jdbcType=VARCHAR}
      </update>
      <update id="changeBrokerMessageStatus" >
        update broker_message bm
        set bm.status = #{brokerMessageStatus,jdbcType=VARCHAR},
          bm.update_time = #{updateTime, jdbcType=TIMESTAMP}
        where bm.message_id = #{brokerMessageId,jdbcType=VARCHAR}
      </update>
      <select id="queryBrokerMessageStatus4Timeout" resultMap="BaseResultMap" >
        <![CDATA[
        select message_id, message, try_count, status, next_retry, create_time, update_time
        from broker_message bm
        where bm.status = #{brokerMessageStatus,jdbcType=VARCHAR}
        and bm.next_retry < sysdate()
        ]]>
      </select>
      <select id="queryBrokerMessageStatus" resultMap="BaseResultMap" >
        select message_id, message, try_count, status, next_retry, create_time, update_time
        from broker_message bm
        where bm.status = #{brokerMessageStatus,jdbcType=VARCHAR}
      </select>
      <update id="update4TryCount" >
        update broker_message bm
        set bm.try_count = bm.try_count + 1,
          bm.update_time = #{updateTime,jdbcType=TIMESTAMP}
        where bm.message_id = #{brokerMessageId,jdbcType=VARCHAR}
      </update>
    </mapper>

6. BrokerMessageMapper.java

    package com.didiok.rabbit.producer.mapper;

    import java.util.Date;
    import java.util.List;

    import org.apache.ibatis.annotations.Mapper;
    import org.apache.ibatis.annotations.Param;

    import com.didiok.rabbit.producer.entity.BrokerMessage;

    @Mapper
    public interface BrokerMessageMapper {

        int deleteByPrimaryKey(String messageId);

        int insert(BrokerMessage record);

        int insertSelective(BrokerMessage record);

        BrokerMessage selectByPrimaryKey(String messageId);

        int updateByPrimaryKeySelective(BrokerMessage record);

        // NOTE: BrokerMessageMapper.xml above defines no statement with this id,
        // so this method cannot be invoked as the mapper stands.
        int updateByPrimaryKeyWithBLOBs(BrokerMessage record);

        int updateByPrimaryKey(BrokerMessage record);

        void changeBrokerMessageStatus(@Param("brokerMessageId") String brokerMessageId,
                                       @Param("brokerMessageStatus") String brokerMessageStatus,
                                       @Param("updateTime") Date updateTime);

        List<BrokerMessage> queryBrokerMessageStatus4Timeout(@Param("brokerMessageStatus") String brokerMessageStatus);

        List<BrokerMessage> queryBrokerMessageStatus(@Param("brokerMessageStatus") String brokerMessageStatus);

        int update4TryCount(@Param("brokerMessageId") String brokerMessageId, @Param("updateTime") Date updateTime);
    }

7. MessageStoreService.java (no separate interface is defined here; the logic is implemented directly in MessageStoreService.java)

    import java.util.Date;
    import java.util.List;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    import com.didiok.rabbit.producer.entity.BrokerMessage;
    import com.didiok.rabbit.producer.mapper.BrokerMessageMapper;
    // BrokerMessageStatus is the project's own enum (import omitted).

    @Service
    public class MessageStoreService {

        @Autowired
        private BrokerMessageMapper brokerMessageMapper;

        public int insert(BrokerMessage brokerMessage) {
            return this.brokerMessageMapper.insert(brokerMessage);
        }

        public BrokerMessage selectByMessageId(String messageId) {
            return this.brokerMessageMapper.selectByPrimaryKey(messageId);
        }

        // Marks the message as confirmed. The name is spelled "succuess" in the
        // source, and the callers below use that spelling.
        public void succuess(String messageId) {
            this.brokerMessageMapper.changeBrokerMessageStatus(messageId,
                    BrokerMessageStatus.SEND_OK.getCode(),
                    new Date());
        }

        // Marks the message as finally failed.
        public void failure(String messageId) {
            this.brokerMessageMapper.changeBrokerMessageStatus(messageId,
                    BrokerMessageStatus.SEND_FAIL.getCode(),
                    new Date());
        }

        // Messages in the given status whose next_retry is already in the past.
        public List<BrokerMessage> fetchTimeOutMessage4Retry(BrokerMessageStatus brokerMessageStatus) {
            return this.brokerMessageMapper.queryBrokerMessageStatus4Timeout(brokerMessageStatus.getCode());
        }

        // Increments try_count by one.
        public int updateTryCount(String brokerMessageId) {
            return this.brokerMessageMapper.update4TryCount(brokerMessageId, new Date());
        }
    }
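To experiment with the semantics of these service methods without a database, the following is a minimal in-memory sketch. It is not part of the project: MessageRecord and InMemoryMessageStore are hypothetical names, the status codes "0", "1", "2" are assumed from the status list in section I, and the duplicate-id behaviour of insert (returning 0) is a simplification of what a primary-key violation would do in MySQL.

```java
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/** Hypothetical stand-in for one row of broker_message. */
final class MessageRecord {
    final String messageId;
    volatile String status;     // "0" pending, "1" confirmed, "2" failed (assumed codes)
    volatile int tryCount;
    volatile Instant nextRetry;

    MessageRecord(String messageId, Instant nextRetry) {
        this.messageId = messageId;
        this.status = "0";
        this.nextRetry = nextRetry;
    }
}

/** In-memory analogue of MessageStoreService, for experimentation only. */
final class InMemoryMessageStore {
    private final Map<String, MessageRecord> rows = new ConcurrentHashMap<>();

    /** Mirrors insert(); returns 1 if stored, 0 if the id already exists. */
    int insert(MessageRecord record) {
        return rows.putIfAbsent(record.messageId, record) == null ? 1 : 0;
    }

    MessageRecord selectByMessageId(String messageId) {
        return rows.get(messageId);
    }

    void success(String messageId) {
        changeStatus(messageId, "1");
    }

    void failure(String messageId) {
        changeStatus(messageId, "2");
    }

    /** Mirrors queryBrokerMessageStatus4Timeout: status matches and next_retry < now. */
    List<MessageRecord> fetchTimedOut(String status, Instant now) {
        return rows.values().stream()
                .filter(r -> r.status.equals(status) && r.nextRetry.isBefore(now))
                .collect(Collectors.toList());
    }

    /** Mirrors update4TryCount: try_count = try_count + 1; returns affected rows. */
    int updateTryCount(String messageId) {
        MessageRecord r = rows.get(messageId);
        if (r == null) {
            return 0;
        }
        synchronized (r) {
            r.tryCount++;
        }
        return 1;
    }

    private void changeStatus(String messageId, String status) {
        MessageRecord r = rows.get(messageId);
        if (r != null) {
            r.status = status;
        }
    }
}
```

Passing the current time into fetchTimedOut, instead of reading the clock inside the method as sysdate() does in the SQL, keeps the sketch deterministic when it is exercised in a test.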

III. Integrating the data source

1. Read the configuration file and build the data source: RabbitProducerDataSourceConfiguration.java

    @Configuration
    @PropertySource({"classpath:rabbit-producer-message.properties"})
    public class RabbitProducerDataSourceConfiguration {

        private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(RabbitProducerDataSourceConfiguration.class);

        @Value("${rabbit.producer.druid.type}")
        private Class<? extends DataSource> dataSourceType;

        @Bean(name = "rabbitProducerDataSource")
        @Primary
        // Every property prefixed with rabbit.producer.druid.jdbc is bound onto the DataSource
        @ConfigurationProperties(prefix = "rabbit.producer.druid.jdbc")
        public DataSource rabbitProducerDataSource() throws SQLException {
            DataSource rabbitProducerDataSource = DataSourceBuilder.create().type(dataSourceType).build();
            LOGGER.info("============= rabbitProducerDataSource : {} ================", rabbitProducerDataSource);
            return rabbitProducerDataSource;
        }

        // The two methods below carry no @Bean annotation in the source,
        // so Spring does not register them as beans.
        public DataSourceProperties primaryDataSourceProperties() {
            return new DataSourceProperties();
        }

        public DataSource primaryDataSource() {
            return primaryDataSourceProperties().initializeDataSourceBuilder().build();
        }
    }

2. Run the specified SQL script: BrokerMessageConfiguration.java

    /**
     * BrokerMessageConfiguration
     * Runs the SQL script and
     * creates the database table structure.
     */
    @Configuration
    public class BrokerMessageConfiguration {

        @Autowired
        private DataSource rabbitProducerDataSource;

        /**
         * Loads rabbit-producer-message-schema.sql (the CREATE TABLE script).
         */
        @Value("classpath:rabbit-producer-message-schema.sql")
        private Resource schemaScript;

        @Bean
        public DataSourceInitializer initDataSourceInitializer() {
            System.err.println("--------------rabbitProducerDataSource-----------:" + rabbitProducerDataSource);
            final DataSourceInitializer initializer = new DataSourceInitializer();
            // Use the data source created earlier
            initializer.setDataSource(rabbitProducerDataSource);
            // Run the specified SQL script
            initializer.setDatabasePopulator(databasePopulator());
            return initializer;
        }

        /**
         * Builds the populator that runs the specified SQL script.
         * @return the populator for schemaScript
         */
        private DatabasePopulator databasePopulator() {
            final ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
            populator.addScript(schemaScript);
            return populator;
        }
    }

3. Next comes the MyBatis configuration: RabbitProducerMyBatisConfiguration.java

    @Configuration
    // @AutoConfigureAfter is meant to run this class only after RabbitProducerDataSourceConfiguration,
    // i.e. after the data source exists. Note that Spring Boot honors this ordering only for
    // classes registered as auto-configurations.
    @AutoConfigureAfter(value = {RabbitProducerDataSourceConfiguration.class})
    public class RabbitProducerMyBatisConfiguration {

        @Resource(name = "rabbitProducerDataSource")
        private DataSource rabbitProducerDataSource;

        @Bean(name = "rabbitProducerSqlSessionFactory")
        public SqlSessionFactory rabbitProducerSqlSessionFactory(DataSource rabbitProducerDataSource) {
            SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
            bean.setDataSource(rabbitProducerDataSource);
            ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
            try {
                // Load the mapper XML files. This could live in application.yml, but because this is a
                // base component it is set in code, decoupled from and invisible to the business layer.
                bean.setMapperLocations(resolver.getResources("classpath:com/didiok/rabbit/producer/mapping/*.xml"));
                SqlSessionFactory sqlSessionFactory = bean.getObject();
                sqlSessionFactory.getConfiguration().setCacheEnabled(Boolean.TRUE);
                return sqlSessionFactory;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        @Bean(name = "rabbitProducerSqlSessionTemplate")
        public SqlSessionTemplate rabbitProducerSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
            return new SqlSessionTemplate(sqlSessionFactory);
        }
    }

4. Mapper scanning configuration: RabbitProducerMybatisMapperScanerConfig.java

    @Configuration
    // @AutoConfigureAfter is meant to run this class only after RabbitProducerDataSourceConfiguration,
    // i.e. after the data source exists.
    @AutoConfigureAfter(RabbitProducerDataSourceConfiguration.class)
    public class RabbitProducerMybatisMapperScanerConfig {

        @Bean(name = "rabbitProducerMapperScannerConfigurer")
        public MapperScannerConfigurer rabbitProducerMapperScannerConfigurer() {
            // Register the mapper interfaces. This could live in application.yml, but because this is a
            // base component it is set in code, decoupled from and invisible to the business layer.
            MapperScannerConfigurer mapperScannerConfigurer = new MapperScannerConfigurer();
            mapperScannerConfigurer.setSqlSessionFactoryBeanName("rabbitProducerSqlSessionFactory");
            mapperScannerConfigurer.setBasePackage("com.didiok.rabbit.producer.mapper");
            return mapperScannerConfigurer;
        }
    }

IV. Implementing reliable message sending

    /**
     * RabbitBrokerImpl: the class that actually sends the different message types.
     */
    @Slf4j
    @Component
    public class RabbitBrokerImpl implements RabbitBroker {

        @Autowired
        private RabbitTemplateContainer rabbitTemplateContainer;

        @Autowired
        private MessageStoreService messageStoreService;

        /**
         * Reliable message sending.
         */
        @Override
        public void reliantSend(Message message) {
            message.setMessageType(MessageType.RELIANT);
            BrokerMessage bm = messageStoreService.selectByMessageId(message.getMessageId());
            if (bm == null) {
                // 1. Record the send in the database first
                Date now = new Date();
                BrokerMessage brokerMessage = new BrokerMessage();
                brokerMessage.setMessageId(message.getMessageId());
                brokerMessage.setStatus(BrokerMessageStatus.SENDING.getCode());
                // tryCount defaults to 0, so it does not need to be set on the first send
                brokerMessage.setNextRetry(DateUtils.addMinutes(now, BrokerMessageConst.TIMEOUT));
                brokerMessage.setCreateTime(now);
                brokerMessage.setUpdateTime(now);
                brokerMessage.setMessage(message);
                messageStoreService.insert(brokerMessage);
            }
            // 2. Run the actual send logic
            sendKernel(message);
        }

        @Override
        public void rapidSend(Message message) {
            // omitted...
        }

        /**
         * sendKernel: the core send method; sends the message on an asynchronous thread pool.
         * @param message the message to send
         */
        private void sendKernel(Message message) {
            AsyncBaseQueue.submit((Runnable) () -> {
                // The confirm callback needs message.getMessageId() and message.getMessageType(),
                // so both are packed into the CorrelationData id
                CorrelationData correlationData =
                        new CorrelationData(String.format("%s#%s#%s",
                                message.getMessageId(),
                                System.currentTimeMillis(),
                                message.getMessageType()));
                String topic = message.getTopic();
                String routingKey = message.getRoutingKey();
                RabbitTemplate rabbitTemplate = rabbitTemplateContainer.getTemplate(message);
                rabbitTemplate.convertAndSend(topic, routingKey, message, correlationData);
                log.info("#RabbitBrokerImpl.sendKernel# send to rabbitmq, messageId: {}", message.getMessageId());
            });
        }

        @Override
        public void confirmSend(Message message) {
            // omitted...
        }

        @Override
        public void sendMessages() {
            // omitted...
        }
    }
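AsyncBaseQueue is used by sendKernel but its code is not shown in this post. As a rough idea of what such a helper might look like, here is a hypothetical sketch built on a plain ThreadPoolExecutor. The class name, pool size, queue capacity, thread names and rejection policy are all assumptions, not the project's actual settings.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of an async send queue; not the project's AsyncBaseQueue. */
final class AsyncSendQueueSketch {
    private static final int THREADS = Runtime.getRuntime().availableProcessors();
    private static final AtomicInteger SEQ = new AtomicInteger();

    // Bounded queue plus CallerRunsPolicy: when the pool is saturated, the calling
    // thread runs the task itself, which slows producers down instead of dropping sends.
    private static final ExecutorService POOL = new ThreadPoolExecutor(
            THREADS, THREADS,
            60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10_000),
            r -> {
                Thread t = new Thread(r, "rabbitmq-async-sender-" + SEQ.incrementAndGet());
                t.setDaemon(true); // do not keep the JVM alive just for pending sends
                return t;
            },
            new ThreadPoolExecutor.CallerRunsPolicy());

    private AsyncSendQueueSketch() {
    }

    /** Hands the send task to the pool and returns without waiting for it. */
    static void submit(Runnable task) {
        POOL.execute(task);
    }
}
```

Because the send runs on another thread, reliantSend returns before the broker has seen the message. That is acceptable here only because the record is already in the database in the pending state, so the retry task can pick it up if the send never gets confirmed.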

The corresponding logic also has to be added in the confirm callback:

    /**
     * RabbitTemplateContainer: pooled wrapper.
     * One RabbitTemplate per topic.
     * 1. Improves sending efficiency.
     * 2. Each RabbitTemplate can be customized for different needs, e.g. each topic can have its own routingKey rules.
     */
    @Slf4j
    @Component
    public class RabbitTemplateContainer implements RabbitTemplate.ConfirmCallback {

        private Map<String /* TOPIC */, RabbitTemplate> rabbitMap = Maps.newConcurrentMap();

        private Splitter splitter = Splitter.on("#");

        private SerializerFactory serializerFactory = JacksonSerializerFactory.INSTANCE;

        @Autowired
        private ConnectionFactory connectionFactory;

        @Autowired
        private MessageStoreService messageStoreService;

        public RabbitTemplate getTemplate(Message message) throws MessageRunTimeException {
            // omitted...
        }

        /**
         * For both confirm and reliant messages, the broker invokes confirm after the message is sent.
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            // Unpack the id built in sendKernel: messageId#sendTime#messageType
            List<String> strings = splitter.splitToList(correlationData.getId());
            String messageId = strings.get(0);
            long sendTime = Long.parseLong(strings.get(1));
            String messageType = strings.get(2);
            if (ack) {
                // When the broker returns ACK, set the record's status in the log table to SEND_OK.
                // Only reliant messages have a database record to look up and update.
                // (endsWith is what the source uses; equals would be the stricter comparison.)
                if (MessageType.RELIANT.endsWith(messageType)) {
                    this.messageStoreService.succuess(messageId);
                }
                log.info("send message is OK, confirm messageId: {}, sendTime: {}", messageId, sendTime);
            } else {
                // No status change here: the record stays pending and the retry task resends it
                log.error("send message is Fail, confirm messageId: {}, sendTime: {}", messageId, sendTime);
            }
        }
    }

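Most of the code above was already written when the rapid message type was implemented. The only addition is in the confirm() method:

    // If the message type is reliant, look up the record in the database and update it
    if (MessageType.RELIANT.endsWith(messageType)) {
        this.messageStoreService.succuess(messageId);
    }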
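The correlation id is the only channel between sendKernel and confirm, so its layout matters. The sketch below shows the round trip in plain Java. CorrelationIdSketch is a hypothetical helper, not a class from the project, and it assumes that message ids never contain the '#' separator.

```java
/** Hypothetical helper illustrating the "messageId#sendTime#messageType" correlation id. */
final class CorrelationIdSketch {
    private static final String SEP = "#";

    final String messageId;
    final long sendTime;
    final String messageType;

    CorrelationIdSketch(String messageId, long sendTime, String messageType) {
        this.messageId = messageId;
        this.sendTime = sendTime;
        this.messageType = messageType;
    }

    /** Same layout as String.format("%s#%s#%s", ...) in sendKernel. */
    String encode() {
        return messageId + SEP + sendTime + SEP + messageType;
    }

    /** Inverse of encode(); rejects ids that do not have exactly three parts. */
    static CorrelationIdSketch decode(String id) {
        String[] parts = id.split(SEP, -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected 3 '#'-separated parts: " + id);
        }
        return new CorrelationIdSketch(parts[0], Long.parseLong(parts[1]), parts[2]);
    }
}
```

If a message id did contain '#', the positional lookups in confirm (index 0, 1, 2) would read the wrong fields. Validating the part count, as decode does here, at least turns that case into an explicit error.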

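V. Scheduled task that fetches long-pending messages and resends them

1. Implementing the distributed scheduled task

The scheduled task uses ElasticJob, which is wrapped in the rabbit-task project and exposed as two annotations, @EnableElasticJob and @ElasticJobConfig.

For how ElasticJob is used and wrapped, see the tutorial "ElasticJob Usage and Encapsulation".

2. Adding the packaged rabbit-task project to the current project

(1) Add the rabbit-task dependency

    <dependency>
        <groupId>com.bfxy.base.rabbit</groupId>
        <artifactId>rabbit-task</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </dependency>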

(2) Use the @EnableElasticJob annotation

Add @EnableElasticJob to the auto-configuration class of the current project, rabbit-core-producer. When the application starts, this initializes the ZooKeeper registry as well as ElasticJobConfParser, the class that parses the ElasticJob scheduled-task definitions.

    /**
     * RabbitProducerAutoConfiguration: auto-configuration.
     */
    @EnableElasticJob
    @Configuration
    @ComponentScan({"com.didiok.rabbit.producer.*"})
    public class RabbitProducerAutoConfiguration {
    }

(3) Implement the task logic and annotate the class with @ElasticJobConfig

To make delivery reliable, we fetch the messages that have timed out but are still pending confirmation, and resend them. This uses ElasticJob's streaming job type, DataflowJob.

    @Component
    @ElasticJobConfig(
            name = "com.bfxy.rabbit.producer.task.RetryMessageDataflowJob",
            cron = "0/10 * * * * ?",
            description = "Compensation task for reliable message delivery",
            overwrite = true,
            shardingTotalCount = 1
    )
    @Slf4j
    public class RetryMessageDataflowJob implements DataflowJob<BrokerMessage> {

        @Autowired
        private MessageStoreService messageStoreService;

        @Autowired
        private RabbitBroker rabbitBroker;

        private static final int MAX_RETRY_COUNT = 3;

        @Override
        public List<BrokerMessage> fetchData(ShardingContext shardingContext) {
            // Fetch messages that are still pending and whose next_retry is earlier than now;
            // they are resent so that delivery eventually succeeds or is marked as failed
            List<BrokerMessage> list = messageStoreService.fetchTimeOutMessage4Retry(BrokerMessageStatus.SENDING);
            log.info("--------@@@@@ fetched messages, count: {} @@@@@@-----------", list.size());
            return list;
        }

        @Override
        public void processData(ShardingContext shardingContext, List<BrokerMessage> dataList) {
            dataList.forEach(brokerMessage -> {
                String messageId = brokerMessage.getMessageId();
                if (brokerMessage.getTryCount() >= MAX_RETRY_COUNT) {
                    // Already resent MAX_RETRY_COUNT (3) times: stop retrying and mark the message as failed
                    this.messageStoreService.failure(messageId);
                    log.warn(" ----- message marked as finally failed, message id: {} -------", messageId);
                } else {
                    // Increment try_count before each resend. As written, update4TryCount sets
                    // try_count and update_time only; it does not change next_retry.
                    this.messageStoreService.updateTryCount(messageId);
                    // Resend the message
                    this.rabbitBroker.reliantSend(brokerMessage.getMessage());
                }
            });
        }
    }

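The code above carries the annotation

@ElasticJobConfig(
      name= "com.bfxy.rabbit.producer.task.RetryMessageDataflowJob",
      cron= "0/10 * * * * ?",
      description = "Compensation task for reliable message delivery",
      overwrite = true,
      shardingTotalCount = 1
      )

so the logic in this class runs on a schedule: the cron expression 0/10 * * * * ? fires every 10 seconds.

The resend call this.rabbitBroker.reliantSend(brokerMessage.getMessage()); was explained earlier and is not repeated here. On a resend the record already exists, so reliantSend skips the insert and goes straight to sendKernel.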
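To make the retry limit concrete, the decision taken in processData can be modelled without Spring or ElasticJob. RetryPolicySketch, its Action enum and the simulation method are hypothetical names used only for this illustration; the constant and the >= comparison are taken from the job above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, dependency-free model of the decision made in processData. */
final class RetryPolicySketch {
    static final int MAX_RETRY_COUNT = 3;

    enum Action { RESEND, MARK_FAILED }

    /** A message that has already been resent MAX_RETRY_COUNT times is given up on. */
    static Action decide(int tryCount) {
        return tryCount >= MAX_RETRY_COUNT ? Action.MARK_FAILED : Action.RESEND;
    }

    /** Replays the job runs for one message that never gets confirmed. */
    static List<Action> simulateUnconfirmedMessage() {
        List<Action> history = new ArrayList<>();
        int tryCount = 0; // value stored when the message is first sent
        while (true) {
            Action action = decide(tryCount);
            history.add(action);
            if (action == Action.MARK_FAILED) {
                return history;
            }
            tryCount++; // what update4TryCount does before each resend
        }
    }
}
```

With MAX_RETRY_COUNT = 3, simulateUnconfirmedMessage() yields three RESEND entries followed by one MARK_FAILED, which matches Step 7 in section I: the message is resent three times and marked as failed on the fourth run of the job.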
