• 欢迎访问 winrains 的个人网站!
  • 本网站主要从互联网整理和收集了与Java、网络安全、Linux等技术相关的文章,供学习和研究使用。如有侵权,请留言告知,谢谢!

RocketMq之事务消息实现原理

消息队列 winrains 来源:大王叫下 3个月前 (06-28) 52次浏览

RocketMQ提供了事务消息的功能,采用2PC(两段式协议)+补偿机制(事务回查)的分布式事务功能,通过消息队列 RocketMQ 版事务消息能达到分布式事务的最终一致。

概览

  • 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列 RocketMQ 版服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
  • 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列 RocketMQ 版服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。

交互流程

事务消息发送步骤如下:

  1. 发送方将半事务消息发送至消息队列 RocketMQ 版服务端。
  2. 消息队列 RocketMQ 版服务端将消息持久化成功之后,向发送方返回 Ack 确认消息已经发送成功,此时消息为半事务消息。
  3. 发送方开始执行本地事务逻辑。
  4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。

事务消息回查步骤如下:

  1. 在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
  2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行操作。

总体而言RocketMQ事务消息分为两条主线

  • 发送流程:发送half message(半消息),执行本地事务,发送事务执行结果
  • 定时任务回查流程:MQ定时任务扫描半消息,回查本地事务,发送事务执行结果

源码分析

Producer是如何发送事务半消息的(prepare)

在本地应用发送事务消息的核心类是TransactionMQProducer,该类通过继承DefaultMQProducer来复用大部分发送消息相关的逻辑,这个类的代码量非常少只有100来行,下面是这个类的sendMessageTransaction方法

@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
    final Object arg) throws MQClientException {
    //判断transactionListener是否存在
    if (null == this.transactionListener) {
        throw new MQClientException("TransactionListener is null", null);
    }
    //发送事务消息
    return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}

这里的transactionListener就是上面所说的消息回查的类,它提供了2个方法:

  • executeLocalTransaction执行本地事务
  • checkLocalTransaction回查本地事务

接着看DefaultMQProducer.sendMessageInTransaction()方法:

public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    //判断检查本地事务的listenner是否存在
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }

    //。。。省略

    SendResult sendResult = null;
    //msg设置参数TRAN_MSG,表示为事务消息
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        //发送消息
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                if (null != localTransactionExecuter) {
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    log.debug("Used new transaction API");
                    //发送消息成功,执行本地事务
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    try {
        //执行endTransaction方法,如果半消息发送失败或本地事务执行失败告诉服务端是删除半消息,半消息发送成功且本地事务执行成功则告诉服务端生效半消息
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    //...省略
    return transactionSendResult;
}

该方法主要做了以下事情

  • 给消息打上事务消息相关的tag,用于broker区分普通消息和事务消息
  • 发送半消息(half message)
  • 发送成功则由transactionListener执行本地事务
  • 执行endTransaction方法,告诉 broker 执行 commit/rollback。

Broker端是如何处理事务消息的

Broker端通过SendMessageProcessor.processRequest()方法接收处理 Producer 发送的消息 最后会调用到SendMessageProcessor.sendMessage(),判断消息类型,进行消息存储。

//SendMessageProcessor.java

private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                    final RemotingCommand request,
                                    final SendMessageContext sendMessageContext,
                                    final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
//...省略
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
             response.setCode(ResponseCode.NO_PERMISSION);
             response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                        + "] sending transaction message is forbidden");
             return response;
       }
      //存储事务消息
      putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
      //存储普通消息
      putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}

接着看存储半消息的代码 prepareMessage(msgInner) :

//TransactionalMessageBridge.java

//存储事务半消息
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
    return store.putMessage(parseHalfMessageInner(messageInner));
}

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    //备份消息的原主题名称与原队列ID
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    //事务消息的topic和queueID是写死固定的
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

