新建订单的接口对数据的一致性要求很高,尤其是涉及到支付、金钱相关的事情。
新建订单中的问题
对于新建订单失败之后,库存应该被归还回去,我们不应该出现以下情况。
- 本地订单新建失败,库存扣减成功
- 本地订单新建成功,库存扣减失败
以下两种情况我们都能接受
- 本地订单新建成功,库存扣减成功
- 本地订单失败,库存扣减失败
由于订单服务是通过网络调用库存服务的,在此过程中,有各种因素的影响。
库存扣减成功
- 本地执行失败,调用库存归还接口,但是此时归还接口出问题了(磁盘满了),网络问题可以通过重试来避免。
- 重试 -网络抖动或者拥塞,之前调用的过一段时间才能被接收到,会不会导致重复归还–幂等性问题
- 本地代码异常,不知道本地执行情况,无法调用库存归还接口。
库存扣减失败
本地事务不执行就行。
先扣库存还是后扣库存
上面我们介绍的是先扣减库存再新建订单,这次我们后扣库存。
我们将扣减库存的加入本地调用中,先执行新建订单、新建订单商品、删除购物车记录、扣减库存。只有扣减库存成功才会commit。
- 扣减库存发送出去了,但是网络拥塞了,就会重试N次,重试结束后还没收到扣减的响应,这时候本地事务认为扣减失败了,就rollback了,但其实扣减成功了。
- 调用扣减库存没有问题,当把网络请求发送出去之后,宕机了。这时候库存还是被扣减了,订单就会被rollback。
TCC解决库存扣减问题
TCC事务管理器中内部事务开始之前都会写日志,下次启动的时候可以读取日志,继续没有执行的逻辑。
基于可靠消息最终一致性方案
订单服务发送一个half消息,开始执行本地事务,如果成功就commit,失败就rollback。库存服务一直在监听是否又库存扣减的消息,进行扣减库存。
本质上解决了可靠消息,消费者应该保证消息一定会被消费。这就要求我们的库存服务一定要可靠,一定要执行成功。这个服务一般可以保证可靠。
但是由于是库存服务,如果没有库存了,扣减失败怎么办?
- 在本地消息执行之前发送归还的half消息
- 调用库存服务,如果失败就不往下执行。如果成功。
- 开始执行本地事务,
- 成功之后rollback归还
- 失败就commit归还
- 发送订单超时的延时消息,库存服务一直监听延迟消息。
- 回查订单本地有没有订单信息,如果有就rollback,没有就commit
实现
生产者
1type OrderListener struct{
2 Code codes.Code
3 Detail string
4 ID int32
5 OrderAmount float32
6}
7
8func NewOrderListener() *OrderListener {
9 return &OrderListener{}
10}
11
12func (dl *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
13 var orderInfo model.OrderInfo
14 err:=json.Unmarshal(msg.Body,&orderInfo)
15 if err!=nil{
16 // 由于还没有执行,所以应该将之前的回宫
17 return primitive.RollbackMessageState
18 }
19 // 没有选中商品
20 if xxxx{
21 dl.Code = code.InvalidArgument
22 dl.Detail = "没有选中的商品"
23 // 没有执行sell之前都要回滚
24 return primitive.RollbackMessageState
25 }
26
27 // 跨服务调用商品微服务
28 if xxx{
29 dl.Code = code.Internal
30 dl.Detail = "批量查询商品信息是啊比"
31 // 没有执行sell之前都要回滚
32 return primitive.RollbackMessageState
33 }
34
35 //跨微服务调用库存微服务,
36 if xxx{
37 // 如果因为网络问题,如何避免误判
38 // sell 返回的状态码 是不是sell中列举出来的状态码就是网络的问题
39 // todo
40 dl.Code = code.Internal
41 dl.Detail = "库存扣减失败"
42 // 没有执行sell之前都要回滚
43 return primitive.RollbackMessageState
44 }
45 // 生成订单表
46 if xxx{
47 dl.Code = code.Internal
48 dl.Detail = "库存扣减失败"
49 // 订单创建失败就要归还库存
50 return primitive.CommitMessageState
51 }
52 // 批量插入订单物品的信息
53 if xxx{
54 dl.Code = code.Internal
55 dl.Detail = "插入订单信息失败"
56 // 就要归还了、
57 // 订单创建失败就要归还库存
58 return primitive.CommitMessageState
59 }
60 // 发送延时消息
61 p, _ := rocketmq.NewProducer(
62 producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
63 producer.WithRetry(2),
64 )
65 err := p.Start()
66 if err != nil {
67 fmt.Printf("start producer error: %s", err.Error())
68 os.Exit(1)
69 }
70 msg := primitive.NewMessage("order_timeout", msg.Body)
71 msg.WithDelayTimeLevel(5) // 设置延迟的级别
72 res, err := p.SendSync(context.Background(), msg)
73
74 if err != nil {
75 fmt.Printf("send message error: %s\n", err)
76 // rollback
77 dl.Code = code.Internal
78 dl.Detail = "发送延时消息失败"
79 return primitive.RollbackMessageState
80 }
81
82 err = p.Shutdown()
83 if err != nil {
84 fmt.Printf("shutdown producer error: %s", err.Error())
85 }
86
87
88 // 本地事务提交 commit
89 dl.Code = code.OK
90 return primitive.RollbackMessageState
91}
92
93func (dl *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
94 fmt.Println("rocketMQ 的消息回查")
95 var orderInfo model.OrderInfo
96 err:=json.Unmarshal(msg.Body,&orderInfo)
97 if err!=nil{
98 // 由于还没有执行,所以应该将之前的回宫
99 return primitive.RollbackMessageState
100 }
101 // 查询订单是否存在
102 if xxx{
103 // 如果订单找不到
104 // 本地事务执行失败了,就要归还库存
105 return primitive.CommitMessageState
106 }
107 time.Sleep(time.Second * 4)
108 return primitive.RollbackMessageState
109}
110....
111
112func (o *OrderServer)CreateOrder(ctx context.context req *proto.OrderRequest)(*proto.OrderInfoResponse ,error){
113 orderListener:=NewOrderListener()
114 // 运行完就能拿到orderListener中的信息
115 p, err := rocketmq.NewTransactionProducer(
116 orderListener,
117 // 先写死
118 producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.2:9876"})),
119 producer.WithRetry(1),
120 )
121 if err != nil {
122 global.Logger.Errorf("生成producer失败: %s\n", err.Error())
123 return nil,err
124 }
125 err = p.Start()
126 if err != nil {
127 global.Logger.Errorf("启动 producer 失败: %s\n", err.Error())
128 return nil,err
129 }
130 order:=model.OrderInfo{
131 OrderID:GenerateOrderID(req,UserID)
132 Address:...
133 ....
134 }
135
136 jsonString,err:= json.Marshal(order)
137 if err != nil {
138 global.Logger.Errorf("序列化失败: %s\n", err.Error())
139 return nil,err
140 }
141 res, err := p.SendMessageInTransaction(
142 context.Background(),
143 primitive.NewMessage("order_reback", jsonString)
144 )
145
146 if err != nil {
147 global.Logger.Errorf("序列化失败: %s\n", err.Error())
148 return nil,status.Error(codes.Internal,"消息发送失败")
149 }
150 if orderListener.Code!=codes.OK{
151 return nil,status.Error(orderListener.Code,"新建订单失败")
152 }
153 return &proto.OrderInfoResponse{...},ni;
154 // 回查
155 time.Sleep(5 * time.Minute)
156 err = p.Shutdown()
157 if err != nil {
158 fmt.Printf("shutdown producer error: %s", err.Error())
159 }
160}
order_srv/main.go
1func main(){
2 ...
3 // 监听订单超时的topic
4 c, _ := rocketmq.NewPushConsumer(
5 // GroupName 多个实例负载均衡
6 consumer.WithGroupName("testGroup"),
7 consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.2:9876"})),
8 )
9 err := c.Subscribe("order_timeout", consumer.MessageSelector{},OrderTimeout )
10 if err != nil {
11 fmt.Println(err.Error())
12 }
13 // Note: start after subscribe
14 err = c.Start()
15 if err != nil {
16 fmt.Println(err.Error())
17 os.Exit(-1)
18 }
19 ...
20}
21
22
23func OrderTimeout(ctx context.Context,
24 msgs ...*primitive.MessageExt,
25) (consumer.ConsumeResult, error) {
26 for i := range msgs {
27 // 查询支付状态,如果是未支付,就要归还换库存
28 // 归还库存我们不能直接又调用库存服务,但是我们可以模仿order发送一个消息到order_reback中去
29 // 发送失败 return RetryLater
30 }
31
32 return consumer.ConsumeSuccess, nil
33 }
消费者
在库存服务中,启动本地服务之后 监听归还topic
order_srv/main.go
1
2func main(){
3.....
4// 服务器有数据会推给回调
5 c, _ := rocketmq.NewPushConsumer(
6 // GroupName 多个实例负载均衡
7 consumer.WithGroupName("inventory"),
8 consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.2:9876"})),
9 )
10 err := c.Subscribe("order_reback", consumer.MessageSelector{}, AutoReback())
11 if err != nil {
12 fmt.Println(err.Error())
13 }
14 // Note: start after subscribe
15 err = c.Start()
16 if err != nil {
17 fmt.Println(err.Error())
18 os.Exit(-1)
19 }
20 time.Sleep(time.Hour)
21 err = c.Shutdown()
22 if err != nil {
23 fmt.Printf("shutdown Consumer error: %s", err.Error())
24 }
25 ....
26}
27
28
29func AutoReback(
30 ctx context.Context,
31 msgs ...*primitive.MessageExt,
32) (consumer.ConsumeResult, error) {
33 for msg:=rang msgs{
34 //既然要归还库存,就应该知道每件商品归还多少,但是有一个问题?重复归还
35 // 这个接口应该保证幂等性,不能因为消息的重复发送倒是一个订单的库存归还多次,没有扣减的库存你别归还
36 // 如何确保这些都没有问题,新建一张表,记录详细的订单扣减的细节,以及归还细节
37 var orderInfo OrderInfo
38 err:=json.Unmarshal(msgs[msg].Body,&orderInfo)
39 if err!=nil{
40 global.Logger.Errof("解析json失败")
41 return consumer.ConsumeSuccess,nil
42 }
43 // 去将inv的库存加回去,将selldetail的状态设为2,在事务中执行
44 if xxx{
45 return consumer.ConsumeSuccess,nil
46 }
47 // 逐个归还
48 ...
49 其中要是有问题
50 return consumer.ConsumeRetryLater,nil
51 //
52 }
53 return consumer.ConsumeSuccess, nil
54 }