栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 前沿技术 > 大数据 > 大数据系统

SpringCloudAlibaba - RocketMQ 分布式事务消息的实现

大数据系统 更新时间:发布时间: 百科书网 趣学号

目录
  • 前言
    • 环境
  • 分布式事务消息流程
    • 流程图
    • 流程解析
    • 事务消息三种状态
  • 具体实现
    • 实现代码
      • 内容中心
      • 用户中心
    • 测试
  • 项目源码

前言

RocketMQ提供了事务消息去解决程序异常回滚但消息已发出的问题,如服务A插入一条数据后服务B需要对用户数据进行修改,而服务A发出消息后程序发生异常导致数据插入回滚,而服务B监听到消息又对数据进行了修改,导致数据出现问题


环境

Spring Cloud Hoxton.SR9 + Spring Cloud Alibaba 2.2.6.RELEASE + RocketMQ 4.7.0


分布式事务消息流程 流程图

流程解析
  • 第1步:生产者向MQ Server发送半消息(特殊消息,会被存储到MQ Server且标记为暂时不能投递),消费者不会接收到这条消息
  • 第2 3步:当半消息发送成功后生产者就去执行本地事务
  • 第4步:生产者根据本地事务的执行状态向MQ Server发送二次确认请求,如果MQ Server收到的是commit就将半消息标记为可投递,消费者即可消费到该消息,如果接收到是rollback就将这条半消息删除
  • 第5步:如果第四步的二次确认没有能够成功发送到MQ Server,经过一段时间后,MQ Server会向生产者发送回查消息去获取本地事务的执行状态
  • 第6步:生产者检查本地事务执行状态
  • 第7步:生产者根据本地事务的执行结果告诉MQ Server应该commit还是rollback,如果是commit则像消费者投递消息,如果是rollback则丢弃消息

注:
1234步是一种二次确认的机制,生产者把消息发送到MQ,MQ做了标记不让去消费这条消息,生产者去执行本地事务,完成后根据执行状态去投递或丢弃消息
567步是MQ没有收到二次确认做的容错处理


事务消息三种状态
  • Commit:提交事务消息,消费者可以消费此消息
  • Rollback:回滚事务消息,broker会删除该消息,消费者不能消费
  • UNKNOWN: broker需要回查确认该消息的状态

具体实现 实现代码

问题场景:内容中心插入一条数据后用户中心需要对用户数据进行修改,而内容中心发出消息后程序发生异常导致数据插入回滚,而用户中心监听到消息又对数据进行了修改导致数据的不一致,下面将用RocketMQ的分布式事务消息验证下该场景的处理方式


内容中心
  • 表结构
CREATE TABLE `test` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `title` varchar(50) COLLATE utf8_unicode_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci COMMENT='插入数据测试表'

CREATE TABLE `rocketmq_transaction_log` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `transaction_Id` varchar(45) COLLATE utf8_unicode_ci NOT NULL COMMENT '事务id',
  `log` varchar(45) COLLATE utf8_unicode_ci NOT NULL COMMENT '日志',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci COMMENT='RocketMQ事务日志表'
  • TestRocketController.java
@PostMapping("test1")
public Test test1() {
    return testService.insertTest();
}
  • TestService.java
import com.alibaba.csp.sentinel.annotation.SentinelResource;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.UUID;

@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class TestService {

    private final TestMapper testMapper;
    private final RocketMQTemplate rocketMQTemplate;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    public Test insertTest() {
        Test test = Test.builder()
                .title("世事短如春梦,春梦了无痕,譬如春梦,黄粱未熟蕉鹿走")
                .build();

        
        rocketMQTemplate.sendMessageInTransaction(
                "add-test",
                MessageBuilder.withPayload(test)
                              .setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID().toString())
                              .build(),
                 test
        );

        return test;
    }

    
    @Transactional(rollbackFor = Exception.class)
    public void insertTestDataWithRocketMqLog(Test test, String transactionId) {
        this.insertTestData(test);

        rocketmqTransactionLogMapper.insertSelective(
                RocketmqTransactionLog.builder()
                        .transactionId(transactionId)
                        .log("插入了一条Test数据...")
                        .build()
        );
    }

    
    @Transactional(rollbackFor = Exception.class)
    public void insertTestData(Test test) {
        testMapper.insertSelective(test);
    }
}
  • TestMapper.java
public interface TestMapper extends Mapper {
}
  • RocketmqTransactionLogMapper.java
public interface RocketmqTransactionLogMapper extends Mapper {
}
  • Test.java
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
@Table(name = "test")
public class Test {

    
    @Id
    @GeneratedValue(generator = "JDBC")
    private Integer id;

    
    private String title;

}
  • RocketmqTransactionLog.java
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
@Table(name = "rocketmq_transaction_log")
public class RocketmqTransactionLog {
    
    @Id
    @GeneratedValue(generator = "JDBC")
    private Integer id;

    
    @Column(name = "transaction_Id")
    private String transactionId;

    
    private String log;
}
  • AddTestTransactionListener.java
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import java.util.Objects;


@RocketMQTransactionListener
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddTestTransactionListener implements RocketMQLocalTransactionListener {

    private final TestService testService;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {

        MessageHeaders headers = message.getHeaders();

        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

        try {
            testService.insertTestDataWithRocketMqLog((Test) o, transactionId);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        MessageHeaders headers = message.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

        // 根据记录的事务回查
        RocketmqTransactionLog transactionLog = rocketmqTransactionLogMapper.selectOne(
                RocketmqTransactionLog.builder()
                        .transactionId(transactionId)
                        .build()
        );

        // 本地事务执行成功
        if (Objects.nonNull(transactionLog)) {
            return RocketMQLocalTransactionState.COMMIT;
        }

        // 本地事务执行失败
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}

用户中心
  • TestRocketConsumer.java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@RocketMQMessageListener(consumerGroup = "consumer-group", topic = "add-test")
public class TestRocketConsumer implements RocketMQListener {
    @Override
    public void onMessage(Test test) {
        // TODO 业务处理
        try {
            log.info("监听到主题为'add-test'的消息:" + new ObjectMapper().writevalueAsString(test));
            log.info("可以开始处理业务啦啦啦");
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }
}

测试
  • 如图所示,在执行数据插入后还未向MQ Server发送本地事务的执行状态时,模拟服务宕机,将服务kill掉

  • Kill内容中心进程

  • 此时未向MQ Server发送本地事务的执行状态,MQ Server中的消息不会投递到用户中心,用户中心未收到消息不会进行后续的业务处理,如下所示,重启应用后进入本地事务回查

  • 本地事务回查后用户中心正常监听到消息进行业务处理

  • 至此,已完成RocketMQ分布式事务消息的实现

项目源码
  • GitHub: https://github.com/Maggieq8324/coisini-cloud-alibaba
  • Gitee: https://gitee.com/maggieq8324/coisini-cloud-alibaba

- End -
白嫖有风险
点赞加收藏
转载请注明:文章转载自 www.051e.com
本文地址:http://www.051e.com/it/279858.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号