在这一步,备份消息的原主题名称与原队列ID,然后取消事务消息的消息标签,重新设置消息的主题为:RMQ_SYS_TRANS_HALF_TOPIC,队列ID固定为0。与其他普通消息区分开,然后完成消息持久化。
到这里,Broker 就初步处理完了 Producer 发送的事务半消息。

执行本地事务

接着我们回到 上面 Producer 发送半消息的地方,往下继续看。

switch (sendResult.getSendStatus()) {
    case SEND_OK: {
        try {
            if (sendResult.getTransactionId() != null) {
                msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
            }
            String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (null != transactionId && !"".equals(transactionId)) {
                msg.setTransactionId(transactionId);
            }
            if (null != localTransactionExecuter) {
                localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
            } else if (transactionListener != null) {
                log.debug("Used new transaction API");
                //发送消息成功,执行本地事务
                localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
            }
            if (null == localTransactionState) {
                localTransactionState = LocalTransactionState.UNKNOW;
            }

            if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                log.info("executeLocalTransactionBranch return {}", localTransactionState);
                log.info(msg.toString());
            }
        } catch (Throwable e) {
            log.info("executeLocalTransactionBranch exception", e);
            log.info(msg.toString());
            localException = e;
        }
    }
    break;
    case FLUSH_DISK_TIMEOUT:
    case FLUSH_SLAVE_TIMEOUT:
    //半消息发送失败,回滚
    case SLAVE_NOT_AVAILABLE:
        localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
        break;
    default:
        break;
}

事务半消息发送成功后,会调用transactionListener.executeLocalTransaction方法执行本地事务。只有半消息发送成功后,才会执行本地事务,如果半消息发送失败,则设置回滚。

结束事务(commit/rollback)

本地事务执行后,则调用this.endTransaction()方法,根据本地事务执行状态,去提交事务或者回滚事务。
如果半消息发送失败或本地事务执行失败告诉服务端是删除半消息,半消息发送成功且本地事务执行成功则告诉服务端生效半消息

public void endTransaction(
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    switch (localTransactionState) {
        //提交事务
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        //回滚
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}

半消息事务回查

两段式协议发送与提交回滚消息,执行完本地事务消息的状态为UNKNOW时,结束事务不做任何操作。通过事务状态定时回查得到发送端的事务状态是rollback或commit。
通过TransactionalMessageCheckService线程定时去检测RMQ_SYS_TRANS_HALF_TOPIC主题中的消息,回查消息的事务状态。

  • RMQ_SYS_TRANS_HALF_TOPICprepare消息的主题,事务消息首先先进入到该主题。
  • RMQ_SYS_TRANS_OP_HALF_TOPIC当消息服务器收到事务消息的提交或回滚请求后,会将消息存储在该主题下。

代码入口:

public void run() {
    log.info("Start transaction check service thread!");
    //执行间隔
    long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
    while (!this.isStopped()) {
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}

@Override
protected void onWaitEnd() {
    //事务过期时间
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    //检查本地事务
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

大致流程如下:

这里重点说下判断消息是否要回查的逻辑:

removeMap是个Map集合的键值对key是half队列的消息offset,value是op队列的消息offset,图中看有两对(100005,80002)、(100004,80003)

doneOpOffset是一个List集合,其中存储的是op队列的消息offset,图中只有8004

check()循环查找half队列中的消息时,100004已经在removeMap中了,跳过下面业务继续循环下一个100005进行下一个逻辑,判断其是否具有回查消息的条件isNeedCheck

Broker处理END_TRANSACTION

接下来我们来一起看看,当Producer或者回查定时任务提交/回滚事务的时候,Broker如何处理事务消息提交、回滚命令的。

//EndTransactionProcessor.java

public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
    RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final EndTransactionRequestHeader requestHeader =
        (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
    LOGGER.debug("Transaction request:{}", requestHeader);
    //从节点不处理
    if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
        response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
        LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
        return response;
    }

    //省略代码,打印日志
    
    OperationResult result = new OperationResult();
    //如果请求为提交事务,进入事务消息提交处理流程
    if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
        //根据commitLogOffset从commitlog文件中查找消息
        result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            //字段检查
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                //恢复事务消息的真实的主题、队列,并设置事务ID
                MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                //设置消息的相关属性,取消事务相关的系统标记
                msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                //发送最终消息,存储,被consumer消费
                RemotingCommand sendResult = sendFinalMessage(msgInner);
                if (sendResult.getCode() == ResponseCode.SUCCESS) {
                    //删除预处理消息(prepare)
                    //其实是将消息存储在主题为:RMQ_SYS_TRANS_OP_HALF_TOPIC的主题中,代表这些消息已经被处理(提交或回滚)。
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return sendResult;
            }
            return res;
        }
    } //回滚处理
    else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
        //根据commitlogOffset查找消息
        result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            //字段检查
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                //删除预处理消息(prepare)
                //将消息存储在RMQ_SYS_TRANS_OP_HALF_TOPIC中,代表该消息已被处理
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return res;
        }
    }
    response.setCode(result.getResponseCode());
    response.setRemark(result.getResponseRemark());
    return response;
}

