当前位置:   article > 正文

【RocketMQ】RocketMQ支持事务消息机制 一_社区版rocketmq支持事务消息吗

社区版rocketmq支持事务消息吗


相关文章:
【RocketMQ】事务消息
RocketMQ支持事务消息机制 一
RocketMQ支持事务消息机制 二

概述

本篇文章的核心是RocketMQ如何支持事务的

有一些第三方的MQ是支持事务消息的,比如RocketMQ,他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上一些主流的MQ都是不支持事务消息的,比如 RabbitMQKafka 都不支持。

1. 事务消费

我们经常支付宝转账余额宝,这是日常生活的一件普通小事,但是我们思考支付宝扣除转账的钱之后,如果系统挂掉怎么办,这时余额宝账户并没有增加相应的金额,数据就会出现不一致状况了。

上述场景在各个类型的系统中都能找到相似影子,比如在电商系统中,当有用户下单后,除了在订单表插入一条记录外,对应商品表的这个商品数量必须减1吧,怎么保证?!在搜索广告系统中,当用户点击某广告后,除了在点击事件表中增加一条记录外,还得去商家账户表中找到这个商家并扣除广告费吧,怎么保证?!等等,相信大家或多或多少都能碰到相似情景。

本质上问题可以抽象为:当一个表数据更新后,怎么保证另一个表的数据也必须要更新成功。

如果是单机系统(数据库实例也在同一个系统上)的话,我们可以用本地事务轻松解决:

还是以支付宝转账余额宝为例(比如转账10000块钱),假设有
支付宝账户表:A(id,userId,amount)
余额宝账户表:B(id,userId,amount)
用户的userId=1;

从支付宝转账1万块钱到余额宝的动作分为两步:
1)支付宝表扣除1万:update A set amount=amount-10000 where userId=1;
2)余额宝表增加1万:update B set amount=amount+10000 where userId=1;

如何确保支付宝余额宝收支平衡呢?

有人说这个很简单嘛,可以用事务解决。

Begin transaction 
  update A set amount=amount-10000 where userId=1;
 update B set amount=amount+10000 where userId=1;
End transaction 
commit;
  • 1
  • 2
  • 3
  • 4
  • 5

这样确实能解决,如果你使用spring的话一个注解就能搞定上述事务功能。

@Transactional(rollbackFor=Exception.class) 

  public void update() { 

      updateATable(); 

//更新A表 

      updateBTable(); 

//更新B表 

  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

如果系统规模较小,数据表都在一个数据库实例上,上述本地事务方式可以很好地运行,但是如果系统规模较大,比如支付宝账户表和余额宝账户表显然不会在同一个数据库实例上,他们往往分布在不同的物理节点上,这时本地事务已经失去用武之地。

下面我们来看看比较主流的两种方案:

1.1 分布式事务—————— 两阶段提交协议

两阶段提交协议(Two-phase Commit,2PC)经常被用来实现分布式事务。一般分为协调器TC和若干事务执行者两种角色,这里的事务执行者就是具体的数据库,协调器可以和事务执行器在一台机器上。

在这里插入图片描述
我们根据上面的图来看看主要流程:

1) 我们的应用程序(client)发起一个开始请求到TC(transaction);

2) TC先将prepare消息写到本地日志,之后向所有的Si发起prepare消息。以支付宝转账到余额宝为例,TC给A的prepare消息是通知支付宝数据库相应账目扣款1万,TC给B的prepare消息是通知余额宝数据库相应账目增加1w。为什么在执行任务前需要先写本地日志,主要是为了故障后恢复用,本地日志起到现实生活中凭证的效果,如果没有本地日志(凭证),出问题容易死无对证;

3) Si收到prepare消息后,执行具体本机事务,但不会进行commit,如果成功返回yes,不成功返回no。同理,返回前都应把要返回的消息写到日志里,当作凭证。

4) TC收集所有执行器返回的消息,如果所有执行器都返回yes,那么给所有执行器发生送commit消息,执行器收到commit后执行本地事务的commit操作;如果有任一个执行器返回no,那么给所有执行器发送abort消息,执行器收到abort消息后执行事务abort操作。

