
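Bilibili video: 黑马程序员 RocketMQ系统精讲, P35 - P88
Hand-typed shop source code: code repository
1.1 Business analysis
Simulate the order placement and payment flows of an e-commerce shopping site.
1.1.1 Placing an order
After the user submits an order, the stock is deducted, the coupon is consumed, and the account balance is used, all successfully. Then the order confirmation step fails, so the stock, coupon, and balance must be rolled back. How do we keep the data consistent?
UML diagram: use MQ to keep system data consistent after an order fails.
1.1.2 Payment
After the user pays through a third-party payment platform (Alipay, WeChat), the platform asynchronously notifies the merchant's payment system of the result through a callback API. Based on the result, the payment system updates the order status, records a payment log, and adds points to the user.
When the merchant's payment system receives the asynchronous notification, how can it respond to the third-party platform quickly?
Use MQ to distribute the data and improve processing performance.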
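The idea of acknowledging the callback first and doing the slow work later can be sketched as follows. This is a minimal illustration, not the course's implementation: `PaymentCallbackSketch`, its method names, and the in-memory `BlockingQueue` (standing in for the MQ topic) are all assumptions.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the merchant payment system.
// The queue plays the role of the MQ topic; a real system would publish to RocketMQ.
class PaymentCallbackSketch {
    private final BlockingQueue<String> paidOrders = new LinkedBlockingQueue<>();
    final List<String> handled = new CopyOnWriteArrayList<>();

    // Called by the third-party platform: enqueue the result and acknowledge at once.
    String onPaymentCallback(String orderId) {
        paidOrders.offer(orderId);
        return "success";
    }

    // Downstream consumers: drain the queue and perform the slower follow-up work.
    int drain() {
        int processed = 0;
        String orderId;
        while ((orderId = paidOrders.poll()) != null) {
            handled.add("order-status:" + orderId); // update the order status
            handled.add("pay-log:" + orderId);      // record the payment log
            handled.add("user-score:" + orderId);   // add points to the user
            processed++;
        }
        return processed;
    }
}
```

The callback returns as soon as the message is queued, so the response time no longer depends on how long the three follow-up steps take.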
Download the rocketmq-spring project.
Install rocketmq-spring into the local Maven repository. It provides the rocketmq-spring-boot-starter dependency.
mvn install -Dmaven.test.skip=true
2.2.1 Message producer
1) Add dependencies
pom.xml coordinates:
modelVersion: 4.0.0
parent: org.springframework.boot:spring-boot-starter-parent:2.6.7
project: com.crysw:boot-rocketmq:0.0.1-SNAPSHOT (name: boot-rocketmq, description: Demo project for Spring Boot)
properties: Java 1.8, UTF-8 encoding
dependencies:
org.springframework.boot:spring-boot-starter-web
org.apache.rocketmq:rocketmq-spring-boot-starter:2.2.2
org.projectlombok:lombok (optional)
org.springframework.boot:spring-boot-starter-test (scope: test)
build plugin: org.springframework.boot:spring-boot-maven-plugin (excludes org.projectlombok:lombok)
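2) Configuration file
Add the following to application.properties:
# nameserver
rocketmq.name-server=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# producer group name
rocketmq.producer.group=my-group
3) Application class
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class BootRocketmqApplication {
    public static void main(String[] args) {
        SpringApplication.run(BootRocketmqApplication.class, args);
    }
}
4) Test class
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class ProducerTest {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Test
    public void testSendMsg() {
        // Send a plain string payload to the topic "springboot-rocketmq"
        rocketMQTemplate.convertAndSend("springboot-rocketmq", "hello springboot-rocketmq");
    }
}
Check the message in the rocketmq-console dashboard.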
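2.2.2 Message consumer
1) Add dependencies
Same as for the message producer.
2) Configuration file
# nameserver
rocketmq.name-server=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# consumer group name
rocketmq.consumer.group=my-group
3) Application class
Same as for the message producer.
4) Message listener
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Component // register the message listener in the IoC container
@RocketMQMessageListener(topic = "springboot-rocketmq", consumerGroup = "${rocketmq.consumer.group}")
@Slf4j
public class Consumer implements RocketMQListener<String> {
    // The type parameter tells the starter to convert the message body to a String
    @Override
    public void onMessage(String s) {
        System.out.println("接收到消息: " + s);
    }
}
5) Test
After starting the application, check the log printed by the consumer.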
2022-06-22 22:31:58.228 INFO 17644 --- [ main] a.r.s.s.DefaultRocketMQListenerContainer : running container: DefaultRocketMQListenerContainer{consumerGroup='my-group', namespace='', nameServer='rocketmq-nameserver1:9876;rocketmq-nameserver2:9876', topic='springboot-rocketmq', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='*', messageModel=CLUSTERING', tlsEnable=false}
2022-06-22 22:31:58.230 INFO 17644 --- [ main] o.a.r.s.a.ListenerContainerConfiguration : Register the listener to container, listenerBeanName:consumer, containerBeanName:org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_1
2022-06-22 22:31:58.270 INFO 17644 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2022-06-22 22:31:58.286 INFO 17644 --- [ main] c.c.b.BootRocketmqApplication : Started BootRocketmqApplication in 7.152 seconds (JVM running for 9.301)
# Content of the consumed messages
接收到消息: hello springboot
接收到消息: hello springboot-rocketmq
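2.3 Integrating Spring Boot with Dubbo
Download the dubbo-spring-boot-starter package.
Install dubbo-spring-boot-starter into the local Maven repository. Simply declaring the dependency as shown further below works just as well.
mvn install -Dmaven.test.skip=true
Dubbo call flow diagram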
1) Preparation
Install the JDK.
Upload ZooKeeper to the server.
Extract ZooKeeper into the target directory on the server, create a data directory, and rename zoo_sample.cfg under conf to zoo.cfg.
tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz
mv zoo_sample.cfg zoo.cfg
Create /usr/local/zookeeper-cluster and copy the extracted ZooKeeper into the following three directories.
# make three copies
cp -rf apache-zookeeper-3.6.3-bin /usr/local/zookeeper-cluster/zookeeper-1
cp -rf apache-zookeeper-3.6.3-bin /usr/local/zookeeper-cluster/zookeeper-2
cp -rf apache-zookeeper-3.6.3-bin /usr/local/zookeeper-cluster/zookeeper-3
In each zoo.cfg, set dataDir and clientPort. The client ports are 2181, 2182, and 2183 respectively.
Edit /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
clientPort=2181
dataDir=/usr/local/zookeeper-cluster/zookeeper-1/data
Edit /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
clientPort=2182
dataDir=/usr/local/zookeeper-cluster/zookeeper-2/data
Edit /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg
clientPort=2183
dataDir=/usr/local/zookeeper-cluster/zookeeper-3/data
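2) Configure the cluster
In the data directory of each ZooKeeper instance, create a myid file whose content is 1, 2, and 3 respectively. This file records the ID of each server.
echo 1 > zookeeper-1/data/myid
echo 2 > zookeeper-2/data/myid
echo 3 > zookeeper-3/data/myid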
In the zoo.cfg of each ZooKeeper instance, configure the client port (clientPort) and the cluster server list. The server list is:
server.1=192.168.65.129:2881:3881
server.2=192.168.65.129:2882:3882
server.3=192.168.65.129:2883:3883
Format: server.<server ID>=<server IP address>:<port for communication between servers>:<port for leader election>
3) Start the cluster
Start the cluster by starting each zk instance in turn:
[root@centos7-01 zookeeper-cluster]# ./zookeeper-1/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-cluster/zookeeper-1/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@centos7-01 zookeeper-cluster]# ./zookeeper-2/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-cluster/zookeeper-2/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@centos7-01 zookeeper-cluster]# ./zookeeper-3/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-cluster/zookeeper-3/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
2.3.2 RPC service interface
Write the interface exposed by the service provider.
public interface IUserService {
    String sayHello(String name);
}
2.3.3 Service provider
1) Add dependencies
pom.xml coordinates:
modelVersion: 4.0.0
parent: org.springframework.boot:spring-boot-starter-parent:2.0.1.RELEASE
project: com.crysw.shop:springboot-dubbo-provider:0.0.1-SNAPSHOT (name: springboot-dubbo-provider, description: Demo project for Spring Boot)
properties: Java 1.8
dependencies:
com.alibaba.spring.boot:dubbo-spring-boot-starter:2.0.0
com.crysw.shop:springboot-dubbo-interface:0.0.1-SNAPSHOT
org.apache.zookeeper:zookeeper:3.4.10 (excludes org.slf4j:slf4j-log4j12 and log4j:log4j)
com.101tec:zkclient:0.9 (excludes org.slf4j:slf4j-log4j12)
org.springframework.boot:spring-boot-starter-web
org.projectlombok:lombok (optional)
org.springframework.boot:spring-boot-starter-test (scope: test)
build plugin: org.springframework.boot:spring-boot-maven-plugin (excludes org.projectlombok:lombok)
2) Configuration file
# application.properties
server.port=8080
spring.application.name=dubbo-demo-provider
spring.dubbo.application.id=dubbo-demo-provider
spring.dubbo.application.name=dubbo-demo-provider
spring.dubbo.registry.address=zookeeper://192.168.65.129:2181;zookeeper://192.168.65.129:2182;zookeeper://192.168.65.129:2183
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20880
3) Application class
package com.crysw.shop.provider;
import com.alibaba.dubbo.spring.boot.annotation.EnableDubboConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableDubboConfiguration // enable Dubbo configuration
public class SpringbootDubboProviderApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringbootDubboProviderApplication.class, args);
    }
}
4) Implement the service interface
package com.crysw.shop.provider.service.impl;
import com.alibaba.dubbo.config.annotation.Service;
import com.crysw.shop.infc.service.IUserService;
import org.springframework.stereotype.Component;
// Declare the bean with Spring's @Component to avoid a name clash with Dubbo's @Service
@Component
// This must be the @Service from the Dubbo package; it binds the implementation to the interface
@Service(interfaceClass = IUserService.class)
public class UserServiceImpl implements IUserService {
    @Override
    public String sayHello(String name) {
        return "hello, dubbo, i'm " + name;
    }
}
2.3.4 Service consumer
1) Add dependencies
pom.xml coordinates:
modelVersion: 4.0.0
parent: org.springframework.boot:spring-boot-starter-parent:2.0.1.RELEASE
project: com.crysw.shop:springboot-dubbo-consumer:0.0.1-SNAPSHOT (name: springboot-dubbo-consumer, description: Demo project for Spring Boot)
properties: Java 1.8
dependencies:
com.alibaba.spring.boot:dubbo-spring-boot-starter:2.0.0
com.crysw.shop:springboot-dubbo-interface:0.0.1-SNAPSHOT
org.apache.zookeeper:zookeeper:3.4.10 (excludes org.slf4j:slf4j-log4j12 and log4j:log4j)
com.101tec:zkclient:0.9 (excludes org.slf4j:slf4j-log4j12)
org.springframework.boot:spring-boot-starter-web
org.projectlombok:lombok (optional)
org.springframework.boot:spring-boot-starter-test (scope: test)
build plugin: org.springframework.boot:spring-boot-maven-plugin (excludes org.projectlombok:lombok)
2) Configuration file
server.port=8081
spring.application.name=dubbo-demo-consumer
spring.dubbo.application.name=dubbo-demo-consumer
spring.dubbo.application.id=dubbo-demo-consumer
spring.dubbo.registry.address=zookeeper://192.168.65.129:2181;zookeeper://192.168.65.129:2182;zookeeper://192.168.65.129:2183
3) Application class
package com.crysw.shop.consumer;
import com.alibaba.dubbo.spring.boot.annotation.EnableDubboConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableDubboConfiguration
public class SpringbootDubboConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringbootDubboConsumerApplication.class, args);
    }
}
4) Write the Controller
package com.crysw.shop.consumer.controller;
import com.alibaba.dubbo.config.annotation.Reference;
import com.crysw.shop.infc.service.IUserService;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/user")
public class UserController {
    @Reference // Dubbo annotation: injects a proxy for the remote service
    private IUserService userService;
    @RequestMapping("/sayHello")
    public String sayHello(String name) {
        return userService.sayHello(name);
    }
}
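2.3.5 Testing the Dubbo interface call
1) Start the service provider
Check the startup log: the connection to ZooKeeper succeeds.
2) Start the service consumer
It connects to ZooKeeper in the same way and fetches the provider's host, port, and other details from the zk registry. Then open http://localhost:8081/user/sayHello?name=crysw in a browser.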
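The register-then-lookup flow described above can be sketched in plain Java. This is only a conceptual model under loud assumptions: `RegistrySketch`, `export`, and `refer` are hypothetical names, the in-memory map stands in for ZooKeeper, and a JDK dynamic proxy stands in for the network call that Dubbo actually performs.

```java
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

interface IUserService {
    String sayHello(String name);
}

// Hypothetical in-memory registry. In the real setup ZooKeeper stores provider
// addresses and the consumer-side proxy sends the call over the network.
class RegistrySketch {
    private final Map<String, Object> providers = new ConcurrentHashMap<>();

    // Provider side: publish an implementation under its interface name.
    <T> void export(Class<T> type, T impl) {
        providers.put(type.getName(), impl);
    }

    // Consumer side: obtain a proxy that looks up the provider on every call.
    @SuppressWarnings("unchecked")
    <T> T refer(Class<T> type) {
        return (T) Proxy.newProxyInstance(
                type.getClassLoader(),
                new Class<?>[] {type},
                (proxy, method, args) -> {
                    Object target = providers.get(type.getName());
                    if (target == null) {
                        throw new IllegalStateException("no provider for " + type.getName());
                    }
                    return method.invoke(target, args);
                });
    }
}
```

The consumer code only sees the interface, which is why the Controller above can call `userService.sayHello(name)` as if it were a local bean.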
Table: trade_coupon
| Field | Type | Comment |
|---|---|---|
| coupon_id | bigint(50) not null | Coupon ID |
| coupon_price | decimal(10,2) null | Coupon amount |
| user_id | bigint(50) null | User ID |
| order_id | bigint(32) null | Order ID |
| is_used | int(1) null | Used flag: 0-unused, 1-used |
| used_time | timestamp null | Time of use |
Table: trade_goods
| Field | Type | Comment |
|---|---|---|
| goods_id | bigint(50) not null | Primary key |
| goods_name | varchar(255) null | Goods name |
| goods_number | int(11) null | Stock quantity |
| goods_price | decimal(10,2) null | Goods price |
| goods_desc | varchar(255) null | Goods description |
| add_time | timestamp null | Time added |
Table: trade_order
| Field | Type | Comment |
|---|---|---|
| order_id | bigint(50) not null | Order ID |
| user_id | bigint(50) null | User ID |
| order_status | int(1) null | Order status: 0-unconfirmed, 1-confirmed, 2-cancelled, 3-invalid, 4-refunded |
| pay_status | int(1) null | Payment status: 0-unpaid, 1-paying, 2-paid |
| shipping_status | int(1) null | Shipping status: 0-not shipped, 1-shipped, 2-received |
| address | varchar(255) null | Shipping address |
| consignee | varchar(255) null | Consignee |
| goods_id | bigint(50) null | Goods ID |
| goods_number | int(11) null | Goods quantity |
| goods_price | decimal(10,2) null | Goods price |
| goods_amount | decimal(10,0) null | Total goods price |
| shipping_fee | decimal(10,2) null | Shipping fee |
| order_amount | decimal(10,2) null | Order amount (goods price * quantity + shipping fee) |
| coupon_id | bigint(50) null | Coupon ID |
| coupon_paid | decimal(10,2) null | Amount covered by the coupon |
| money_paid | decimal(10,2) null | Amount already paid from the balance |
| pay_amount | decimal(10,2) null | Amount to pay |
| add_time | timestamp null | Creation time |
| confirm_time | timestamp null | Order confirmation time |
| pay_time | timestamp null | Payment time |
Table: trade_goods_number_log
| Field | Type | Comment |
|---|---|---|
| goods_id | bigint(50) not null | Goods ID |
| order_id | bigint(50) not null | Order ID |
| goods_number | int(11) null | Goods quantity (negative when stock is deducted, positive when it is restored) |
| log_time | timestamp null | Log time |
Table: trade_user
| Field | Type | Comment |
|---|---|---|
| user_id | bigint(50) not null | User ID |
| user_name | varchar(255) null | User name |
| user_password | varchar(255) null | Password |
| user_mobile | varchar(255) null | Mobile number |
| user_score | int(11) null | Points |
| user_reg_time | timestamp null | Registration time |
| user_money | decimal(10,0) null | User balance |
Table: trade_user_money_log
| Field | Type | Comment |
|---|---|---|
| user_id | bigint(50) not null | User ID |
| order_id | bigint(50) not null | Order ID |
| money_log_type | int(1) not null | Log type: 1-order payment, 2-order refund |
| use_money | decimal(10,2) null | Amount of the operation |
| create_time | timestamp null | Log time |
Table: trade_pay
| Field | Type | Comment |
|---|---|---|
| pay_id | bigint(50) not null | Payment ID |
| order_id | bigint(50) null | Order ID |
| pay_amount | decimal(10,2) null | Payment amount |
| is_paid | int(1) null | Paid flag: 1-no, 2-yes |
Table: trade_mq_producer_temp
| Field | Type | Comment |
|---|---|---|
| id | varchar(100) not null | Primary key |
| group_name | varchar(100) null | Producer group name |
| msg_topic | varchar(100) null | Message topic |
| msg_tag | varchar(100) null | Tag |
| msg_key | varchar(100) null | Key |
| msg_body | varchar(500) null | Message body |
| msg_status | int(1) null | 0-unprocessed, 1-processed |
| create_time | timestamp not null | Record time |
Table: trade_mq_consumer_log
| Field | Type | Comment |
|---|---|---|
| msg_id | varchar(50) null | Message ID |
| group_name | varchar(100) not null | Consumer group name (primary key) |
| msg_tag | varchar(100) not null | Tag (primary key) |
| msg_key | varchar(100) not null | Key (primary key) |
| msg_body | varchar(500) null | Message body |
| consumer_status | int(1) null | 0-processing, 1-succeeded, 2-failed |
| consumer_times | int(1) null | Number of consumption attempts |
| consumer_timestamp | timestamp null | Consumption time |
| remark | varchar(500) null | Remark |
CREATE DATABASE `trade`;
USE `trade`;

DROP TABLE IF EXISTS `trade_coupon`;
CREATE TABLE `trade_coupon` (
  `coupon_id` BIGINT(50) NOT NULL COMMENT 'Coupon ID',
  `coupon_price` DECIMAL(10,2) DEFAULT NULL COMMENT 'Coupon amount',
  `user_id` BIGINT(50) DEFAULT NULL COMMENT 'User ID',
  `order_id` BIGINT(32) DEFAULT NULL COMMENT 'Order ID',
  `is_used` INT(1) DEFAULT NULL COMMENT 'Used flag: 0 unused, 1 used',
  `used_time` TIMESTAMP NULL DEFAULT NULL COMMENT 'Time of use',
  PRIMARY KEY (`coupon_id`),
  KEY `FK_trade_coupon` (`user_id`),
  KEY `FK_trade_coupon2` (`order_id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

DROP TABLE IF EXISTS `trade_goods`;
CREATE TABLE `trade_goods` (
  `goods_id` BIGINT(50) NOT NULL AUTO_INCREMENT,
  `goods_name` VARCHAR(255) DEFAULT NULL COMMENT 'Goods name',
  `goods_number` INT(11) DEFAULT NULL COMMENT 'Stock quantity',
  `goods_price` DECIMAL(10,2) DEFAULT NULL COMMENT 'Goods price',
  `goods_desc` VARCHAR(255) DEFAULT NULL COMMENT 'Goods description',
  `add_time` TIMESTAMP NULL DEFAULT NULL COMMENT 'Time added',
  PRIMARY KEY (`goods_id`)
) ENGINE=INNODB AUTO_INCREMENT=345959443973935105 DEFAULT CHARSET=utf8;

INSERT INTO `trade_goods`(`goods_id`,`goods_name`,`goods_number`,`goods_price`,`goods_desc`,`add_time`)
VALUES (345959443973935104,'华为P30',999,'5000.00','夜间拍照更美','2019-07-09 20:38:00');

DROP TABLE IF EXISTS `trade_goods_number_log`;
CREATE TABLE `trade_goods_number_log` (
  `goods_id` BIGINT(50) NOT NULL COMMENT 'Goods ID',
  `order_id` BIGINT(50) NOT NULL COMMENT 'Order ID',
  `goods_number` INT(11) DEFAULT NULL COMMENT 'Stock quantity',
  `log_time` TIMESTAMP NULL DEFAULT NULL,
  -- Without a composite primary key over all three columns, the log rows written when
  -- creating and cancelling an order would collide on the primary key.
  -- Creating an order logs a negative goods_number (stock deducted);
  -- cancelling an order logs a positive goods_number (stock restored).
  PRIMARY KEY (`goods_id`,`order_id`,`goods_number`),
  KEY `FK_trade_goods_number_log2` (`order_id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

DROP TABLE IF EXISTS `trade_mq_consumer_log`;
CREATE TABLE `trade_mq_consumer_log` (
  `msg_id` VARCHAR(50) DEFAULT NULL,
  `group_name` VARCHAR(100) NOT NULL,
  `msg_tag` VARCHAR(100) NOT NULL,
  `msg_key` VARCHAR(100) NOT NULL,
  `msg_body` VARCHAR(500) DEFAULT NULL,
  `consumer_status` INT(1) DEFAULT NULL COMMENT '0: processing; 1: succeeded; 2: failed',
  `consumer_times` INT(1) DEFAULT NULL,
  `consumer_timestamp` TIMESTAMP NULL DEFAULT NULL,
  `remark` VARCHAR(500) DEFAULT NULL,
  PRIMARY KEY (`group_name`,`msg_tag`,`msg_key`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

DROP TABLE IF EXISTS `trade_mq_producer_temp`;
CREATE TABLE `trade_mq_producer_temp` (
  `id` VARCHAR(100) NOT NULL,
  `group_name` VARCHAR(100) DEFAULT NULL,
  `msg_topic` VARCHAR(100) DEFAULT NULL,
  `msg_tag` VARCHAR(100) DEFAULT NULL,
  `msg_key` VARCHAR(100) DEFAULT NULL,
  `msg_body` VARCHAR(500) DEFAULT NULL,
  `msg_status` INT(1) DEFAULT NULL COMMENT '0: unprocessed; 1: processed',
  `create_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

DROP TABLE IF EXISTS `trade_order`;
CREATE TABLE `trade_order` (
  `order_id` BIGINT(50) NOT NULL COMMENT 'Order ID',
  `user_id` BIGINT(50) DEFAULT NULL COMMENT 'User ID',
  `order_status` INT(1) DEFAULT NULL COMMENT 'Order status: 0 unconfirmed, 1 confirmed, 2 cancelled, 3 invalid, 4 refunded',
  `pay_status` INT(1) DEFAULT NULL COMMENT 'Payment status: 0 unpaid, 1 paying, 2 paid',
  `shipping_status` INT(1) DEFAULT NULL COMMENT 'Shipping status: 0 not shipped, 1 shipped, 2 received',
  `address` VARCHAR(255) DEFAULT NULL COMMENT 'Shipping address',
  `consignee` VARCHAR(255) DEFAULT NULL COMMENT 'Consignee',
  `goods_id` BIGINT(50) DEFAULT NULL COMMENT 'Goods ID',
  `goods_number` INT(11) DEFAULT NULL COMMENT 'Goods quantity',
  `goods_price` DECIMAL(10,2) DEFAULT NULL COMMENT 'Goods price',
  `goods_amount` DECIMAL(10,0) DEFAULT NULL COMMENT 'Total goods price',
  `shipping_fee` DECIMAL(10,2) DEFAULT NULL COMMENT 'Shipping fee',
  `order_amount` DECIMAL(10,2) DEFAULT NULL COMMENT 'Order amount',
  `coupon_id` BIGINT(50) DEFAULT NULL COMMENT 'Coupon ID',
  `coupon_paid` DECIMAL(10,2) DEFAULT NULL COMMENT 'Amount covered by the coupon',
  `money_paid` DECIMAL(10,2) DEFAULT NULL COMMENT 'Amount already paid',
  `pay_amount` DECIMAL(10,2) DEFAULT NULL COMMENT 'Amount to pay',
  `add_time` TIMESTAMP NULL DEFAULT NULL COMMENT 'Creation time',
  `confirm_time` TIMESTAMP NULL DEFAULT NULL COMMENT 'Order confirmation time',
  `pay_time` TIMESTAMP NULL DEFAULT NULL COMMENT 'Payment time',
  PRIMARY KEY (`order_id`),
  KEY `FK_trade_order` (`user_id`),
  KEY `FK_trade_order2` (`goods_id`),
  KEY `FK_trade_order3` (`coupon_id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

DROP TABLE IF EXISTS `trade_pay`;
CREATE TABLE `trade_pay` (
  `pay_id` BIGINT(50) NOT NULL COMMENT 'Payment ID',
  `order_id` BIGINT(50) DEFAULT NULL COMMENT 'Order ID',
  `pay_amount` DECIMAL(10,2) DEFAULT NULL COMMENT 'Payment amount',
  `is_paid` INT(1) DEFAULT NULL COMMENT 'Paid flag: 1 no, 2 yes',
  PRIMARY KEY (`pay_id`),
  KEY `FK_trade_pay` (`order_id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

DROP TABLE IF EXISTS `trade_user`;
CREATE TABLE `trade_user` (
  `user_id` BIGINT(50) NOT NULL AUTO_INCREMENT COMMENT 'User ID',
  `user_name` VARCHAR(255) DEFAULT NULL COMMENT 'User name',
  `user_password` VARCHAR(255) DEFAULT NULL COMMENT 'Password',
  `user_mobile` VARCHAR(255) DEFAULT NULL COMMENT 'Mobile number',
  `user_score` INT(11) DEFAULT NULL COMMENT 'Points',
  `user_reg_time` TIMESTAMP NULL DEFAULT NULL COMMENT 'Registration time',
  `user_money` DECIMAL(10,0) DEFAULT NULL COMMENT 'User balance',
  PRIMARY KEY (`user_id`)
) ENGINE=INNODB AUTO_INCREMENT=345963634385633281 DEFAULT CHARSET=utf8;

INSERT INTO `trade_user`(`user_id`,`user_name`,`user_password`,`user_mobile`,`user_score`,`user_reg_time`,`user_money`)
VALUES (345963634385633280,'刘备','123L','18888888888L',100,'2019-07-09 13:37:03','900');

DROP TABLE IF EXISTS `trade_user_money_log`;
CREATE TABLE `trade_user_money_log` (
  `user_id` BIGINT(50) NOT NULL COMMENT 'User ID',
  `order_id` BIGINT(50) NOT NULL COMMENT 'Order ID',
  `money_log_type` INT(1) NOT NULL COMMENT 'Log type: 1 order payment, 2 order refund',
  `use_money` DECIMAL(10,2) DEFAULT NULL,
  `create_time` TIMESTAMP NULL DEFAULT NULL COMMENT 'Log time',
  PRIMARY KEY (`user_id`,`order_id`,`money_log_type`),
  KEY `FK_trade_user_money_log2` (`order_id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;
3.2 Project initialization
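The shop system uses Maven for project management.
3.2.1 Project overview
There are 12 systems in total.
Use MyBatis reverse engineering to generate the CRUD persistence-layer code for the tables.
2) Code import
ID generator
IDWorker: Twitter's Snowflake algorithm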
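For readers unfamiliar with Snowflake, here is a minimal sketch of the idea: a 64-bit ID built from a millisecond timestamp, a worker ID, and a per-millisecond sequence. It is not the course's `IDWorker`; the class name `SnowflakeSketch` is hypothetical, and merging Twitter's 5-bit datacenter ID and 5-bit worker ID into a single 10-bit worker field is a simplification made here.

```java
// A minimal Snowflake-style generator (a sketch, not the course's IDWorker).
// Layout: timestamp offset | 10-bit worker id | 12-bit sequence.
class SnowflakeSketch {
    private static final long EPOCH = 1288834974657L;  // epoch used by Twitter's original implementation
    private static final long WORKER_BITS = 10L;       // assumption: datacenter + worker bits merged
    private static final long SEQUENCE_BITS = 12L;
    private static final long MAX_WORKER_ID = ~(-1L << WORKER_BITS);    // 1023
    private static final long SEQUENCE_MASK = ~(-1L << SEQUENCE_BITS);  // 4095

    private final long workerId;
    private long lastTimestamp = -1L;
    private long sequence = 0L;

    SnowflakeSketch(long workerId) {
        if (workerId < 0 || workerId > MAX_WORKER_ID) {
            throw new IllegalArgumentException("workerId out of range: " + workerId);
        }
        this.workerId = workerId;
    }

    synchronized long nextId() {
        long now = System.currentTimeMillis();
        if (now < lastTimestamp) {
            throw new IllegalStateException("clock moved backwards");
        }
        if (now == lastTimestamp) {
            // Same millisecond: bump the sequence; wait for the next millisecond on overflow.
            sequence = (sequence + 1) & SEQUENCE_MASK;
            if (sequence == 0) {
                while (now <= lastTimestamp) {
                    now = System.currentTimeMillis();
                }
            }
        } else {
            sequence = 0L;
        }
        lastTimestamp = now;
        return ((now - EPOCH) << (WORKER_BITS + SEQUENCE_BITS))
                | (workerId << SEQUENCE_BITS)
                | sequence;
    }
}
```

Because the timestamp occupies the high bits, IDs from one generator increase over time, which suits them for use as primary keys such as `order_id`.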
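Exception handling classes
CustomerException: custom exception class
CastException: helper class that throws exceptions
Constants
ShopCode: system status codes
Response entity
Result: wraps the response status and the response message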
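The code in the following sections calls these classes as `CastException.cast(ShopCode.X)`, `ShopCode.X.getSuccess()`, `getCode()`, `getMessage()`, and `new Result(success, message)`. A plausible shape for them is sketched below; the field layout, the numeric codes, and the messages are assumptions made for illustration, not the course's actual source.

```java
// Hypothetical shapes for the shared classes; codes and messages are invented.
enum ShopCode {
    SHOP_SUCCESS(true, 1, "success"),
    SHOP_FAIL(false, 0, "failure"),
    SHOP_ORDER_INVALID(false, 20001, "invalid order");

    private final Boolean success;
    private final Integer code;
    private final String message;

    ShopCode(Boolean success, Integer code, String message) {
        this.success = success;
        this.code = code;
        this.message = message;
    }

    Boolean getSuccess() { return success; }
    Integer getCode() { return code; }
    String getMessage() { return message; }
}

// Unchecked exception that carries the status code that caused it.
class CustomerException extends RuntimeException {
    private final ShopCode shopCode;

    CustomerException(ShopCode shopCode) {
        super(shopCode.getMessage());
        this.shopCode = shopCode;
    }

    ShopCode getShopCode() { return shopCode; }
}

// Helper that always throws, so callers can write one-line guard clauses.
class CastException {
    static void cast(ShopCode shopCode) {
        throw new CustomerException(shopCode);
    }
}

// Response wrapper returned by the service methods.
class Result {
    private final Boolean success;
    private final String message;

    Result(Boolean success, String message) {
        this.success = success;
        this.message = message;
    }

    Boolean getSuccess() { return success; }
    String getMessage() { return message; }
}
```

Under this reading, every `CastException.cast(...)` call aborts the current method, which is why the validation code later does not need `return` statements after its guard clauses.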
Add the order interface IOrderService to shop-api
package com.crysw.api;
import com.crysw.shop.entity.Result;
import com.crysw.shop.pojo.TradeOrder;
public interface IOrderService {
    Result confirmOrder(TradeOrder order);
}
Add the goods interface IGoodsService to shop-api
package com.crysw.api;
import com.crysw.shop.entity.Result;
import com.crysw.shop.pojo.TradeGoods;
// Assumed to live alongside the other generated pojo classes
import com.crysw.shop.pojo.TradeGoodsNumberLog;
public interface IGoodsService {
    TradeGoods findOne(Long goodsId);
    Result reduceGoodsNum(TradeGoodsNumberLog goodsNumberLog);
}
Add the user interface IUserService to shop-api
package com.crysw.api;
import com.crysw.shop.pojo.TradeUser;
public interface IUserService {
    TradeUser findOne(Long userId);
}
Add the coupon interface ICouponService to shop-api
package com.crysw.api;
import com.crysw.shop.pojo.TradeCoupon;
public interface ICouponService {
    TradeCoupon findOne(Long couponId);
}
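4.1.2 Business class implementation
In the shop-order-service module, add an implementation of the IOrderService interface that creates the order.
package com.crysw.shop.service.impl;
import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.dubbo.config.annotation.Service;
import com.crysw.api.IGoodsService;
import com.crysw.api.IOrderService;
import com.crysw.api.IUserService;
import com.crysw.shop.entity.Result;
import com.crysw.shop.pojo.TradeOrder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
// Dubbo service interface
@Service(interfaceClass = IOrderService.class)
@Slf4j
public class OrderServiceImpl implements IOrderService {
    @Reference
    private IGoodsService goodsService;
    @Reference
    private IUserService userService;
    @Override
    public Result confirmOrder(TradeOrder order) {
        // 1. Validate the order
        checkOrder(order);
        // 2. Generate the pre-order
        savePreOrder(order);
        try {
            // 3. Deduct stock
            reduceGoodsNum(order);
            // 4. Consume the coupon
            updateCouponStatus(order);
            // 5. Deduct the user's balance
            reduceMoneyPaid(order);
            // 6. Confirm the order
            updateOrderStatus(order);
            // 7. Return the success status
            return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
        } catch (Exception e) {
            log.error("Order confirmation failed: " + order.getOrderId(), e);
            // 1. Order confirmation failed: send a message (added in section 4.2)
            // 2. Return the failure status
            return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
        }
    }
}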
4.1.3 Validating the order
OrderServiceImpl#checkOrder
private void checkOrder(TradeOrder order) {
    // 1. Check that the order exists
    if (order == null) {
        CastException.cast(ShopCode.SHOP_ORDER_INVALID);
    }
    // 2. Check that the goods in the order exist
    TradeGoods goods = goodsService.findOne(order.getGoodsId());
    if (goods == null) {
        CastException.cast(ShopCode.SHOP_GOODS_NO_EXIST);
    }
    // 3. Check that the ordering user exists
    TradeUser user = userService.findOne(order.getUserId());
    if (user == null) {
        CastException.cast(ShopCode.SHOP_USER_NO_EXIST);
    }
    // 4. Check that the unit price matches the current goods price
    if (order.getGoodsPrice().compareTo(goods.getGoodsPrice()) != 0) {
        CastException.cast(ShopCode.SHOP_GOODS_PRICE_INVALID);
    }
    // 5. Check that the ordered quantity does not exceed the stock
    //    (ordering exactly the remaining stock is allowed, matching GoodsServiceImpl#reduceGoodsNum)
    if (order.getGoodsNumber() > goods.getGoodsNumber()) {
        CastException.cast(ShopCode.SHOP_GOODS_NUM_NOT_ENOUGH);
    }
    log.info("校验订单通过");
}
4.1.4 Generating the pre-order
OrderServiceImpl#savePreOrder
private long savePreOrder(TradeOrder order) {
    // 1. Mark the order as unconfirmed
    order.setOrderStatus(ShopCode.SHOP_ORDER_NO_CONFIRM.getCode());
    // 2. Generate the order ID
    long orderId = idWorker.nextId();
    order.setOrderId(orderId);
    // 3. Verify the shipping fee
    BigDecimal shippingFee = calculateShippingFee(order.getOrderAmount());
    if (order.getShippingFee().compareTo(shippingFee) != 0) {
        CastException.cast(ShopCode.SHOP_ORDER_SHIPPINGFEE_INVALID);
    }
    // 4. Verify the order amount = goods price * quantity + shipping fee
    BigDecimal orderAmounts = order.getGoodsPrice().multiply(new BigDecimal(order.getGoodsNumber()));
    // BigDecimal is immutable, so the result of add() must be assigned back
    orderAmounts = orderAmounts.add(shippingFee);
    if (order.getOrderAmount().compareTo(orderAmounts) != 0) {
        CastException.cast(ShopCode.SHOP_ORDERAMOUNT_INVALID);
    }
    // 5. Verify the balance the user wants to spend
    BigDecimal moneyPaid = order.getMoneyPaid();
    if (moneyPaid != null) {
        // 5.1 Check that the balance amount in the order is valid
        int r = moneyPaid.compareTo(BigDecimal.ZERO);
        // Amount is negative
        if (r < 0) {
            CastException.cast(ShopCode.SHOP_MONEY_PAID_INVALID);
        }
        // Amount is positive
        if (r > 0) {
            // Look up the user
            TradeUser user = userService.findOne(order.getUserId());
            if (user == null) {
                CastException.cast(ShopCode.SHOP_USER_NO_EXIST);
            }
            // 5.2 The amount must not exceed the user's account balance
            //     (compare as BigDecimal so the fractional part is not truncated)
            if (BigDecimal.valueOf(user.getUserMoney()).compareTo(moneyPaid) < 0) {
                CastException.cast(ShopCode.SHOP_MONEY_PAID_INVALID);
            }
            order.setMoneyPaid(moneyPaid);
        }
    } else {
        order.setMoneyPaid(BigDecimal.ZERO);
    }
    // 6. Verify the coupon
    Long couponId = order.getCouponId();
    if (couponId != null) {
        // Look up the coupon
        TradeCoupon coupon = couponService.findOne(couponId);
        // 6.1 Check that the coupon exists
        if (coupon == null) {
            CastException.cast(ShopCode.SHOP_COUPON_NO_EXIST);
        }
        // 6.2 Check that the coupon has not been used already
        if (Objects.equals(ShopCode.SHOP_COUPON_ISUSED.getCode().toString(), coupon.getIsUsed().toString())) {
            CastException.cast(ShopCode.SHOP_COUPON_INVALIED);
        }
        order.setCouponPaid(coupon.getCouponPrice());
    } else {
        // No coupon was supplied
        order.setCouponPaid(BigDecimal.ZERO);
    }
    // 7. Amount to pay = order amount - balance used - coupon amount
    BigDecimal payAmount = order.getOrderAmount().subtract(order.getMoneyPaid()).subtract(order.getCouponPaid());
    order.setPayAmount(payAmount);
    // 8. Set the creation time and save the order
    order.setAddTime(new Date());
    orderMapper.insert(order);
    return orderId;
}
private BigDecimal calculateShippingFee(BigDecimal orderAmount) {
    // Orders above 100 ship for free
    if (orderAmount.compareTo(new BigDecimal(100)) > 0) {
        return BigDecimal.ZERO;
    } else {
        // Otherwise a shipping fee of 10 is charged
        return new BigDecimal(10);
    }
}
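The amount arithmetic in savePreOrder is easy to get wrong because `BigDecimal` is immutable: `add`, `subtract`, and `multiply` return a new value and leave the receiver unchanged. The standalone sketch below reproduces the three calculations; the class `OrderAmountSketch` and its method names are hypothetical, and the sample figures (1000.00, 100.00, 20.00) are the ones from the test order later in this post.

```java
import java.math.BigDecimal;

// Standalone sketch of the amount rules used by savePreOrder (hypothetical helper class).
class OrderAmountSketch {
    // Orders above 100 ship for free, otherwise the fee is 10.
    static BigDecimal shippingFee(BigDecimal orderAmount) {
        return orderAmount.compareTo(new BigDecimal("100")) > 0
                ? BigDecimal.ZERO
                : new BigDecimal("10");
    }

    // Order amount = goods price * quantity + shipping fee.
    static BigDecimal orderAmount(BigDecimal goodsPrice, int goodsNumber, BigDecimal shippingFee) {
        BigDecimal total = goodsPrice.multiply(BigDecimal.valueOf(goodsNumber));
        // add() does not modify total; its return value is the sum.
        return total.add(shippingFee);
    }

    // Amount to pay = order amount - balance used - coupon amount.
    static BigDecimal payAmount(BigDecimal orderAmount, BigDecimal moneyPaid, BigDecimal couponPaid) {
        return orderAmount.subtract(moneyPaid).subtract(couponPaid);
    }
}
```

With a goods price of 1000.00, a quantity of 1, and free shipping, the order amount is 1000.00; after 100.00 of balance and a 20.00 coupon, 880.00 remains to be paid.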
4.1.5 Deducting stock
The order service calls the goods service through Dubbo to deduct stock. OrderServiceImpl#reduceGoodsNum
private void reduceGoodsNum(TradeOrder order) {
    TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();
    goodsNumberLog.setOrderId(order.getOrderId());
    goodsNumberLog.setGoodsId(order.getGoodsId());
    goodsNumberLog.setGoodsNumber(order.getGoodsNumber());
    Result result = goodsService.reduceGoodsNum(goodsNumberLog);
    if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
        CastException.cast(ShopCode.SHOP_REDUCE_GOODS_NUM_FAIL);
    }
    log.info("订单: " + order.getOrderId() + "扣减库存成功");
}
Add the stock deduction method to the goods service interface
public interface IGoodsService {
    TradeGoods findOne(Long goodsId);
    Result reduceGoodsNum(TradeGoodsNumberLog goodsNumberLog);
}
Stock deduction implemented in the goods service. GoodsServiceImpl#reduceGoodsNum
@Override
public Result reduceGoodsNum(TradeGoodsNumberLog goodsNumberLog) {
    // Validate the request parameters
    if (goodsNumberLog == null
            || goodsNumberLog.getGoodsId() == null
            || goodsNumberLog.getGoodsNumber() == null
            || goodsNumberLog.getOrderId() == null
            || goodsNumberLog.getGoodsNumber().intValue() <= 0) {
        CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
    }
    TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsNumberLog.getGoodsId());
    if (goods == null) {
        CastException.cast(ShopCode.SHOP_GOODS_NO_EXIST);
    }
    if (goods.getGoodsNumber() < goodsNumberLog.getGoodsNumber()) {
        // Not enough stock
        CastException.cast(ShopCode.SHOP_GOODS_NUM_NOT_ENOUGH);
    }
    // Deduct the stock
    goods.setGoodsNumber(goods.getGoodsNumber() - goodsNumberLog.getGoodsNumber());
    goodsMapper.updateByPrimaryKey(goods);
    // Record the stock operation; a deduction is logged as a negative quantity
    goodsNumberLog.setGoodsNumber(-goodsNumberLog.getGoodsNumber());
    goodsNumberLog.setLogTime(new Date());
    goodsNumberLogMapper.insert(goodsNumberLog);
    return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
}
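The sign convention of the stock log (negative for a deduction, positive for a rollback) is what lets one order write both rows under the composite key (goods_id, order_id, goods_number). The in-memory sketch below illustrates that convention; `StockSketch` and its methods are hypothetical, and rejecting duplicates up front is a choice made for this sketch, whereas the service above would only notice a duplicate when the insert hits the primary key.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// In-memory sketch of stock deduction with a signed operation log (hypothetical class).
class StockSketch {
    final Map<Long, Integer> stock = new HashMap<>();
    // Mirrors the composite primary key (goods_id, order_id, goods_number).
    private final Set<String> logKeys = new HashSet<>();

    private static String key(long goodsId, long orderId, int signedNumber) {
        return goodsId + "/" + orderId + "/" + signedNumber;
    }

    // Deduct stock for an order; logged with a negative quantity.
    boolean reduce(long goodsId, long orderId, int number) {
        Integer current = stock.get(goodsId);
        if (number <= 0 || current == null || current < number) {
            return false; // invalid request or not enough stock
        }
        if (!logKeys.add(key(goodsId, orderId, -number))) {
            return false; // this order already deducted the same quantity
        }
        stock.put(goodsId, current - number);
        return true;
    }

    // Restore stock for a cancelled order; logged with a positive quantity.
    boolean rollback(long goodsId, long orderId, int number) {
        if (!logKeys.contains(key(goodsId, orderId, -number))) {
            return false; // nothing was deducted for this order
        }
        if (!logKeys.add(key(goodsId, orderId, number))) {
            return false; // already rolled back
        }
        stock.merge(goodsId, number, Integer::sum);
        return true;
    }
}
```

Keeping both rows makes the log an audit trail: the deduction and its rollback for the same order coexist without a key conflict.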
4.1.6 Consuming the coupon
The coupon is consumed through a Dubbo call. OrderServiceImpl#updateCouponStatus
private void updateCouponStatus(TradeOrder order) {
    if (order.getCouponId() != null) {
        TradeCoupon coupon = couponService.findOne(order.getCouponId());
        coupon.setOrderId(order.getOrderId());
        coupon.setIsUsed(ShopCode.SHOP_COUPON_ISUSED.getCode());
        coupon.setUsedTime(new Date());
        // Update the coupon status
        Result result = couponService.updateCouponStatus(coupon);
        if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
            CastException.cast(ShopCode.SHOP_COUPON_USE_FAIL);
        }
        log.info("订单:" + order.getOrderId() + ",使用优惠券");
    }
}
Add the method that updates the coupon status to the coupon service interface. ICouponService#updateCouponStatus
public interface ICouponService {
    TradeCoupon findOne(Long couponId);
    Result updateCouponStatus(TradeCoupon coupon);
}
Coupon status update implemented in the coupon service. CouponServiceImpl#updateCouponStatus
@Override
public Result updateCouponStatus(TradeCoupon coupon) {
    if (coupon == null || coupon.getCouponId() == null) {
        CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
    }
    tradeCouponMapper.updateByPrimaryKey(coupon);
    return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
}
4.1.7 Deducting the user's balance
Flow diagram of the balance deduction
The balance is deducted through the user service. OrderServiceImpl#reduceMoneyPaid
private void reduceMoneyPaid(TradeOrder order) {
    // Only call the user service when the order actually spends some balance
    if (order.getMoneyPaid() != null && order.getMoneyPaid().compareTo(BigDecimal.ZERO) > 0) {
        TradeUserMoneyLog userMoneyLog = new TradeUserMoneyLog();
        userMoneyLog.setOrderId(order.getOrderId());
        userMoneyLog.setUserId(order.getUserId());
        userMoneyLog.setUseMoney(order.getMoneyPaid());
        userMoneyLog.setMoneyLogType(ShopCode.SHOP_USER_MONEY_PAID.getCode());
        Result result = userService.updateMoneyPaid(userMoneyLog);
        if (result.getSuccess().equals(ShopCode.SHOP_FAIL.getSuccess())) {
            CastException.cast(ShopCode.SHOP_USER_MONEY_REDUCE_FAIL);
        }
        log.info("订单:" + order.getOrderId() + ",扣减余额成功");
    }
}
Add the balance update method to the user service interface. IUserService#updateMoneyPaid
public interface IUserService {
    TradeUser findOne(Long userId);
    Result updateMoneyPaid(TradeUserMoneyLog userMoneyLog);
}
Balance update implemented in the user service. UserServiceImpl#updateMoneyPaid
@Override
public Result updateMoneyPaid(TradeUserMoneyLog userMoneyLog) {
    // Validate the request parameters
    if (userMoneyLog == null ||
            userMoneyLog.getUserId() == null ||
            userMoneyLog.getOrderId() == null ||
            userMoneyLog.getUseMoney() == null ||
            userMoneyLog.getUseMoney().compareTo(BigDecimal.ZERO) <= 0) {
        CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
    }
    // Count the existing balance log entries for this order and user
    TradeUserMoneyLogExample userMoneyLogExample = new TradeUserMoneyLogExample();
    TradeUserMoneyLogExample.Criteria criteria = userMoneyLogExample.createCriteria();
    criteria.andOrderIdEqualTo(userMoneyLog.getOrderId());
    criteria.andUserIdEqualTo(userMoneyLog.getUserId());
    int r = userMoneyLogMapper.countByExample(userMoneyLogExample);
    TradeUser tradeUser = tradeUserMapper.selectByPrimaryKey(userMoneyLog.getUserId());
    if (userMoneyLog.getMoneyLogType().intValue() == ShopCode.SHOP_USER_MONEY_PAID.getCode().intValue()) {
        if (r > 0) {
            // Already paid
            CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY);
        }
        // Deduct the balance
        tradeUser.setUserMoney(new BigDecimal(tradeUser.getUserMoney()).subtract(userMoneyLog.getUseMoney()).longValue());
        tradeUserMapper.updateByPrimaryKey(tradeUser);
    }
    if (userMoneyLog.getMoneyLogType().intValue() == ShopCode.SHOP_USER_MONEY_REFUND.getCode().intValue()) {
        // A count is never negative, so "no payment record" means r == 0
        if (r == 0) {
            // No payment record, so a refund is not possible
            CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY);
        }
        // Guard against refunding more than once
        TradeUserMoneyLogExample userMoneyLogExample2 = new TradeUserMoneyLogExample();
        userMoneyLogExample2.createCriteria()
                .andOrderIdEqualTo(userMoneyLog.getOrderId())
                .andUserIdEqualTo(userMoneyLog.getUserId())
                .andMoneyLogTypeEqualTo(ShopCode.SHOP_USER_MONEY_REFUND.getCode());
        int r2 = userMoneyLogMapper.countByExample(userMoneyLogExample2);
        if (r2 > 0) {
            // Already refunded
            CastException.cast(ShopCode.SHOP_USER_MONEY_REFUND_ALREADY);
        }
        // Not refunded yet: perform the refund
        tradeUser.setUserMoney(new BigDecimal(tradeUser.getUserMoney()).add(userMoneyLog.getUseMoney()).longValue());
        tradeUserMapper.updateByPrimaryKey(tradeUser);
    }
    // Record the balance operation
    userMoneyLog.setCreateTime(new Date());
    userMoneyLogMapper.insert(userMoneyLog);
    return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
}
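The pay/refund rules above boil down to a small state machine per order: pay at most once, refund only after a payment, and refund at most once. The sketch below models that with an in-memory log for a single user; `BalanceSketch`, its string return values, and the key format are assumptions made for illustration, while the type values 1 and 2 come from the money_log_type column.

```java
import java.math.BigDecimal;
import java.util.HashSet;
import java.util.Set;

// In-memory sketch of the pay/refund rules for one user's balance (hypothetical class).
class BalanceSketch {
    static final int PAID = 1;    // money_log_type: order payment
    static final int REFUND = 2;  // money_log_type: order refund

    BigDecimal balance;
    // Mirrors the primary key (user_id, order_id, money_log_type) for a single user.
    private final Set<String> log = new HashSet<>();

    BalanceSketch(BigDecimal balance) {
        this.balance = balance;
    }

    String apply(long orderId, int type, BigDecimal amount) {
        if (amount == null || amount.signum() <= 0) {
            return "INVALID";
        }
        boolean paid = log.contains(orderId + ":" + PAID);
        boolean refunded = log.contains(orderId + ":" + REFUND);
        if (type == PAID) {
            if (paid) {
                return "ALREADY_PAID";
            }
            balance = balance.subtract(amount);
        } else if (type == REFUND) {
            if (!paid) {
                return "NOT_PAID";
            }
            if (refunded) {
                return "ALREADY_REFUNDED";
            }
            balance = balance.add(amount);
        } else {
            return "INVALID";
        }
        log.add(orderId + ":" + type);
        return "OK";
    }
}
```

These checks are what make the operation safe to retry: a redelivered payment or refund request for the same order changes nothing.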
4.1.8 Confirming the order
OrderServiceImpl#updateOrderStatus
private void updateOrderStatus(TradeOrder order) {
    order.setOrderStatus(ShopCode.SHOP_ORDER_CONFIRM.getCode());
    order.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());
    order.setConfirmTime(new Date());
    int r = orderMapper.updateByPrimaryKey(order);
    if (r <= 0) {
        // Order confirmation failed
        CastException.cast(ShopCode.SHOP_ORDER_CONFIRM_FAIL);
    }
    log.info("订单:[" + order.getOrderId() + "]状态修改(确认)成功");
}
4.1.9 Testing order creation
Write the test: OrderServiceTest#confirmOrder
package com.crysw.test;
import com.crysw.api.IOrderService;
import com.crysw.shop.OrderApplication;
import com.crysw.shop.pojo.TradeOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.IOException;
import java.math.BigDecimal;
@SpringBootTest(classes = {OrderApplication.class})
@RunWith(SpringRunner.class)
public class OrderServiceTest {
    @Autowired
    private IOrderService orderService;
    @Test
    public void confirmOrder() throws IOException {
        Long couponId = 345988230098857984L;
        Long goodsId = 345959443973935104L;
        Long userId = 345963634385633280L;
        // Build the order
        TradeOrder order = new TradeOrder();
        order.setGoodsId(goodsId);
        order.setUserId(userId);
        order.setCouponId(couponId);
        order.setAddress("北京");
        order.setGoodsNumber(1);
        // Goods price
        order.setGoodsPrice(new BigDecimal(1000));
        // Shipping fee
        order.setShippingFee(BigDecimal.ZERO);
        // Order amount = goods price * quantity + shipping fee
        order.setOrderAmount(new BigDecimal(1000));
        // Amount paid from the balance
        order.setMoneyPaid(new BigDecimal(100));
        orderService.confirmOrder(order);
        // Keep the JVM alive so the remote services stay reachable during the test
        System.in.read();
    }
}
Test log:
# Validate the order
2022-07-25 13:07:17.821 INFO 24040 --- [ main] c.c.shop.service.impl.OrderServiceImpl : 校验订单通过
2022-07-25 13:07:18.143 INFO 24040 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2022-07-25 13:07:18.147 WARN 24040 --- [ main] com.zaxxer.hikari.util.DriverDataSource : Registered driver with driverClassName=com.mysql.jdbc.Driver was not found, trying direct instantiation.
2022-07-25 13:07:18.361 INFO 24040 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
# Deduct stock
2022-07-25 13:07:18.567 INFO 24040 --- [ main] c.c.shop.service.impl.OrderServiceImpl : 订单: 748931450161729536扣减库存成功
# Use the coupon
2022-07-25 13:07:18.612 INFO 24040 --- [ main] c.c.shop.service.impl.OrderServiceImpl : 订单:748931450161729536,使用优惠券
# Deduct the balance
2022-07-25 13:07:18.942 INFO 24040 --- [ main] c.c.shop.service.impl.OrderServiceImpl : 订单:748931450161729536,扣减余额成功
# Update the order status
2022-07-25 13:07:18.947 INFO 24040 --- [ main] c.c.shop.service.impl.OrderServiceImpl : 订单:[748931450161729536]状态修改(确认)成功
Check the database records:
trade_order (order table): one order has been created and is not yet paid.
748931450161729536 345963634385633280 1 0 (NULL) 北京 (NULL) 345959443973935104 1 1000.00 (NULL) 0.00 1000.00 345988230098857984 20.00 100.00 880.00 2022-07-25 13:07:18 2022-07-25 13:07:19 (NULL)
trade_goods_number_log (stock log table): one new log record for the order.
345959443973935104 748931450161729536 -1 2022-07-25 13:07:19
trade_goods (goods table): the stock has decreased.
345959443973935104 JavaSE课程 991 1000.00 传智播客出品Java视频课程 2019-07-09 20:38:00
345959443973935105 华为P30 999 5000.00 夜间拍照更美 2019-07-09 20:38:00
trade_coupon (coupon table): the coupon is marked as used.
345988230098857984 20.00 345963634385633280 748931450161729536 1 2022-07-25 13:07:19
trade_user (user table): the user's balance has been reduced by 100, leaving 900.
345963634385633280 刘备 123L 18888888888L 100 2019-07-09 13:37:03 900
trade_user_money_log 用户余额日志表, 新增一条扣减用户余额的日志记录.
4.2 失败补偿机制|
345963634385633280 748931450161729536 1 100.00 2022-07-25 13:07:19
使用MQ异步解耦的方式推送下单失败的消息, MQ集群搭建参考博客RocketMQ专题01
4.2.1 Message sender
# RocketMQ
rocketmq.name-server=192.168.65.129:9876;192.168.65.130:9876
# Producer group for failed order creation
rocketmq.producer.group=orderProducerGroup
# Consumer group, topic and tag for failed order creation
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
mq.order.tag.cancel=order_cancel
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Value("${mq.order.topic}")
private String topic;
@Value("${mq.order.tag.cancel}")
private String cancelTag;
@Override
public Result confirmOrder(TradeOrder order) {
//1. Validate the order
checkOrder(order);
//2. Create the pre-order
savePreOrder(order);
try {
//3. Deduct stock
reduceGoodsNum(order);
//4. Use the coupon
updateCouponStatus(order);
//5. Deduct the user's balance
reduceMoneyPaid(order);
// Simulate an exception
CastException.cast(ShopCode.SHOP_FAIL);
//6. Confirm the order
updateOrderStatus(order);
//7. Return the success status
return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
} catch (Exception e) {
//1. Order confirmation failed: send a message
MQEntity mqEntity = new MQEntity();
mqEntity.setOrderId(order.getOrderId());
mqEntity.setUserId(order.getUserId());
mqEntity.setGoodsId(order.getGoodsId());
mqEntity.setCouponId(order.getCouponId());
mqEntity.setGoodsNum(order.getGoodsNumber());
mqEntity.setUserMoney(order.getMoneyPaid());
try {
// Pass the injected cancelTag field; this snippet declares cancelTag, not tag
sendCancelOrder(topic, cancelTag, order.getOrderId().toString(), JSON.toJSONString(mqEntity));
} catch (Exception ex) {
ex.printStackTrace();
}
//2. Return the failure status
return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
}
}
private void sendCancelOrder(String topic, String tag, String keys, String body) throws Exception {
// Encode explicitly as UTF-8, the charset the consumers use when decoding the body
Message message = new Message(topic, tag, keys, body.getBytes(java.nio.charset.StandardCharsets.UTF_8));
rocketMQTemplate.getProducer().send(message);
}
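The producer side of this compensation flow can be reduced to a small, dependency-free sketch. It is not the shop code: the `sentMessages` list is a hypothetical stand-in for the MQ producer, and the message string format is made up for illustration. It only shows the control flow, where a failure before confirmation results in exactly one cancel message and a failure result.

```java
import java.util.ArrayList;
import java.util.List;

final class ConfirmOrderSketch {
    // Hypothetical stand-in for the MQ producer: "sent" messages are collected here.
    final List<String> sentMessages = new ArrayList<>();

    // Returns true when the order is confirmed, false when a cancel message was published.
    boolean confirmOrder(long orderId, boolean failBeforeConfirm) {
        try {
            // Stock, coupon and balance deductions are assumed to have succeeded at this point.
            if (failBeforeConfirm) {
                throw new IllegalStateException("simulated failure before order confirmation");
            }
            return true; // order confirmed
        } catch (RuntimeException e) {
            // Publish a cancel message so that every service can undo its own change.
            sentMessages.add("orderTopic:order_cancel:" + orderId);
            return false;
        }
    }

    public static void main(String[] args) {
        ConfirmOrderSketch sketch = new ConfirmOrderSketch();
        System.out.println(sketch.confirmOrder(1L, false) + " " + sketch.sentMessages);
        System.out.println(sketch.confirmOrder(2L, true) + " " + sketch.sentMessages);
    }
}
```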
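Reset coupon 345988230098857984 to status 0 (unused), add the code that throws a simulated exception right after the balance deduction in the order flow above, and run the unit test again.
//5. Deduct the user's balance
reduceMoneyPaid(order);
// Simulate an exception
CastException.cast(ShopCode.SHOP_FAIL);
//6. Confirm the order
updateOrderStatus(order);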
Test log for order creation
# Validate the order
2022-07-25 16:35:16.767 INFO 23960 --- [ main] c.c.shop.service.impl.OrderServiceImpl : 校验订单通过
2022-07-25 16:35:16.956 INFO 23960 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2022-07-25 16:35:16.969 WARN 23960 --- [ main] com.zaxxer.hikari.util.DriverDataSource : Registered driver with driverClassName=com.mysql.jdbc.Driver was not found, trying direct instantiation.
2022-07-25 16:35:17.249 INFO 23960 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
# Deduct stock
2022-07-25 16:35:17.340 INFO 23960 --- [ main] c.c.shop.service.impl.OrderServiceImpl : 订单: 748983790659047424扣减库存成功
# Use the coupon
2022-07-25 16:35:17.369 INFO 23960 --- [ main] c.c.shop.service.impl.OrderServiceImpl : 订单:748983790659047424,使用优惠券
# Deduct the balance
2022-07-25 16:35:17.423 INFO 23960 --- [ main] c.c.shop.service.impl.OrderServiceImpl : 订单:748983790659047424,扣减余额成功
# The order fails and an exception is thrown
2022-07-25 16:35:17.443 ERROR 23960 --- [ main] com.crysw.exception.CastException : ShopCode{success=false, code=0, message='错误'}
Once the program catches the exception, it pushes an MQ message for the failed order.
At this point the order table, goods stock log table, coupon table, user table and user balance log table have all been inserted into or updated successfully, yet the order itself failed, so those earlier changes need to be rolled back.
The rollback can be done by MQ consumers that consume the order-failure message pushed above. This decouples the rollback from the main flow and improves processing performance.
Configure the RocketMQ properties
# RocketMQ
rocketmq.name-server=192.168.65.129:9876;192.168.65.130:9876
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
Create a listener class to consume the message
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}", consumerGroup = "${mq.order.consumer.group.name}",
messageModel = MessageModel.BROADCASTING)
public class CancelOrderConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
// Handle the message
}
}
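To make the consumer-side rollback concrete, here is a minimal in-memory sketch of one cancel message being delivered to several subscribers, each of which reverts only its own data. Everything in it is hypothetical: the `CancelMessage` class is loosely modelled on the fields set on `MQEntity` above, the plain fields stand in for database tables, and no RocketMQ API is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class CancelFanOutSketch {
    // Hypothetical payload, loosely modelled on the fields of MQEntity.
    static final class CancelMessage {
        final long orderId;
        final int goodsNum;
        final long userMoney;

        CancelMessage(long orderId, int goodsNum, long userMoney) {
            this.orderId = orderId;
            this.goodsNum = goodsNum;
            this.userMoney = userMoney;
        }
    }

    // State left behind by the failed order: stock, coupon and balance were already changed.
    int stock = 999;
    boolean couponUsed = true;
    long balance = 900;
    String orderStatus = "UNCONFIRMED";

    private final List<Consumer<CancelMessage>> subscribers = new ArrayList<>();

    CancelFanOutSketch() {
        subscribers.add(m -> stock += m.goodsNum);       // goods service: give the stock back
        subscribers.add(m -> couponUsed = false);        // coupon service: mark the coupon unused
        subscribers.add(m -> balance += m.userMoney);    // user service: refund the balance
        subscribers.add(m -> orderStatus = "CANCELLED"); // order service: cancel the order
    }

    // Deliver the same message to every subscriber.
    void publish(CancelMessage message) {
        for (Consumer<CancelMessage> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }

    public static void main(String[] args) {
        CancelFanOutSketch sketch = new CancelFanOutSketch();
        sketch.publish(new CancelMessage(748983790659047424L, 1, 100));
        System.out.println(sketch.stock + " " + sketch.couponUsed + " "
                + sketch.balance + " " + sketch.orderStatus);
    }
}
```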
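1) Roll back the stock
After order creation fails, an order-failure message is pushed through MQ. The goods service [shop-goods-service] subscribes to the order-failure topic; when it receives the message, it rolls back the goods stock and records the message consumption log.
Flow analysis
Stock rollback in the message consumer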
package com.crysw.shop.mq;
import com.alibaba.fastjson.JSON;
import com.crysw.shop.entity.MQEntity;
import com.crysw.shop.mapper.TradeGoodsMapper;
import com.crysw.shop.mapper.TradeGoodsNumberLogMapper;
import com.crysw.shop.mapper.TradeMqConsumerLogMapper;
import com.crysw.shop.pojo.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.util.Date;
import static com.crysw.constant.ShopCode.*;
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}", consumerGroup = "${mq.order.consumer.group.name}", messageModel = MessageModel.BROADCASTING)
public class CancelGoodsConsumer implements RocketMQListener<MessageExt> {
@Value("${mq.order.consumer.group.name}")
private String groupName;
@Autowired
private TradeMqConsumerLogMapper mqConsumerLogMapper;
@Autowired
private TradeGoodsMapper goodsMapper;
@Autowired
private TradeGoodsNumberLogMapper goodsNumberLogMapper;
@Override
public void onMessage(MessageExt messageExt) {
String msgId = null;
String tags = null;
String keys = null;
String body = null;
try {
// 1. Parse the message
msgId = messageExt.getMsgId();
tags = messageExt.getTags();
keys = messageExt.getKeys();
body = new String(messageExt.getBody(), "utf-8");
log.info("接受消息成功");
// 2. Look up the consumption record of this message
TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();
primaryKey.setMsgKey(keys);
primaryKey.setMsgTag(tags);
primaryKey.setGroupName(groupName);
TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);
if (mqConsumerLog != null) {
// 3. The message was seen before: read its status (0: processing; 1: success; 2: failed)
Integer status = mqConsumerLog.getConsumerStatus();
// Already processed: return
if (SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode().intValue() == status.intValue()) {
log.info("消息:" + msgId + ",已经处理过");
return;
}
// Currently being processed: return
if (SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode().intValue() == status.intValue()) {
log.info("消息:" + msgId + ",正在处理");
return;
}
// Processing failed earlier: check whether the retry limit (3) has been exceeded
if (SHOP_MQ_MESSAGE_STATUS_FAIL.getCode().intValue() == status.intValue()) {
// Number of times the message has been processed so far
Integer times = mqConsumerLog.getConsumerTimes();
if (times.intValue() > 3) {
log.info("消息:" + msgId + ",消息处理超过3次,不能再进行处理了");
return;
}
// Set the message status to processing
mqConsumerLog.setConsumerStatus(SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());
// Update with a database optimistic lock (consumerTimes acts as the version)
TradeMqConsumerLogExample example = new TradeMqConsumerLogExample();
TradeMqConsumerLogExample.Criteria criteria = example.createCriteria();
criteria.andMsgTagEqualTo(mqConsumerLog.getMsgTag())
.andMsgKeyEqualTo(mqConsumerLog.getMsgKey())
.andGroupNameEqualTo(mqConsumerLog.getGroupName())
.andConsumerTimesEqualTo(mqConsumerLog.getConsumerTimes());
int r = mqConsumerLogMapper.updateByExampleSelective(mqConsumerLog, example);
if (r <= 0) {
// The update did not apply: another thread modified the record concurrently
log.info("并发修改,稍后处理");
// Stop here; otherwise the stock would be rolled back by both threads
return;
}
}
} else {
// 4. The message has not been consumed before
mqConsumerLog = new TradeMqConsumerLog();
mqConsumerLog.setMsgId(msgId);
mqConsumerLog.setMsgTag(tags);
mqConsumerLog.setMsgKey(keys);
mqConsumerLog.setMsgBody(body);
mqConsumerLog.setGroupName(groupName);
mqConsumerLog.setConsumerStatus(SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());
mqConsumerLog.setConsumerTimes(0);
// Save the consumption record to the database
mqConsumerLogMapper.insert(mqConsumerLog);
}
// 5. Roll back the stock
MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
Long goodsId = mqEntity.getGoodsId();
TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsId);
goods.setGoodsNumber(goods.getGoodsNumber() + mqEntity.getGoodsNum());
goodsMapper.updateByPrimaryKey(goods);
// Record the stock operation in the log table
TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();
goodsNumberLog.setOrderId(mqEntity.getOrderId());
goodsNumberLog.setGoodsId(goodsId);
goodsNumberLog.setGoodsNumber(mqEntity.getGoodsNum());
goodsNumberLog.setLogTime(new Date());
goodsNumberLogMapper.insert(goodsNumberLog);
// 6. Mark the message as processed successfully
mqConsumerLog.setConsumerStatus(SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode());
mqConsumerLog.setConsumerTimestamp(new Date());
mqConsumerLogMapper.updateByPrimaryKey(mqConsumerLog);
log.info("回退库存成功");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
TradeMqConsumerLogKey primarykey = new TradeMqConsumerLogKey();
primarykey.setMsgTag(tags);
primarykey.setMsgKey(keys);
primarykey.setGroupName(groupName);
TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primarykey);
if (mqConsumerLog == null) {
// No record in the database: the message has not been consumed before
// Create the record first; calling setters on the null reference would throw a NullPointerException
mqConsumerLog = new TradeMqConsumerLog();
mqConsumerLog.setMsgId(msgId);
mqConsumerLog.setMsgKey(keys);
mqConsumerLog.setMsgTag(tags);
mqConsumerLog.setMsgBody(body);
mqConsumerLog.setGroupName(groupName);
mqConsumerLog.setConsumerStatus(SHOP_MQ_MESSAGE_STATUS_FAIL.getCode());
mqConsumerLog.setConsumerTimes(1);
mqConsumerLogMapper.insert(mqConsumerLog);
} else {
// Consumption failed: mark the record as failed and increment the count so the retry branch above can pick it up
mqConsumerLog.setConsumerStatus(SHOP_MQ_MESSAGE_STATUS_FAIL.getCode());
mqConsumerLog.setConsumerTimes(mqConsumerLog.getConsumerTimes() + 1);
mqConsumerLogMapper.updateByPrimaryKeySelective(mqConsumerLog);
}
}
}
}
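The idempotency logic in this consumer (consumption log, status check, retry limit, optimistic lock) is easier to follow in isolation. The sketch below replays it against an in-memory map instead of the `trade_mq_consumer_log` table. It is a simplified illustration under stated assumptions, not the course implementation: `LogRow`, `Status` and the `simulateFailure` flag are hypothetical, and `ConcurrentMap.replace(key, old, new)` plays the role of the `UPDATE ... WHERE consumer_times = ?` optimistic lock.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class IdempotentConsumerSketch {
    enum Status { PROCESSING, SUCCESS, FAIL }

    // Immutable row of the hypothetical consumer-log table.
    static final class LogRow {
        final Status status;
        final int times;

        LogRow(Status status, int times) {
            this.status = status;
            this.times = times;
        }
    }

    static final int MAX_RETRIES = 3;

    private final ConcurrentMap<String, LogRow> table = new ConcurrentHashMap<>();
    int stock;

    IdempotentConsumerSketch(int initialStock) {
        this.stock = initialStock;
    }

    // Returns true only when this call actually applied the stock rollback.
    boolean onMessage(String key, int goodsNum, boolean simulateFailure) {
        LogRow row = table.get(key);
        if (row == null) {
            // First delivery: claim the message by inserting a PROCESSING row.
            LogRow claimed = new LogRow(Status.PROCESSING, 0);
            if (table.putIfAbsent(key, claimed) != null) {
                return false; // another thread inserted the row first
            }
            row = claimed;
        } else {
            if (row.status != Status.FAIL) {
                return false; // already done, or being processed elsewhere
            }
            if (row.times > MAX_RETRIES) {
                return false; // retry limit exceeded
            }
            LogRow claimed = new LogRow(Status.PROCESSING, row.times);
            // Compare-and-set: succeeds only if nobody changed the row since it was read.
            if (!table.replace(key, row, claimed)) {
                return false;
            }
            row = claimed;
        }
        if (simulateFailure) {
            table.put(key, new LogRow(Status.FAIL, row.times + 1));
            return false;
        }
        stock += goodsNum; // the actual rollback (single-threaded in this demo)
        table.put(key, new LogRow(Status.SUCCESS, row.times));
        return true;
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch(999);
        System.out.println(consumer.onMessage("order-1", 1, false) + " stock=" + consumer.stock);
        System.out.println(consumer.onMessage("order-1", 1, false) + " stock=" + consumer.stock);
    }
}
```

Delivering the same key twice changes the stock only once, and a delivery that failed can be retried until it succeeds or the retry limit is reached.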
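2) Roll back the coupon
After order creation fails, an order-failure message is pushed through MQ. The coupon service [shop-coupon-service] subscribes to the order-failure topic; when it receives the message, it rolls back the coupon by resetting its status and related fields.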
package com.crysw.shop.mq;
import com.alibaba.fastjson.JSON;
import com.crysw.constant.ShopCode;
import com.crysw.shop.entity.MQEntity;
import com.crysw.shop.mapper.TradeCouponMapper;
import com.crysw.shop.pojo.TradeCoupon;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
@Component
@Slf4j
@RocketMQMessageListener(topic = "${mq.order.topic}", consumerGroup = "${mq.order.consumer.group.name}", messageModel = MessageModel.BROADCASTING)
public class CancelCouponConsumer implements RocketMQListener<MessageExt> {
@Autowired
private TradeCouponMapper couponMapper;
@Override
public void onMessage(MessageExt messageExt) {
try {
// 1. Parse the message
String body = new String(messageExt.getBody(), "utf-8");
MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
if (mqEntity.getCouponId() != null) {
// 2. Look up the coupon
TradeCoupon coupon = couponMapper.selectByPrimaryKey(mqEntity.getCouponId());
// 3. Reset the coupon to unused
coupon.setUsedTime(null);
coupon.setIsUsed(ShopCode.SHOP_COUPON_UNUSED.getCode());
coupon.setOrderId(null);
couponMapper.updateByPrimaryKey(coupon);
log.info("回退优惠券成功");
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
log.error("回退优惠券失败");
}
}
}
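3) Roll back the balance
After order creation fails, an order-failure message is pushed through MQ. The user service [shop-user-service] subscribes to the order-failure topic; when it receives the message, it rolls back the user's balance.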
package com.crysw.shop.mq;
import com.alibaba.fastjson.JSON;
import com.crysw.api.IUserService;
import com.crysw.constant.ShopCode;
import com.crysw.shop.entity.MQEntity;
import com.crysw.shop.pojo.TradeUserMoneyLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}", consumerGroup = "${mq.order.consumer.group.name}", messageModel = MessageModel.BROADCASTING)
public class CancelUserMoneyConsumer implements RocketMQListener<MessageExt> {
@Autowired
private IUserService userService;
@Override
public void onMessage(MessageExt messageExt) {
// 1. Parse the message
try {
String body = new String(messageExt.getBody(), "utf-8");
MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
log.info("接收到消息");
if (mqEntity.getUserMoney() != null && mqEntity.getUserMoney().compareTo(BigDecimal.ZERO) > 0) {
// 2. Call the service layer to refund the balance
TradeUserMoneyLog userMoneyLog = new TradeUserMoneyLog();
userMoneyLog.setUseMoney(mqEntity.getUserMoney());
userMoneyLog.setMoneyLogType(ShopCode.SHOP_USER_MONEY_REFUND.getCode());
userMoneyLog.setUserId(mqEntity.getUserId());
userMoneyLog.setOrderId(mqEntity.getOrderId());
userService.updateMoneyPaid(userMoneyLog);
log.info("余额回退成功");
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
log.error("余额回退失败");
}
}
}
userService.updateMoneyPaid→
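4) Cancel the order
After order creation fails, an order-failure message is pushed through MQ. The order service [shop-order-service] subscribes to the order-failure topic; when it receives the message, it cancels the order.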
package com.crysw.shop.mq;
import com.alibaba.fastjson.JSON;
import com.crysw.constant.ShopCode;
import com.crysw.shop.entity.MQEntity;
import com.crysw.shop.mapper.TradeOrderMapper;
import com.crysw.shop.pojo.TradeOrder;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}", consumerGroup = "${mq.order.consumer.group.name}", messageModel = MessageModel.BROADCASTING)
public class CancelOrderConsumer implements RocketMQListener<MessageExt> {
@Autowired
private TradeOrderMapper orderMapper;
@Override
public void onMessage(MessageExt messageExt) {
try {
// 1. Parse the message
String body = new String(messageExt.getBody(), "utf-8");
MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
log.info("接受消息成功");
// 2. Look up the order
if (mqEntity.getOrderId() != null) {
TradeOrder order = orderMapper.selectByPrimaryKey(mqEntity.getOrderId());
//3. Set the order status to cancelled
order.setOrderStatus(ShopCode.SHOP_ORDER_CANCEL.getCode());
orderMapper.updateByPrimaryKey(order);
log.info("订单状态设置为取消");
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
log.error("取消订单失败");
}
}
}
4.3 Testing
4.3.1 Prepare the test environment
Write the test class.
package com.crysw.test;
import com.crysw.api.IOrderService;
import com.crysw.shop.OrderApplication;
import com.crysw.shop.pojo.TradeOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.IOException;
import java.math.BigDecimal;
@SpringBootTest(classes = {OrderApplication.class})
@RunWith(SpringRunner.class)
public class OrderServiceTest {
@Autowired
private IOrderService orderService;
@Test
public void confirmOrder() throws IOException {
Long coupouId = 345988230098857984L;
Long goodsId = 345959443973935104L;
Long userId = 345963634385633280L;
// Create the order
TradeOrder order = new TradeOrder();
order.setGoodsId(goodsId);
order.setUserId(userId);
order.setCouponId(coupouId);
order.setAddress("北京");
order.setGoodsNumber(1);
// Goods price
order.setGoodsPrice(new BigDecimal(1000));
// Shipping fee
order.setShippingFee(BigDecimal.ZERO);
// Order amount = goods price * quantity + shipping fee
order.setOrderAmount(new BigDecimal(1000));
// Amount already paid (deducted from the user's balance)
order.setMoneyPaid(new BigDecimal(100));
orderService.confirmOrder(order);
// Without this, the test would exit right after the order fails and the cancel message could not be consumed; blocking on input keeps the order service running
System.in.read();
}
}
4.3.2 Prepare the test data
First restore the coupon status, user balance, goods stock and other data. Then start the user service, goods service and coupon service, and run the unit test above to place an order (the order flow already contains the simulated exception). When it finishes, check whether the user's balance and the coupon data have changed and whether the order status is "cancelled".
4.3.3 Check the test results
trade_order: the order table has one new order; because the order failed, its status has been updated to "cancelled".
750137168931983360 345963634385633280 2 (NULL) (NULL) 北京 (NULL) 345959443973935104 1 1000.00 (NULL) 0.00 1000.00 345988230098857984 20.00 100.00 880.00 2022-07-28 20:58:24 (NULL) (NULL)
trade_mq_consumer_log: because the order failed, an order-failure message was pushed to MQ, and the consumers rolled the data back after consuming it.
C0A8000472F818B4AAC28F8BD1D70000 order_orderTopic_cancel_group order_cancel 750137168931983360 {"couponId":345988230098857984,"goodsId":345959443973935104,"goodsNum":1,"orderId":750137168931983360,"userId":345963634385633280,"userMoney":100} 1 0 (NULL) (NULL)
trade_coupon: the coupon is back in the unused state.
345988230098857984 20.00 345963634385633280 (NULL) 0 (NULL)
trade_user: the user's balance was rolled back as well, so it shows no decrease.
345963634385633280 刘备 123L 18888888888L 100 2019-07-09 13:37:03 1000
trade_user_money_log: the user balance log table has one new refund record.
345963634385633280 750137168931983360 1 100.00 2022-07-28 20:58:25
345963634385633280 750137168931983360 2 100.00 2022-07-28 20:58:27
trade_goods: the goods stock has been restored, so it shows no decrease.
345959443973935104 JavaSE课程 1000 1000.00 传智播客出品Java视频课程 2019-07-09 20:38:00
345959443973935105 华为P30 1000 5000.00 夜间拍照更美 2019-07-09 20:38:00
5. Payment business
5.1 Create the payment order
Payment flow diagram
Implementation
Add the method for creating payment information, com.crysw.api.IPayService#createPayment, to the payment service interface
package com.crysw.api;
import com.crysw.shop.entity.Result;
import com.crysw.shop.pojo.TradePay;
public interface IPayService {
Result createPayment(TradePay tradePay);
}
Implementation of creating the payment information
@Override
public Result createPayment(TradePay tradePay) {
if (tradePay == null || tradePay.getOrderId() == null) {
CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
}
// Check whether the order has already been paid
TradePayExample payExample = new TradePayExample();
TradePayExample.Criteria criteria = payExample.createCriteria();
criteria.andOrderIdEqualTo(tradePay.getOrderId())
.andIsPaidEqualTo(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
int count = tradePayMapper.countByExample(payExample);
if (count > 0) {
CastException.cast(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY);
}
// Mark the payment record as unpaid
tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());
tradePay.setPayId(idWorker.nextId());
// Save the payment record
tradePayMapper.insert(tradePay);
return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
}
5.2 Payment callback
5.2.1 Flow analysis
5.2.2 Implementation
Add the payment callback method com.crysw.api.IPayService#callbackPayment to the payment service interface
package com.crysw.api;
import com.crysw.shop.entity.Result;
import com.crysw.shop.pojo.TradePay;
public interface IPayService {
// ... part of the code omitted
Result callbackPayment(TradePay tradePay);
}
Implementation of the payment callback: com.crysw.shop.service.impl.PayServiceImpl#callbackPayment
@Override
public Result callbackPayment(TradePay tradePay) {
log.info(">>>支付回调");
if (tradePay == null || tradePay.getOrderId() == null) {
CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
}
//1. Check the payment status reported by the callback
if (tradePay.getIsPaid().intValue() == ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode().intValue()) {
//2. Update the payment record to paid
Long payId = tradePay.getPayId();
TradePay pay = tradePayMapper.selectByPrimaryKey(payId);
// Check that the payment record exists
if (pay == null) {
CastException.cast(ShopCode.SHOP_PAYMENT_NOT_FOUND);
}
pay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
int r = tradePayMapper.updateByPrimaryKeySelective(pay);
if (r == 1) {
//3. Build the payment-success message
TradeMqProducerTemp tradeMqProducerTemp = new TradeMqProducerTemp();
tradeMqProducerTemp.setId(String.valueOf(idWorker.nextId()));
tradeMqProducerTemp.setGroupName(groupName);
tradeMqProducerTemp.setMsgTopic(topic);
tradeMqProducerTemp.setMsgTag(tag);
tradeMqProducerTemp.setMsgKey(String.valueOf(tradePay.getPayId()));
tradeMqProducerTemp.setMsgBody(JSON.toJSONString(tradePay));
tradeMqProducerTemp.setCreateTime(new Date());
//4. Persist the message to the database
mqProducerTempMapper.insert(tradeMqProducerTemp);
log.info(">>>将支付成功消息持久化到数据库");
// Handle the rest asynchronously in the thread pool
threadPoolTaskExecutor.submit(() -> {
//5. Send the message to MQ
SendResult result = null;
try {
result = sendMessage(topic, tag, String.valueOf(tradePay.getPayId()), JSON.toJSONString(tradePay));
} catch (Exception e) {
e.printStackTrace();
}
// result stays null if sending threw, so guard against a NullPointerException
if (result != null && result.getSendStatus().equals(SendStatus.SEND_OK)) {
log.info(">>>消息发送成功");
//6. MQ has received the message: delete the persisted copy
mqProducerTempMapper.deleteByPrimaryKey(tradeMqProducerTemp.getId());
log.info(">>>持久化到数据库的消息删除");
}
});
}
return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
} else {
// The reported status is unpaid
CastException.cast(ShopCode.SHOP_PAYMENT_PAY_ERROR);
return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());
}
}
private SendResult sendMessage(String topic, String tag, String key, String body) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
if (StringUtils.isEmpty(topic)) {
CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY);
}
if (StringUtils.isEmpty(body)) {
CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY);
}
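// Encode explicitly as UTF-8, the charset the listener uses when decoding the body
Message message = new Message(topic, tag, key, body.getBytes(java.nio.charset.StandardCharsets.UTF_8));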
return rocketMQTemplate.getProducer().send(message);
}
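Pushing the MQ message and, once the response arrives, deleting the persisted payment-success record are relatively time-consuming operations that would hurt performance under high concurrency, so they are handled asynchronously in a thread pool. The thread pool bean has to be declared in the main application class or in a configuration class.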
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("Pool-A");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
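The "persist first, send asynchronously, delete on acknowledgement" sequence used in the callback can be sketched without Spring or RocketMQ. In this illustration the `tempTable` map is a hypothetical stand-in for the `trade_mq_producer_temp` table and the `sender` predicate stands in for the MQ producer (it returns true when the broker acknowledged the message). A message that was not acknowledged simply stays in the table; how such leftovers are resent is not covered by the source, so it is not shown here.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Predicate;

final class ProducerTempTableSketch {
    // Hypothetical in-memory stand-in for the trade_mq_producer_temp table.
    final Map<String, String> tempTable = new ConcurrentHashMap<>();

    // Daemon threads so that the demo never keeps the JVM alive.
    private final ExecutorService pool = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // Hypothetical sender: returns true when the broker acknowledged the message.
    private final Predicate<String> sender;

    ProducerTempTableSketch(Predicate<String> sender) {
        this.sender = sender;
    }

    CompletableFuture<Void> publish(String id, String body) {
        tempTable.put(id, body); // 1. persist before sending
        return CompletableFuture.runAsync(() -> {
            boolean acknowledged;
            try {
                acknowledged = sender.test(body); // 2. send off the caller's thread
            } catch (RuntimeException e) {
                acknowledged = false;
            }
            if (acknowledged) {
                tempTable.remove(id); // 3. delete only after the acknowledgement
            }
        }, pool);
    }

    void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        ProducerTempTableSketch ok = new ProducerTempTableSketch(body -> true);
        ok.publish("1", "{\"payId\":1}").join();
        ProducerTempTableSketch failing = new ProducerTempTableSketch(body -> false);
        failing.publish("2", "{\"payId\":2}").join();
        System.out.println("acknowledged: " + ok.tempTable + ", not acknowledged: " + failing.tempTable);
        ok.shutdown();
        failing.shutdown();
    }
}
```

The caller gets control back as soon as the row is stored, which is the reason the original hands the send step to a thread pool.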
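5.2.3 Processing the message
After a successful payment, the payment service payService sends an MQ message; the order service, user service and log service subscribe to it for follow-up processing. For example:
The order service changes the order status to paid
The log service records the payment log
The user service adds points for the user
The order service is used below to show how the message is handled:
1) Configure the RocketMQ properties
mq.pay.topic=payTopic
mq.pay.consumer.group.name=pay_payTopic_group
2) Consume the message
In the order service, add a listener for the payment-success message; when the message sent through MQ arrives, the listener updates the payment status of the order.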
package com.crysw.shop.mq;
import com.alibaba.fastjson.JSON;
import com.crysw.constant.ShopCode;
import com.crysw.shop.entity.MQEntity;
import com.crysw.shop.mapper.TradeOrderMapper;
import com.crysw.shop.pojo.TradeOrder;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.pay.topic}", consumerGroup = "${mq.pay.consumer.group.name}", messageModel = MessageModel.BROADCASTING)
public class PaymentListener implements RocketMQListener<MessageExt> {
@Autowired
private TradeOrderMapper orderMapper;
@Override
public void onMessage(MessageExt messageExt) {
log.info(">>>接收到支付成功消息");
try {
//1. Parse the message (the body is the JSON form of TradePay; only its orderId is needed, which MQEntity also declares)
String body = new String(messageExt.getBody(), "utf-8");
MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);
//2. Look up the order
TradeOrder order = orderMapper.selectByPrimaryKey(mqEntity.getOrderId());
//3. Set the order's payment status to paid
order.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
orderMapper.updateByPrimaryKey(order);
log.info(">>>订单状态设置为已支付");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
log.info(">>>订单状态修改失败");
}
}
}
5.3 Unit test
Write a unit test for the payment service interface
package com.crysw.test;
import com.crysw.api.IPayService;
import com.crysw.constant.ShopCode;
import com.crysw.shop.PayApplication;
import com.crysw.shop.pojo.TradePay;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.IOException;
import java.math.BigDecimal;
@SpringBootTest(classes = {PayApplication.class})
@RunWith(SpringRunner.class)
public class PayServiceTest {
@Autowired
private IPayService payService;
// Create the pending payment record for the order
@Test
public void createPayment() {
long orderId = 750137168931983360L;
TradePay tradePay = new TradePay();
tradePay.setOrderId(orderId);
tradePay.setPayAmount(new BigDecimal(880));
payService.createPayment(tradePay);
}
@Test
public void callbackPayment() throws IOException {
long payId = 4;
long orderId = 2;
TradePay tradePay = new TradePay();
tradePay.setPayId(payId);
tradePay.setOrderId(orderId);
tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
payService.callbackPayment(tradePay);
System.in.read();
}
}
Run com.crysw.test.PayServiceTest#createPayment to create the pending payment record for the order. Afterwards the payment table trade_pay contains one new unpaid record.
755868116554227712 750137168931983360 880.00 0
Then run the unit test that simulates the payment-success callback, which sends the payment-success message to MQ.
2022-08-13 17:50:34.271 INFO 15136 --- [main] c.c.shop.service.impl.PayServiceImpl : >>>支付回调
2022-08-13 17:50:34.301 INFO 15136 --- [main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2022-08-13 17:50:34.306 WARN 15136 --- [ main] com.zaxxer.hikari.util.DriverDataSource : Registered driver with driverClassName=com.mysql.jdbc.Driver was not found, trying direct instantiation.
2022-08-13 17:50:34.543 INFO 15136 --- [main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2022-08-13 17:50:34.688 INFO 15136 --- [main] c.c.shop.service.impl.PayServiceImpl : >>>将支付成功消息持久化到数据库
2022-08-13 17:52:50.145 INFO 15136 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[192.168.65.129:10911] result: true
The producer temp table trade_mq_producer_temp in the database has one new payment-success message record. The payment-success message that was sent can also be found in the rocketmq-console.
755888109555687424 payProducerGroup payTopic paid 755868116554227712 {"isPaid":2,"orderId":750137168931983360,"payId":755868116554227712} (NULL) 2022-08-13 17:50:35
Start the coupon service, goods service, user service and order service. When the consumer in the order service receives the payment-success message, it updates the payment status of the order.
The consumption process can be followed at a debug breakpoint.
Consumption log
2022-08-13 18:12:33.637 INFO 6280 --- [MessageThread_1] com.crysw.shop.mq.PaymentListener : >>>接收到支付成功消息
2022-08-13 18:14:36.533 WARN 6280 --- [l-1 housekeeper] com.zaxxer.hikari.pool.HikariPool : HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=2m6s521ms847µs400ns).
2022-08-13 18:14:36.547 INFO 6280 --- [MessageThread_1] com.crysw.shop.mq.PaymentListener : >>>订单状态设置为已支付
The payment status of the order in the order table trade_order has been updated
750137168931983360 345963634385633280 1 2 北京 345959443973935104 1 1000.00 0.00 1000.00 345988230098857984 20.00 100.00 880.00 2022-07-28 20:58:24
6. End-to-end integration test
A REST client calls shop-order-web and shop-pay-web to complete the order and payment operations. First create the shop-order-web and shop-pay-web modules.
6.1 Preparation
6.1.1 Write the configuration
Write a configuration class that provides a RestTemplate instance, used to simulate HTTP requests from the web side.
package com.crysw.shop.config;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.web.client.RestOperations;
import org.springframework.web.client.RestTemplate;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.List;
@Configuration
public class RestTemplateConfig {
@Bean
@ConditionalOnMissingBean({RestOperations.class, RestTemplate.class})
public RestTemplate restTemplate(ClientHttpRequestFactory factory) {
RestTemplate restTemplate = new RestTemplate(factory);
// Replace the default string converter (whose charset is "ISO-8859-1") with one that uses UTF-8
List<HttpMessageConverter<?>> messageConverters = restTemplate.getMessageConverters();
Iterator<HttpMessageConverter<?>> iterator = messageConverters.iterator();
while (iterator.hasNext()) {
HttpMessageConverter<?> converter = iterator.next();
if (converter instanceof StringHttpMessageConverter) {
iterator.remove();
}
}
messageConverters.add(new StringHttpMessageConverter(Charset.forName("UTF-8")));
return restTemplate;
}
@Bean
@ConditionalOnMissingBean({ClientHttpRequestFactory.class})
public ClientHttpRequestFactory simpleClientHttpRequestFactory() {
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
// ms
factory.setReadTimeout(15000);
// ms
factory.setConnectTimeout(15000);
return factory;
}
}
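The reason for swapping the string converter's charset can be shown with the standard library alone. The sketch below encodes and decodes the address used in the tests, 北京 (written with Unicode escapes so the source file encoding does not matter), once with UTF-8 and once with ISO-8859-1. The class and method names are hypothetical, and the example only illustrates the charset behaviour, not Spring's converter itself.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

final class CharsetRoundTripSketch {
    // Encode the text to bytes and decode it again with the same charset.
    static String roundTrip(String text, Charset charset) {
        return new String(text.getBytes(charset), charset);
    }

    public static void main(String[] args) {
        String address = "\u5317\u4eac"; // the two characters of the test address
        // UTF-8 can represent the characters, so the text survives the round trip.
        System.out.println(roundTrip(address, StandardCharsets.UTF_8).equals(address));
        // ISO-8859-1 cannot represent them; each one is replaced by '?'.
        System.out.println(roundTrip(address, StandardCharsets.ISO_8859_1));
    }
}
```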
Request address configuration for shop-order-web
server.host=http://localhost
server.servlet.path=/order-web
server.port=8086
# dubbo
spring.application.name=dubbo-order-consumer
spring.dubbo.application.id=dubbo-order-consumer
spring.dubbo.application.name=dubbo-order-consumer
spring.dubbo.registry.address=zookeeper://192.168.65.129:2181;zookeeper://192.168.65.129:2182;zookeeper://192.168.65.129:2183
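## A custom property is used for the port because reading server.port with @Value returned the default -1 (the value was read before the real port had been resolved), which produced a wrong URL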
order.port=8086
shop.order.baseURI=${server.host}:${order.port}${server.servlet.path}
shop.order.confirm=/order/confirm
Request address configuration for shop-pay-web
server.host=http://localhost
server.servlet.path=/pay-web
server.port=8087
# dubbo
spring.application.name=dubbo-pay-consumer
spring.dubbo.application.id=dubbo-pay-consumer
spring.dubbo.application.name=dubbo-pay-consumer
spring.dubbo.registry.address=zookeeper://192.168.65.129:2181;zookeeper://192.168.65.129:2182;zookeeper://192.168.65.129:2183
# url
pay.port=8087
shop.pay.baseURI=${server.host}:${pay.port}${server.servlet.path}
shop.pay.createPayment=/pay/createPayment
shop.pay.callbackPayment=/pay/callBackPayment
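How the placeholders in these properties combine into the final request URL can be checked with a few lines of plain Java. This sketch mirrors the `${server.host}:${order.port}${server.servlet.path}` pattern by hand; it does not use Spring's property resolution, and the class and method names are hypothetical.

```java
final class BaseUriSketch {
    // Mirrors shop.order.baseURI=${server.host}:${order.port}${server.servlet.path}
    static String baseUri(String host, int port, String servletPath) {
        return host + ":" + port + servletPath;
    }

    public static void main(String[] args) {
        String orderConfirm = baseUri("http://localhost", 8086, "/order-web") + "/order/confirm";
        String payCallback = baseUri("http://localhost", 8087, "/pay-web") + "/pay/callBackPayment";
        System.out.println(orderConfirm);
        System.out.println(payCallback);
    }
}
```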
6.1.2 Write the controllers
Write the order web service
package com.crysw.shop.controller;
import com.alibaba.dubbo.config.annotation.Reference;
import com.crysw.api.IOrderService;
import com.crysw.shop.entity.Result;
import com.crysw.shop.pojo.TradeOrder;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/order")
public class OrderController {
@Reference
private IOrderService orderService;
@RequestMapping("/confirm")
public Result confirmOrder(@RequestBody TradeOrder order) {
return orderService.confirmOrder(order);
}
}
Write the payment web service
package com.crysw.shop.controller;
import com.alibaba.dubbo.config.annotation.Reference;
import com.crysw.api.IPayService;
import com.crysw.shop.entity.Result;
import com.crysw.shop.pojo.TradePay;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = "/pay")
public class PayController {
@Reference
private IPayService payService;
@RequestMapping("/createPayment")
public Result createPayment(@RequestBody TradePay pay) {
return payService.createPayment(pay);
}
@RequestMapping("/callBackPayment")
public Result callBackPayment(@RequestBody TradePay pay) throws Exception {
return payService.callbackPayment(pay);
}
}
6.2 Order test
In shop-order-web, write test code that simulates the web UI sending an HTTP request to place an order.
package com.crysw.test;
import com.crysw.shop.OrderWebApplication;
import com.crysw.shop.entity.Result;
import com.crysw.shop.pojo.TradeOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.client.RestTemplate;
import java.math.BigDecimal;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {OrderWebApplication.class})
public class OrderWebTest {
@Autowired
private RestTemplate restTemplate;
@Value("${shop.order.baseURI}")
private String baseURI;
@Value("${shop.order.confirm}")
private String confirmOrderPath;
// Simulate the UI sending an HTTP request to the controller
@Test
public void confirmOrder() {
Long coupouId = 345988230098857984L;
Long goodsId = 345959443973935104L;
Long userId = 345963634385633280L;
// Create the order
TradeOrder order = new TradeOrder();
order.setGoodsId(goodsId);
order.setUserId(userId);
order.setCouponId(coupouId);
order.setAddress("北京");
order.setGoodsNumber(1);
// Goods price
order.setGoodsPrice(new BigDecimal(1000));
// Shipping fee
order.setShippingFee(BigDecimal.ZERO);
// Order amount = goods price * quantity + shipping fee
order.setOrderAmount(new BigDecimal(1000));
// Amount already paid (deducted from the user's balance)
order.setMoneyPaid(new BigDecimal(100));
Result result = restTemplate.postForEntity(baseURI + confirmOrderPath, order, Result.class).getBody();
System.out.println(result);
}
}
Test log from the order service (messages translated):
2022-08-14 18:24:09.179 INFO 15388 --- [:20884-thread-5] c.c.shop.service.impl.OrderServiceImpl : Order validation passed
2022-08-14 18:24:09.210 INFO 15388 --- [:20884-thread-5] c.c.shop.service.impl.OrderServiceImpl : Order: 756258946880245760 stock deducted successfully
2022-08-14 18:24:09.224 INFO 15388 --- [:20884-thread-5] c.c.shop.service.impl.OrderServiceImpl : Order: 756258946880245760, coupon used
2022-08-14 18:24:09.237 INFO 15388 --- [:20884-thread-5] c.c.shop.service.impl.OrderServiceImpl : Order: 756258946880245760, balance deducted successfully
2022-08-14 18:24:09.243 INFO 15388 --- [:20884-thread-5] c.c.shop.service.impl.OrderServiceImpl : Order: [756258946880245760] status updated (confirmed) successfully
The trade_order table now contains a confirmed order:
756258946880245760 345963634385633280 1 0 (NULL) 北京 (NULL) 345959443973935104 1 1000.00 (NULL) 0.00 1000.00 345988230098857984 20.00 100.00 880.00 2022-08-14 18:24:09 2022-08-14 18:24:09 (NULL)
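The amounts in this row can be cross-checked by hand. The sketch below is only an illustration: the class name OrderAmountCheck and both method names are hypothetical, and the rule "amount to pay = order amount - coupon value - balance used" is an assumption inferred from the row's values (1000.00, 20.00, 100.00, 880.00), not something this section's code confirms. The order-amount formula is the one stated in the test's comment.

```java
import java.math.BigDecimal;

public class OrderAmountCheck {

    // Order amount = goods price * quantity + shipping fee (formula from the test's comment).
    public static BigDecimal orderAmount(BigDecimal goodsPrice, int goodsNumber, BigDecimal shippingFee) {
        return goodsPrice.multiply(BigDecimal.valueOf(goodsNumber)).add(shippingFee);
    }

    // Assumed rule: amount still to pay = order amount - coupon value - balance used.
    public static BigDecimal payAmount(BigDecimal orderAmount, BigDecimal couponPaid, BigDecimal moneyPaid) {
        return orderAmount.subtract(couponPaid).subtract(moneyPaid);
    }

    public static void main(String[] args) {
        // Values taken from the trade_order row above.
        BigDecimal amount = orderAmount(new BigDecimal("1000.00"), 1, new BigDecimal("0.00"));
        BigDecimal toPay = payAmount(amount, new BigDecimal("20.00"), new BigDecimal("100.00"));
        System.out.println(amount); // 1000.00
        System.out.println(toPay);  // 880.00
    }
}
```

Under that assumption the result matches the 880.00 stored in the row, which is also the amount the payment test in 6.3 passes to setPayAmount.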
In the trade_coupon table, the coupon has been marked as used:
345988230098857984 20.00 345963634385633280 756258946880245760 1 2022-08-14 18:24:09
In the trade_goods table, the stock has been reduced by 1:
345959443973935104 JavaSE课程 999 1000.00 传智播客出品Java视频课程 2019-07-09 20:38:00
In the trade_user table, the user's balance has been reduced by 100:
345963634385633280 刘备 123L 18888888888L 100 2019-07-09 13:37:03 900
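In the trade_goods_number_log table, a new stock operation record has been added for the order:
345959443973935104 756258946880245760 -1 2022-08-14 18:24:09
6.3 Payment test
In shop-pay-web, write test code that simulates the web UI sending HTTP requests to create a payment and to deliver the payment callback.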
package com.crysw.test;

import com.crysw.constant.ShopCode;
import com.crysw.shop.PayWebApplication;
import com.crysw.shop.entity.Result;
import com.crysw.shop.pojo.TradePay;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.client.RestTemplate;

import java.math.BigDecimal;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {PayWebApplication.class})
public class PayWebTest {

    @Autowired
    private RestTemplate restTemplate;

    @Value("${shop.pay.baseURI}")
    private String baseURI;

    @Value("${shop.pay.createPayment}")
    private String createPaymentPath;

    @Value("${shop.pay.callbackPayment}")
    private String callBackPaymentPath;

    @Test
    public void createPayment() {
        long orderId = 756258946880245760L;
        TradePay tradePay = new TradePay();
        tradePay.setOrderId(orderId);
        tradePay.setPayAmount(new BigDecimal(880));
        Result result = restTemplate.postForEntity(baseURI + createPaymentPath, tradePay, Result.class).getBody();
        System.out.println(result);
    }

    @Test
    public void callbackPayment() {
        long payId = 756324893167067136L;
        long orderId = 756258946880245760L;
        TradePay tradePay = new TradePay();
        tradePay.setPayId(payId);
        tradePay.setOrderId(orderId);
        tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
        Result result = restTemplate.postForEntity(baseURI + callBackPaymentPath, tradePay, Result.class).getBody();
        System.out.println(result);
    }
}
Start the shop-pay-service and shop-pay-web services, then run the createPayment unit test. The trade_pay table now has a new record for the order, with status "awaiting payment":
756324893167067136 756258946880245760 880.00 0
Run the callbackPayment unit test. The payment status in the trade_pay table has been updated to paid:
756324893167067136 756258946880245760 880.00 2
In the trade_order table, the order's payment status has also been updated to paid:
756258946880245760 345963634385633280 1 2 北京 345959443973935104 1 1000.00 0.00 1000.00 345988230098857984 20.00 100.00 880.00 2022-08-14 18:24:09 2022-08-14 18:24:09