这里的逻辑很清晰,其核心实现如下:

  • 根据commitlogOffset找到消息
  • 如果是提交动作,就恢复原消息的主题与队列,再次存入commitlog文件进而转到消息消费队列,供消费者消费,然后将原预处理消息存入一个新的主题RMQ_SYS_TRANS_OP_HALF_TOPIC,代表该消息已被处理
  • 回滚消息,则直接将原预处理消息存入一个新的主题RMQ_SYS_TRANS_OP_HALF_TOPIC,代表该消息已被处理

整体实现流程

异常情况

假如我们有个订单系统,下单后要调用优惠券系统,我们采用RocketMq的方式,在下单支付成功后发送消息给优惠券系统派发优惠券,这里通过事务消息的方式,保证一定可以派发优惠券成功。
我们来思考下几种异常场景,看看RocketMq能不能解决。

Producer发送半消息失败

可能由于网络或者mq故障,导致 Producer 订单系统 发送半消息(prepare)失败。
这时订单系统可以执行回滚操作,比如“订单关闭”等,走逆向流程退款给用户。

半消息发送成功,本地事务执行失败

如果订单系统发送的半消息成功了,但是执行本地事务失败了,如更新订单状态为“已完成”。
这种情况下,执行本地事务失败后,会返回rollback给 MQ,MQ会删除之前发送的半消息。 也就不会调用优惠券系统了。

半消息发送成功,没收到MQ返回的响应

假如订单系统发送半消息成功后,没有收到MQ返回的响应。
这个时候可能是因为网络问题,或者其他异常报错,订单系统误以为发送MQ半消息失败,执行了逆向回滚流程。
但这个时候其实mq已经保存半消息成功了,那这个消息怎么处理?
这个时候MQ的后台消息回查定时任务TransactionalMessageCheckService会每隔1分钟扫描一次半消息队列,判断是否需要消息回查,然后回查订单系统的本地事务,这时MQ就会发现订单已经变成“已关闭”,此时就要发送rollback请求给mq,删除之前的半消息。

如果commit/rollback失败了呢

这个其实也是通过定时任务TransactionalMessageCheckService,它会发现这个消息超过一定时间还没有进行二阶段处理,就会回查本地事务。

思考

  • RocketMQ是怎么保证半消息(prepare)不被消费者消费呢?
  • 事务消息的回滚操作是怎么实现的?
  • 事务commit后,怎么让消息对消费者可见呢?

资料引用
blog.csdn.net/gotobat/art…
blog.csdn.net/prestigedin…
http://www.jianshu.com/p/4a9fceb69…

作者:大王叫下

来源:https://juejin.im/post/5d2d28405188252b88748a46


版权声明:文末如注明作者和来源,则表示本文系转载,版权为原作者所有 | 本文如有侵权,请及时联系,承诺在收到消息后第一时间删除 | 如转载本文,请注明原文链接。
喜欢 (3)