注:TC或Si把发送或接收到的消息先写到日志里,主要是为了故障后恢复用。如某一Si从故障中恢复后,先检查本机的日志,如果已收到commit,则提交,如果abort则回滚。如果是yes,则再向TC询问一下,确定下一步。如果什么都没有,则很可能在prepare阶段Si就崩溃了,因此需要回滚。

现如今实现基于两阶段提交的分布式事务也没那么困难了,如果使用java,那么可以使用开源软件atomikos(http://www.atomikos.com/)来快速实现。来快速实现。)

不过但凡使用过的上述两阶段提交的同学都可以发现性能实在是太差,根本不适合高并发的系统。为什么?

1)两阶段提交涉及多次节点间的网络通信,通信时间太长!
2)事务时间相对于变长了,锁定的资源的时间也变长了,造成资源等待时间也增加好多!

正是由于分布式事务存在很严重的性能问题,大部分高并发服务都在避免使用,往往通过其他途径来解决数据一致性问题。

1.2 使用消息队列来避免分布式事务

如果仔细观察生活的话,生活的很多场景已经给了我们提示。

比如在北京很有名的姚记炒肝点了炒肝并付了钱后,他们并不会直接把你点的炒肝给你,而是给你一张小票,然后让你拿着小票到出货区排队去取。为什么他们要将付钱和取货两个动作分开呢?原因很多,其中一个很重要的原因是为了使他们接待能力增强(并发量更高)。

还是回到我们的问题,只要这张小票在,你最终是能拿到炒肝的。同理转账服务也是如此,当支付宝账户扣除1万后,我们只要生成一个凭证(消息)即可,这个凭证(消息)上写着“让余额宝账户增加1万”,只要这个凭证(消息)能可靠保存,我们最终是可以拿着这个凭证(消息)让余额宝账户增加1万的,即我们能依靠这个凭证(消息)完成最终一致性。

那么我们如何可靠保存凭证(消息)有两种方法:

1.2.1 业务与消息耦合的方式

支付宝在完成扣款的同时,同时记录消息数据,这个消息数据与业务数据保存在同一数据库实例里(消息记录表表名为message)。

Begin transaction 
       update A set amount=amount-10000 where userId=1; 
       insert into message(userId, amount,status) values(1, 10000, 1); 
End transaction 
commit;
  • 1
  • 2
  • 3
  • 4
  • 5

上述事务能保证只要支付宝账户里被扣了钱,消息一定能保存下来。

当上述事务提交成功后,我们通过实时消息服务将此消息通知余额宝,余额宝处理成功后发送回复成功消息,支付宝收到回复后删除该条消息数据

注意:最后那句,支付宝收到回复后删除该条消息数据,这牵扯到消息重复发送问题,如果没有正确的删除,会引发消息重发。下文会讲到。

1.2.2 业务与消息解耦方式

上述保存消息的方式使得消息数据和业务数据紧耦合在一起,从架构上看不够优雅,而且容易诱发其他问题。为了解耦,可以采用以下方式。

1)支付宝在扣款事务提交之前,向实时消息服务请求发送消息,实时消息服务只记录消息数据,而不真正发送,只有消息发送成功后才会提交事务;

对应RocketMQ的half消息

2)当支付宝扣款事务被提交成功后,向实时消息服务确认发送。只有在得到确认发送指令后,实时消息服务才真正发送该消息;

3)当支付宝扣款事务提交失败回滚后,向实时消息服务取消发送。在得到取消发送指令后,该消息将不会被发送;

4)对于那些未确认的消息或者取消的消息,需要有一个消息状态确认系统定时去支付宝系统查询这个消息的状态并进行更新。为什么需要这一步骤,举个例子:假设在第2步支付宝扣款事务被成功提交后,系统挂了,此时消息状态并未被更新为“确认发送”,从而导致消息不能被发送。

对应RocketMQ的unkown状态的消息,会重试检查

优点:消息数据独立存储,降低业务系统与消息系统间的耦合;
缺点:一次消息发送需要两次请求;业务处理服务需要实现消息状态回查接口。

