This post is a sub-post of 《RabbitMQ基础组件封装—整体结构》 (RabbitMQ base component encapsulation — overall structure).
step 1: Persist the message. When the business data is written to the database, a message record is written as well, and the two writes must be atomic.
step 2: The Producer sends the message to the MQ Broker.
step 3: The Producer receives the confirm returned by the Broker.
step 4: Update the status of the message record (three statuses are defined: 0 = pending confirmation, 1 = confirmed, 2 = confirmation failed); a sketch of the corresponding status enum is shown right after this list.
step 5: A scheduled job fetches messages that have stayed in the pending state for too long.
step 6: The Producer re-sends those messages.
step 7: Once a message has been retried more than 3 times, mark it as "confirmation failed"; how such messages are handled afterwards is up to the specific business.
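The three statuses of step 4 appear later in the code as the BrokerMessageStatus enum (for example BrokerMessageStatus.SEND_OK.getCode() in MessageStoreService). The enum itself is not listed in this post; a minimal sketch, with the 0/1/2 codes taken from the list above as an assumption, could look like this:

/**
 * Minimal sketch of the BrokerMessageStatus enum used further down.
 * The concrete codes are assumptions following the 0/1/2 convention from step 4.
 */
public enum BrokerMessageStatus {

    SENDING("0"),    // pending confirmation: message sent, no broker ACK yet
    SEND_OK("1"),    // confirmed by the broker
    SEND_FAIL("2");  // confirmation finally failed after the retries were exhausted

    private final String code;

    BrokerMessageStatus(String code) {
        this.code = code;
    }

    public String getCode() {
        return this.code;
    }
}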
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>1.1.1</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.10</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
-- Structure of table broker_message.broker_message
-- Note: the '0000-00-00 00:00:00' defaults require a sql_mode without NO_ZERO_DATE (they are rejected under MySQL 5.7+ strict mode).
CREATE TABLE IF NOT EXISTS `broker_message` (
  `message_id` varchar(128) NOT NULL,
  `message` varchar(4000),
  `try_count` int(4) DEFAULT 0,
  `status` varchar(10) DEFAULT '',
  `next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
  `create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
  `update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
  PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Place rabbit-producer-message-schema.sql in the rabbit-core-producer project at /src/main/resources/rabbit-producer-message-schema.sql. The rabbit-core-producer project itself is described in 《RabbitMQ基础组件封装—整体结构》 (this post is one chapter of that series).
rabbit.producer.druid.type=com.alibaba.druid.pool.DruidDataSource
rabbit.producer.druid.jdbc.url=jdbc:mysql://localhost:3306/broker_message?characterEncoding=UTF-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useUnicode=true&serverTimezone=GMT
rabbit.producer.druid.jdbc.driver-class-name=com.mysql.jdbc.Driver
rabbit.producer.druid.jdbc.username=root
rabbit.producer.druid.jdbc.password=root
rabbit.producer.druid.jdbc.initialSize=5
rabbit.producer.druid.jdbc.minIdle=1
rabbit.producer.druid.jdbc.maxActive=100
rabbit.producer.druid.jdbc.maxWait=60000
rabbit.producer.druid.jdbc.timeBetweenEvictionRunsMillis=60000
rabbit.producer.druid.jdbc.minEvictableIdleTimeMillis=300000
rabbit.producer.druid.jdbc.validationQuery=SELECT 1 FROM DUAL
rabbit.producer.druid.jdbc.testWhileIdle=true
rabbit.producer.druid.jdbc.testOnBorrow=false
rabbit.producer.druid.jdbc.testOnReturn=false
rabbit.producer.druid.jdbc.poolPreparedStatements=true
rabbit.producer.druid.jdbc.maxPoolPreparedStatementPerConnectionSize=20
rabbit.producer.druid.jdbc.filters=stat,wall,log4j
rabbit.producer.druid.jdbc.useGlobalDataSourceStat=true
This file likewise goes in the rabbit-core-producer project, at /src/main/resources/rabbit-producer-message.properties.
Because the configuration above points at the broker_message database, you need to create an empty database named broker_message beforehand; the broker_message table itself is created automatically at startup by the schema script (see BrokerMessageConfiguration below).
public class BrokerMessage implements Serializable {

    private static final long serialVersionUID = 7447792462810110841L;

    private String messageId;

    private Message message;

    private Integer tryCount = 0;

    private String status;

    private Date nextRetry;

    private Date createTime;

    private Date updateTime;

    // getters and setters omitted
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.didiok.rabbit.producer.mapper.BrokerMessageMapper" >
  <resultMap id="BaseResultMap" type="com.didiok.rabbit.producer.entity.BrokerMessage" >
    <id column="message_id" property="messageId" jdbcType="VARCHAR" />
    <result column="message" property="message" jdbcType="VARCHAR" typeHandler="com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler" />
    <result column="try_count" property="tryCount" jdbcType="INTEGER" />
    <result column="status" property="status" jdbcType="VARCHAR" />
    <result column="next_retry" property="nextRetry" jdbcType="TIMESTAMP" />
    <result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
    <result column="update_time" property="updateTime" jdbcType="TIMESTAMP" />
  </resultMap>

  <sql id="Base_Column_List" >
    message_id, message, try_count, status, next_retry, create_time, update_time
  </sql>

  <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
    select
    <include refid="Base_Column_List" />
    from broker_message
    where message_id = #{messageId,jdbcType=VARCHAR}
  </select>

  <delete id="deleteByPrimaryKey" parameterType="java.lang.String" >
    delete from broker_message
    where message_id = #{messageId,jdbcType=VARCHAR}
  </delete>

  <insert id="insert" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
    insert into broker_message (message_id, message, try_count,
      status, next_retry, create_time,
      update_time)
    values (#{messageId,jdbcType=VARCHAR}, #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler}, #{tryCount,jdbcType=INTEGER},
      #{status,jdbcType=VARCHAR}, #{nextRetry,jdbcType=TIMESTAMP}, #{createTime,jdbcType=TIMESTAMP},
      #{updateTime,jdbcType=TIMESTAMP})
  </insert>

  <insert id="insertSelective" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
    insert into broker_message
    <trim prefix="(" suffix=")" suffixOverrides="," >
      <if test="messageId != null" >
        message_id,
      </if>
      <if test="message != null" >
        message,
      </if>
      <if test="tryCount != null" >
        try_count,
      </if>
      <if test="status != null" >
        status,
      </if>
      <if test="nextRetry != null" >
        next_retry,
      </if>
      <if test="createTime != null" >
        create_time,
      </if>
      <if test="updateTime != null" >
        update_time,
      </if>
    </trim>
    <trim prefix="values (" suffix=")" suffixOverrides="," >
      <if test="messageId != null" >
        #{messageId,jdbcType=VARCHAR},
      </if>
      <if test="message != null" >
        #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler},
      </if>
      <if test="tryCount != null" >
        #{tryCount,jdbcType=INTEGER},
      </if>
      <if test="status != null" >
        #{status,jdbcType=VARCHAR},
      </if>
      <if test="nextRetry != null" >
        #{nextRetry,jdbcType=TIMESTAMP},
      </if>
      <if test="createTime != null" >
        #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="updateTime != null" >
        #{updateTime,jdbcType=TIMESTAMP},
      </if>
    </trim>
  </insert>

  <update id="updateByPrimaryKeySelective" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
    update broker_message
    <set >
      <if test="message != null" >
        message = #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler},
      </if>
      <if test="tryCount != null" >
        try_count = #{tryCount,jdbcType=INTEGER},
      </if>
      <if test="status != null" >
        status = #{status,jdbcType=VARCHAR},
      </if>
      <if test="nextRetry != null" >
        next_retry = #{nextRetry,jdbcType=TIMESTAMP},
      </if>
      <if test="createTime != null" >
        create_time = #{createTime,jdbcType=TIMESTAMP},
      </if>
      <if test="updateTime != null" >
        update_time = #{updateTime,jdbcType=TIMESTAMP},
      </if>
    </set>
    where message_id = #{messageId,jdbcType=VARCHAR}
  </update>

  <update id="updateByPrimaryKey" parameterType="com.didiok.rabbit.producer.entity.BrokerMessage" >
    update broker_message
    set message = #{message,jdbcType=VARCHAR, typeHandler=com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler},
      try_count = #{tryCount,jdbcType=INTEGER},
      status = #{status,jdbcType=VARCHAR},
      next_retry = #{nextRetry,jdbcType=TIMESTAMP},
      create_time = #{createTime,jdbcType=TIMESTAMP},
      update_time = #{updateTime,jdbcType=TIMESTAMP}
    where message_id = #{messageId,jdbcType=VARCHAR}
  </update>

  <update id="changeBrokerMessageStatus" >
    update broker_message bm
    set bm.status = #{brokerMessageStatus,jdbcType=VARCHAR},
      bm.update_time = #{updateTime, jdbcType=TIMESTAMP}
    where bm.message_id = #{brokerMessageId,jdbcType=VARCHAR}
  </update>

  <select id="queryBrokerMessageStatus4Timeout" resultMap="BaseResultMap" >
    <![CDATA[
      select message_id, message, try_count, status, next_retry, create_time, update_time
      from broker_message bm
      where bm.status = #{brokerMessageStatus,jdbcType=VARCHAR}
      and bm.next_retry < sysdate()
    ]]>
  </select>

  <select id="queryBrokerMessageStatus" resultMap="BaseResultMap" >
    select message_id, message, try_count, status, next_retry, create_time, update_time
    from broker_message bm
    where bm.status = #{brokerMessageStatus,jdbcType=VARCHAR}
  </select>

  <update id="update4TryCount" >
    update broker_message bm
    set bm.try_count = bm.try_count + 1,
      bm.update_time = #{updateTime,jdbcType=TIMESTAMP}
    where bm.message_id = #{brokerMessageId,jdbcType=VARCHAR}
  </update>

</mapper>
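The mapper refers to com.didiok.rabbit.common.mybatis.handler.MessageJsonTypeHandler, which serializes the Message object into the varchar `message` column and back. That handler lives in the rabbit-common module and is not shown in this post; a minimal sketch, assuming Jackson handles the JSON round-trip (the real implementation may differ), could look like this:

import java.sql.CallableStatement;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.ibatis.type.BaseTypeHandler;
import org.apache.ibatis.type.JdbcType;

import com.fasterxml.jackson.databind.ObjectMapper;

// the import of the component's Message class is omitted (its package comes from the rabbit-api module)

/**
 * Sketch of a MyBatis type handler that stores Message as a JSON string.
 * Assumption: Jackson is on the classpath and Message is JSON-serializable.
 */
public class MessageJsonTypeHandler extends BaseTypeHandler<Message> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void setNonNullParameter(PreparedStatement ps, int i, Message parameter, JdbcType jdbcType) throws SQLException {
        try {
            ps.setString(i, MAPPER.writeValueAsString(parameter));
        } catch (Exception e) {
            throw new SQLException("failed to serialize Message to JSON", e);
        }
    }

    @Override
    public Message getNullableResult(ResultSet rs, String columnName) throws SQLException {
        return parse(rs.getString(columnName));
    }

    @Override
    public Message getNullableResult(ResultSet rs, int columnIndex) throws SQLException {
        return parse(rs.getString(columnIndex));
    }

    @Override
    public Message getNullableResult(CallableStatement cs, int columnIndex) throws SQLException {
        return parse(cs.getString(columnIndex));
    }

    private Message parse(String json) throws SQLException {
        if (json == null || json.isEmpty()) {
            return null;
        }
        try {
            return MAPPER.readValue(json, Message.class);
        } catch (Exception e) {
            throw new SQLException("failed to deserialize JSON to Message", e);
        }
    }
}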
@Mapper
public interface BrokerMessageMapper {

    int deleteByPrimaryKey(String messageId);

    int insert(BrokerMessage record);

    int insertSelective(BrokerMessage record);

    BrokerMessage selectByPrimaryKey(String messageId);

    int updateByPrimaryKeySelective(BrokerMessage record);

    int updateByPrimaryKeyWithBLOBs(BrokerMessage record);

    int updateByPrimaryKey(BrokerMessage record);

    void changeBrokerMessageStatus(@Param("brokerMessageId")String brokerMessageId, @Param("brokerMessageStatus")String brokerMessageStatus, @Param("updateTime")Date updateTime);

    List<BrokerMessage> queryBrokerMessageStatus4Timeout(@Param("brokerMessageStatus")String brokerMessageStatus);

    List<BrokerMessage> queryBrokerMessageStatus(@Param("brokerMessageStatus")String brokerMessageStatus);

    int update4TryCount(@Param("brokerMessageId")String brokerMessageId, @Param("updateTime")Date updateTime);

}
@Service
public class MessageStoreService {

    @Autowired
    private BrokerMessageMapper brokerMessageMapper;

    public int insert(BrokerMessage brokerMessage) {
        return this.brokerMessageMapper.insert(brokerMessage);
    }

    public BrokerMessage selectByMessageId(String messageId) {
        return this.brokerMessageMapper.selectByPrimaryKey(messageId);
    }

    public void succuess(String messageId) {
        this.brokerMessageMapper.changeBrokerMessageStatus(messageId,
                BrokerMessageStatus.SEND_OK.getCode(),
                new Date());
    }

    public void failure(String messageId) {
        this.brokerMessageMapper.changeBrokerMessageStatus(messageId,
                BrokerMessageStatus.SEND_FAIL.getCode(),
                new Date());
    }

    public List<BrokerMessage> fetchTimeOutMessage4Retry(BrokerMessageStatus brokerMessageStatus){
        return this.brokerMessageMapper.queryBrokerMessageStatus4Timeout(brokerMessageStatus.getCode());
    }

    public int updateTryCount(String brokerMessageId) {
        return this.brokerMessageMapper.update4TryCount(brokerMessageId, new Date());
    }
}
@Configuration
@PropertySource({"classpath:rabbit-producer-message.properties"})
public class RabbitProducerDataSourceConfiguration {

    private static Logger LOGGER = org.slf4j.LoggerFactory.getLogger(RabbitProducerDataSourceConfiguration.class);

    @Value("${rabbit.producer.druid.type}")
    private Class<? extends DataSource> dataSourceType;

    @Bean(name = "rabbitProducerDataSource")
    @Primary
    // every property prefixed with rabbit.producer.druid.jdbc is bound into this DataSource
    @ConfigurationProperties(prefix = "rabbit.producer.druid.jdbc")
    public DataSource rabbitProducerDataSource() throws SQLException {
        DataSource rabbitProducerDataSource = DataSourceBuilder.create().type(dataSourceType).build();
        LOGGER.info("============= rabbitProducerDataSource : {} ================", rabbitProducerDataSource);
        return rabbitProducerDataSource;
    }

    public DataSourceProperties primaryDataSourceProperties(){
        return new DataSourceProperties();
    }

    public DataSource primaryDataSource(){
        return primaryDataSourceProperties().initializeDataSourceBuilder().build();
    }

}
/**
 * $BrokerMessageConfiguration
 * Runs the SQL schema script at startup,
 * i.e. creates the database table structure.
 *
 */
@Configuration
public class BrokerMessageConfiguration {

    @Autowired
    private DataSource rabbitProducerDataSource;

    /**
     * Loads the rabbit-producer-message-schema.sql script (the CREATE TABLE statement shown above)
     */
    @Value("classpath:rabbit-producer-message-schema.sql")
    private Resource schemaScript;

    @Bean
    public DataSourceInitializer initDataSourceInitializer() {
        System.err.println("--------------rabbitProducerDataSource-----------:" + rabbitProducerDataSource);
        final DataSourceInitializer initializer = new DataSourceInitializer();
        // use the data source created above
        initializer.setDataSource(rabbitProducerDataSource);
        // run the given SQL script against it
        initializer.setDatabasePopulator(databasePopulator());
        return initializer;
    }

    /**
     * Runs the given SQL script
     * @return
     */
    private DatabasePopulator databasePopulator() {
        final ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
        populator.addScript(schemaScript);
        return populator;
    }
}
@Configuration
// @AutoConfigureAfter: this class is processed only after RabbitProducerDataSourceConfiguration,
// i.e. only once the data source above has been created
@AutoConfigureAfter(value = {RabbitProducerDataSourceConfiguration.class})
public class RabbitProducerMyBatisConfiguration {

    @Resource(name= "rabbitProducerDataSource")
    private DataSource rabbitProducerDataSource;

    @Bean(name="rabbitProducerSqlSessionFactory")
    public SqlSessionFactory rabbitProducerSqlSessionFactory(DataSource rabbitProducerDataSource) {

        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(rabbitProducerDataSource);
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        try {
            // Load the mapper XML files. This could live in application.yml, but since this is a base
            // component the configuration stays in code, decoupled from and invisible to the business layer.
            bean.setMapperLocations(resolver.getResources("classpath:com/didiok/rabbit/producer/mapping/*.xml"));
            SqlSessionFactory sqlSessionFactory = bean.getObject();
            sqlSessionFactory.getConfiguration().setCacheEnabled(Boolean.TRUE);
            return sqlSessionFactory;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Bean(name="rabbitProducerSqlSessionTemplate")
    public SqlSessionTemplate rabbitProducerSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
        return new SqlSessionTemplate(sqlSessionFactory);
    }

}
@Configuration
// @AutoConfigureAfter: processed only after RabbitProducerDataSourceConfiguration, i.e. after the data source exists
@AutoConfigureAfter(RabbitProducerDataSourceConfiguration.class)
public class RabbitProducerMybatisMapperScanerConfig {

    @Bean(name="rabbitProducerMapperScannerConfigurer")
    public MapperScannerConfigurer rabbitProducerMapperScannerConfigurer() {
        // Scan the mapper interfaces. As above, this is kept in code rather than application.yml
        // so that the base component stays decoupled from and invisible to the business layer.
        MapperScannerConfigurer mapperScannerConfigurer = new MapperScannerConfigurer();
        mapperScannerConfigurer.setSqlSessionFactoryBeanName("rabbitProducerSqlSessionFactory");
        mapperScannerConfigurer.setBasePackage("com.didiok.rabbit.producer.mapper");
        return mapperScannerConfigurer;
    }

}
/**
 * $RabbitBrokerImpl - the implementation that actually sends the different message types
 *
 */
@Slf4j
@Component
public class RabbitBrokerImpl implements RabbitBroker {

    @Autowired
    private RabbitTemplateContainer rabbitTemplateContainer;

    @Autowired
    private MessageStoreService messageStoreService;

    /**
     * Reliable message sending
     */
    @Override
    public void reliantSend(Message message) {
        message.setMessageType(MessageType.RELIANT);
        BrokerMessage bm = messageStoreService.selectByMessageId(message.getMessageId());
        if(bm == null) {
            // 1. first record the message-send log in the database
            Date now = new Date();
            BrokerMessage brokerMessage = new BrokerMessage();
            brokerMessage.setMessageId(message.getMessageId());
            brokerMessage.setStatus(BrokerMessageStatus.SENDING.getCode());
            // tryCount defaults to 0, so it does not need to be set on the first send
            brokerMessage.setNextRetry(DateUtils.addMinutes(now, BrokerMessageConst.TIMEOUT));
            brokerMessage.setCreateTime(now);
            brokerMessage.setUpdateTime(now);
            brokerMessage.setMessage(message);
            messageStoreService.insert(brokerMessage);
        }
        // 2. perform the actual send
        sendKernel(message);
    }

    @Override
    public void rapidSend(Message message) {
        // omitted...
    }

    /**
     * $sendKernel - the core send method; messages are handed to an asynchronous thread pool
     * @param message
     */
    private void sendKernel(Message message) {
        AsyncBaseQueue.submit((Runnable) () -> {
            CorrelationData correlationData =
                    // the confirm callback needs message.getMessageId() and message.getMessageType(),
                    // so both are packed into the CorrelationData id
                    new CorrelationData(String.format("%s#%s#%s",
                            message.getMessageId(),
                            System.currentTimeMillis(),
                            message.getMessageType()));
            String topic = message.getTopic();
            String routingKey = message.getRoutingKey();
            RabbitTemplate rabbitTemplate = rabbitTemplateContainer.getTemplate(message);
            rabbitTemplate.convertAndSend(topic, routingKey, message, correlationData);
            log.info("#RabbitBrokerImpl.sendKernel# send to rabbitmq, messageId: {}", message.getMessageId());
        });
    }

    @Override
    public void confirmSend(Message message) {
        // omitted...
    }

    @Override
    public void sendMessages() {
        // omitted...
    }

}
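reliantSend and sendKernel rely on two helpers that are not listed in this post: BrokerMessageConst (the timeout window, in minutes, used to compute next_retry) and AsyncBaseQueue (the asynchronous thread-pool wrapper). Minimal sketches follow; the pool parameters and the TIMEOUT value are assumptions, not the component's actual values.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Sketch: constant used when persisting the message record. */
public interface BrokerMessageConst {
    /** Minutes to wait for a broker confirm before the retry job may pick the message up (assumed value). */
    int TIMEOUT = 1;
}

/** Sketch: a simple static thread pool used to send messages asynchronously. */
class AsyncBaseQueue {

    private static final int THREAD_SIZE = Runtime.getRuntime().availableProcessors();

    private static final ThreadPoolExecutor SENDER_POOL = new ThreadPoolExecutor(
            THREAD_SIZE,
            THREAD_SIZE,
            60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10000),
            r -> {
                Thread t = new Thread(r, "rabbitmq-async-sender");
                t.setDaemon(true);
                return t;
            },
            // fall back to the caller thread when the queue is full, so sends are never silently dropped
            new ThreadPoolExecutor.CallerRunsPolicy());

    public static void submit(Runnable task) {
        SENDER_POOL.submit(task);
    }
}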
The confirm callback must also be updated with the corresponding logic:
/**
 * $RabbitTemplateContainer - pooled wrapper, one RabbitTemplate per topic
 * 1. improves send throughput
 * 2. allows each RabbitTemplate to be customized per requirement, e.g. each topic can have its own routingKey rules
 */
@Slf4j
@Component
public class RabbitTemplateContainer implements RabbitTemplate.ConfirmCallback {

    private Map<String /* TOPIC */, RabbitTemplate> rabbitMap = Maps.newConcurrentMap();

    private Splitter splitter = Splitter.on("#");

    private SerializerFactory serializerFactory = JacksonSerializerFactory.INSTANCE;

    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private MessageStoreService messageStoreService;

    public RabbitTemplate getTemplate(Message message) throws MessageRunTimeException {
        // omitted...
    }

    /**
     * Whether the message is a CONFIRM message or a RELIANT message, the broker invokes confirm() after the send.
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // handle the broker's acknowledgement
        List<String> strings = splitter.splitToList(correlationData.getId());
        String messageId = strings.get(0);
        long sendTime = Long.parseLong(strings.get(1));
        String messageType = strings.get(2);
        if(ack) {
            // when the broker ACKs successfully, update the message's status in the log table to SEND_OK

            // only RELIANT messages have a database record to look up and update
            if(MessageType.RELIANT.equals(messageType)) {
                this.messageStoreService.succuess(messageId);
            }
            log.info("send message is OK, confirm messageId: {}, sendTime: {}", messageId, sendTime);
        } else {
            log.error("send message is Fail, confirm messageId: {}, sendTime: {}", messageId, sendTime);

        }
    }
}
Most of the code above was already written when implementing the rapid message type; the only addition is inside confirm():

    // only RELIANT messages have a database record to look up and update
    if(MessageType.RELIANT.equals(messageType)) {
        this.messageStoreService.succuess(messageId);
    }
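getTemplate() is elided above. Conceptually it builds one RabbitTemplate per topic, registers this container as the confirm callback, and caches the template in rabbitMap. A minimal sketch of the elided method under those assumptions might look like this (the real version also plugs in the Jackson-based converter built from serializerFactory and any per-topic routingKey rules):

public RabbitTemplate getTemplate(Message message) throws MessageRunTimeException {
    String topic = message.getTopic();
    RabbitTemplate rabbitTemplate = rabbitMap.get(topic);
    if (rabbitTemplate != null) {
        return rabbitTemplate;
    }
    log.info("#RabbitTemplateContainer.getTemplate# topic: {} not cached yet, creating a RabbitTemplate for it", topic);

    RabbitTemplate newTemplate = new RabbitTemplate(connectionFactory);
    newTemplate.setExchange(topic);
    newTemplate.setRoutingKey(message.getRoutingKey());
    // publisher confirms must be enabled on the ConnectionFactory for confirm() to be invoked
    newTemplate.setConfirmCallback(this);
    // a JSON message converter built from serializerFactory would also be set here (omitted)

    rabbitMap.putIfAbsent(topic, newTemplate);
    return rabbitMap.get(topic);
}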
The scheduled job uses ElasticJob, which is wrapped inside the rabbit-task project as two annotations: @EnableElasticJob and @ElasticJobConfig.
For details on how ElasticJob is used and wrapped, see the tutorial 《ElasticJob使用与封装》 (ElasticJob usage and encapsulation).
<dependency>
    <groupId>com.bfxy.base.rabbit</groupId>
    <artifactId>rabbit-task</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>
Add the @EnableElasticJob annotation to the auto-configuration class of the rabbit-core-producer project, so that when the application starts the ZooKeeper registry center is initialized, together with ElasticJobConfParser, the class that parses the ElasticJob job definitions.
/**
 * $RabbitProducerAutoConfiguration - auto-configuration entry point
 *
 */
@EnableElasticJob
@Configuration
@ComponentScan({"com.didiok.rabbit.producer.*"})
public class RabbitProducerAutoConfiguration {

}
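Neither annotation is defined in this post (both live in rabbit-task). Purely for orientation, here is a minimal sketch of what the two annotations might look like; the imported configuration class name and the default attribute values are illustrative assumptions, and the attribute list of @ElasticJobConfig only mirrors the attributes used in the job class below.

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.context.annotation.Import;

/** Sketch: enables the rabbit-task ElasticJob support (ZooKeeper registry center + ElasticJobConfParser). */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(ElasticJobConfiguration.class) // hypothetical @Configuration class inside rabbit-task
@interface EnableElasticJob {
}

/** Sketch: per-job metadata read by ElasticJobConfParser. */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@interface ElasticJobConfig {

    String name();

    String cron() default "";

    String description() default "";

    boolean overwrite() default false;

    int shardingTotalCount() default 1;
}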
To guarantee reliable delivery, we need to fetch messages that have timed out but are still in the pending (SENDING) state and re-send them. This is done with ElasticJob's streaming job type, DataflowJob.
@Component
@ElasticJobConfig(
        name = "com.bfxy.rabbit.producer.task.RetryMessageDataflowJob",
        cron = "0/10 * * * * ?",
        description = "reliable-delivery message compensation job",
        overwrite = true,
        shardingTotalCount = 1
)
@Slf4j
public class RetryMessageDataflowJob implements DataflowJob<BrokerMessage>{

    @Autowired
    private MessageStoreService messageStoreService;

    @Autowired
    private RabbitBroker rabbitBroker;

    private static final int MAX_RETRY_COUNT = 3;

    @Override
    public List<BrokerMessage> fetchData(ShardingContext shardingContext) {
        // fetch messages that are still unconfirmed (SENDING) and whose next_retry is earlier than now;
        // to make sure they eventually get through, they are re-sent
        List<BrokerMessage> list = messageStoreService.fetchTimeOutMessage4Retry(BrokerMessageStatus.SENDING);
        log.info("--------@@@@@ fetched messages for retry, count: {} @@@@@@-----------", list.size());
        return list;
    }

    @Override
    public void processData(ShardingContext shardingContext, List<BrokerMessage> dataList) {

        dataList.forEach( brokerMessage -> {

            String messageId = brokerMessage.getMessageId();
            if(brokerMessage.getTryCount() >= MAX_RETRY_COUNT) {
                // once the retry count reaches 3, stop re-sending and mark the message as finally failed
                this.messageStoreService.failure(messageId);
                log.warn(" -----message marked as finally failed, messageId: {} -------", messageId);
            } else {
                // bump try_count before each re-send (update4TryCount increments the counter and refreshes update_time)
                this.messageStoreService.updateTryCount(messageId);
                // re-send the message
                this.rabbitBroker.reliantSend(brokerMessage.getMessage());
            }

        });
    }
}
Because of the @ElasticJobConfig annotation above (cron = "0/10 * * * * ?", shardingTotalCount = 1, overwrite = true), the logic in this class runs automatically every 10 seconds.
The re-send call this.rabbitBroker.reliantSend(brokerMessage.getMessage()) was already explained earlier, so it is not repeated here.