2.那么如何解决消息重复投递的问题?

假如我们采用消息队列的方式解决分布式事务,存在新的问题,即重复发送消息问题。

还有一个很严重的问题就是消息重复投递,以我们支付宝转账到余额宝为例,如果相同的消息被重复投递两次,那么我们余额宝账户将会增加2万而不是1万了(上面讲顺序消费是讲过,这里再提一下)。

为什么相同的消息会被重复投递? 比如余额宝处理完消息msg后,发送了处理成功的消息给支付宝,正常情况下支付宝应该要删除消息msg,但如果支付宝这时候悲剧的挂了,重启后一看消息msg还在,就会继续发送消息msg

解决方法很简单,在余额宝这边增加消息应用状态表(message_apply),通俗来说就是个账本,用于记录消息的消费情况,每次来一个消息,在真正执行之前,先去消息应用状态表中查询一遍,如果找到说明是重复消息,丢弃即可,如果没找到才执行,同时插入到消息应用状态表(同一事务)


For each msg in queue 
	Begin transaction 
	  select count(*) as cnt from message_apply where msg_id=msg.msg_id; 
	  if cnt==0 then 
		update B set amount=amount+10000 where userId=1; 
		insert into message_apply(msg_id) values(msg.msg_id); 
	  end if
	End transaction 
	commit;
End For
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

为了方便大家理解,我们再来举一个银行转账的示例(和上一个例子差不多):

比如,Bob向Smith转账100块。

在单机环境下,执行事务的情况,大概是下面这个样子:
在这里插入图片描述
当用户增长到一定程度,Bob和Smith的账户及余额信息已经不在同一台服务器上了,那么上面的流程就变成了这样:
在这里插入图片描述
这时候你会发现,同样是一个转账的业务,在集群环境下,耗时居然成倍的增长,这显然是不能够接受的。那如何来规避这个问题?

2.1 大事务 = 小事务 + 异步

将大事务拆分成多个小事务异步执行。这样基本上能够将跨机事务的执行效率优化到与单机一致。转账的事务就可以分解成如下两个小事务:
在这里插入图片描述
图中执行本地事务(Bob账户扣款)和发送异步消息应该保证同时成功或者同时失败,也就是扣款成功了,发送消息一定要成功,如果扣款失败了,就不能再发送消息。那问题是:我们是先扣款还是先发送消息呢?

首先看下先发送消息的情况,大致的示意图如下:

在这里插入图片描述
存在的问题是:如果消息发送成功,但是扣款失败,消费端就会消费此消息,进而向Smith账户加钱。

先发消息不行,那就先扣款吧,大致的示意图如下:
在这里插入图片描述

存在的问题跟上面类似:如果扣款成功,发送消息失败,就会出现Bob扣钱了,但是Smith账户未加钱。

可能大家会有很多的方法来解决这个问题,比如:直接将发消息放到Bob扣款的事务中去,如果发送失败,抛出异常,事务回滚。这样的处理方式也符合“恰好”不需要解决的原则。

2.2 RocketMQ的实现

2.2.1 RocketMQ事务实现原理

RocketMQ支持事务消息,下面来看看RocketMQ是怎样来实现的?
在这里插入图片描述
RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改消息的状态。

细心的你可能又发现问题了,如果确认消息发送失败了怎么办?RocketMQ会定期扫描消息集群中的事物消息,如果发现了Prepared消息,它会向消息发送端(生产者)确认,Bob的钱到底是减了还是没减呢?如果减了是回滚还是继续发送确认消息呢?

RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

那我们来看下RocketMQ源码,是如何处理事务消息的。

客户端发送事务消息的部分(完整代码请查看:rocketmq-example工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer)

// =============================发送事务消息的一系列准备工作========================================
// 未决事务,MQ服务器回查客户端
// 也就是上文所说的,当RocketMQ发现`Prepared消息`时,会根据这个Listener实现的策略来决断事务
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 构造事务消息的生产者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 设置事务决断处理类
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事务的处理逻辑,相当于示例中检查Bob账户并扣钱的逻辑
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 构造MSG,省略构造参数
Message msg = new Message(......);
// 发送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

这是生产者概要步骤,详细实例参见下文

接着查看sendMessageInTransaction方法的源码,总共分为3个阶段:发送Prepared消息、执行本地事务、发送确认消息。


//  ================================事务消息的发送过程=============================================

public TransactionSendResult sendMessageInTransaction(.....) {

    // 逻辑代码,非实际代码

    // 1.发送消息

    sendResult = this.send(msg);

    // sendResult.getSendStatus() == SEND_OK

    // 2.如果消息发送成功,处理与消息关联的本地事务单元

    LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);

    // 3.结束事务

    this.endTransaction(sendResult, localTransactionState, localException);

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

endTransaction方法会将请求发往broker(mq server)去更新事务消息的最终状态:

1.根据sendResult找到Prepared消息 ,sendResult包含事务消息的ID

2.根据localTransaction更新消息的最终状态

如果endTransaction()方法执行失败,数据没有发送到broker,导致事务消息的 状态更新失败,broker会有回查线程定时(默认1分钟)扫描每个存储事务状态的表格文件,如果是已经提交或者回滚的消息直接跳过,如果是prepared状态则会向Producer发起CheckTransaction请求,Producer会调用DefaultMQProducerImpl.checkTransactionState()方法来处理broker的定时回调请求,而checkTransactionState会调用我们的事务设置的决断方法来决定是回滚事务还是继续执行,最后调用endTransactionOneway让broker来更新消息的最终状态。

2.2.2 消费失败和消费超时

再回到转账的例子,如果Bob的账户的余额已经减少,且消息已经发送成功,Smith端开始消费这条消息,这个时候就会出现消费失败消费超时两个问题,解决超时问题的思路就是一直重试,直到消费端消费消息成功,整个过程中有可能会出现消息重复的问题,按照前面的思路解决即可。

在这里插入图片描述
这样基本上可以解决消费端超时问题,但是如果消费失败怎么办? 阿里提供给我们的解决方法是:人工解决。大家可以考虑一下,按照事务的流程,因为某种原因Smith加款失败,那么需要回滚整个流程。如果消息系统要实现这个回滚流程的话,系统复杂度将大大提升,且很容易出现Bug,估计出现Bug的概率会比消费失败的概率大很多。这也是RocketMQ目前暂时没有解决这个问题的原因,在设计实现消息系统时,我们需要衡量是否值得花这么大的代价来解决这样一个出现概率非常小的问题,这也是大家在解决疑难问题时需要多多思考的地方。

当一条消息消费失败时,RocketMQ会进行一定次数的重试,经过M次重试后,仍然没有被成功消费,此时,RocketMQ不会立即将消息丢弃,而是将其发送到该消费者对应的特殊队列(死信队列)中去。参见 消息重试和死信队列

下文会有方案,即电子对账方式查找出问题的订单。而不是让mq实现回滚操作。

我们需要注意的是,在3.2.6版本中移除了事务消息的实现,所以此版本不支持事务消息。也就是说,消息失败不会进行检查。

2.2.3 交易事务处理示例

生产者:

TransactionProducer :

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

TransactionListenerImpl :

import ...

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

消息消费者:

public class Consumer {
	public static void main(String[] args) throws MQClientException {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_producer");
		consumer.setNamesrvAddr("127.0.0.1:9876");
		/**
		 * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
		 * 如果非第一次启动,那么按照上次消费的位置继续消费
		 */
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
		consumer.subscribe("TopicTransaction", "*");
		consumer.registerMessageListener(new MessageListenerOrderly() {
			private Random random = new Random();
 
			@Override
			public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
				// 设置自动提交
				context.setAutoCommit(true);
				for (MessageExt msg : msgs) {
					System.out.println("获取到消息开始消费:"+msg + " , content : " + new String(msg.getBody()));
				}
				try {
					// 模拟业务处理
					TimeUnit.SECONDS.sleep(random.nextInt(5));
				} catch (Exception e) {
					e.printStackTrace();
					//返回处理失败,该消息后续可以继续被消费
					return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
				}
				//返回处理成功,该消息就不会再次投递过来了
				return ConsumeOrderlyStatus.SUCCESS;
			}
		});
		consumer.start();
		System.out.println("consumer start ! ");
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

我们先启动消费端,然后启动生产端:

在运行之前,我们先来看一下,web控制台的消息:
在这里插入图片描述
Topic transaction_producer中没有未消费的消息,下面开始执行代码逻辑。

生产端:
在这里插入图片描述

通过日志可以看到其发送了四条消息,交易逻辑被调用了4次,其中只有一条消息反馈的结果为COMMIT_MESSAGE。

再次查看控制台:

在这里插入图片描述
可以看到其Topic transaction_producer只有一条待消费的消息,这个和发送端只一条消息被COMMIT的结论相符合。

其余消息有的被拒绝删除了,有的在隐藏队列中,稍后会进行重试

消费端:
在这里插入图片描述
在这里插入图片描述
这就是为什么我们生产了四条消息,最后却只消费了一条。

我们上面的代码还有这么一段:

//服务器回调producer,检查本地事务分支成功还是失败

     producer.setTransactionCheckListener(new TransactionCheckListener() {

         @Override

         public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {

             System.out.println("state --" + new String(messageExt.getBody()));

             return LocalTransactionState.COMMIT_MESSAGE;

         }

     });

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

这一段代码已经不能够实现相应的功能了(阿里把回查接口实现已经给删除了),回查的逻辑已经不进行开源了(3.2.6),商业版的RocketMQ可以实现消息回查(3.0.8版本也有相应的回查代码。有兴趣的可以进行查看源代码)

3. 如何保证扣钱与加钱的事务的最终一致性

在上面的转账交易逻辑中,存在两个问题:

1)如果没有使用RocketMQ的企业版本,那就可能会发生扣钱的事务成功了,但是扣钱的消息由于生产方发生了故障,导致交易消息没有在扣钱的事务提交成功后往RocketMQ中确认该条消息可以被提交,就会导致该条消息不会递交给消费方,导致Bob的钱被扣了,但是Smith的钱却没有增加。

2)生产方的全部逻辑都处理完成了,扣钱的事务在数据库中被成功的提交,扣钱的消息在RocketMQ被成功的确认,但是消费方在消费消息的时候,自己本身发生了故障,或者处理该条消息发生了逻辑错误,导致Smith的钱没有被正确的加上。

以上两个问题虽然发生的机率都很低,但是只要存在着发生的机率就会一定在某个时间点发生,只是故障发生时间点的早晚问题。在金融系统中,每日都会跑系统日志执行对账操作,用于核对当日总共的支付与收入是否是平衡的、每个单笔交易结果是否都满足借贷平衡等,因而为了避免以上两个问题的发生,我的处理方式还是引入金融系统对账的业务逻辑来进行处理。

其业务处理逻辑如下:
在这里插入图片描述
在发送交易事务消息过后,发送一个交易对账消息到对账Topic中,该对账消息为非事务消息,发送成功即表示成功保存到了RocketMQ中,该交易对账消息不会用于消费者消费,后续的交易对账系统会消费该队列中的对账信息,其分别会和交易的生产方和消费方进行交易核对,核对逻辑如下:
在这里插入图片描述
交易对账系统首先和交易的消费方进行核对,如果消费方消费成功,则可以说明整个交易结果满足最终一致性,因为消息是生产者成功处理后,然后再发送的交易确认消息,因而只要产生了事务确认的交易消息,则可以肯定生产方已经正常执行完了扣款的逻辑。

只有交易消息在消费方处理失败或者消息方没有消费该消息的情况下,才需要再次和生产方进行确认,如果生产方成功执行了扣款操作,则需要回滚这笔扣款交易;如果没有扣除成功,则表示两边都没有消费这边交易,就不用做任何操作了。

参考

RocketMQ支持事务消息机制
分布式事务中使用RocketMQ的事务消息机制优化事务的处理逻辑 参考主体

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/622526
推荐阅读
相关标签
  

闽ICP备14008679号