新建订单的接口对数据的一致性要求很高,尤其是涉及到支付、金钱相关的事情。

新建订单中的问题

对于新建订单失败之后,库存应该被归还回去,我们不应该出现以下情况。

  1. 本地订单新建失败,库存扣减成功
  2. 本地订单新建成功,库存扣减失败

以下两种情况我们都能接受

  1. 本地订单新建成功,库存扣减成功
  2. 本地订单失败,库存扣减失败

image-20220405094449610

由于订单服务是通过网络调用库存服务的,在此过程中,有各种因素的影响。

库存扣减成功

  1. 本地执行失败,调用库存归还接口,但是此时归还接口出问题了(磁盘满了),网络问题可以通过重试来避免。
    1. 重试 -网络抖动或者拥塞,之前调用的过一段时间才能被接收到,会不会导致重复归还–幂等性问题
  2. 本地代码异常,不知道本地执行情况,无法调用库存归还接口。

库存扣减失败

本地事务不执行就行。

先扣库存还是后扣库存

上面我们介绍的是先扣减库存再新建订单,这次我们后扣库存。

我们将扣减库存的加入本地调用中,先执行新建订单、新建订单商品、删除购物车记录、扣减库存。只有扣减库存成功才会commit。

  1. 扣减库存发送出去了,但是网络拥塞了,就会重试N次,重试结束后还没收到扣减的响应,这时候本地事务认为扣减失败了,就rollback了,但其实扣减成功了。
  2. 调用扣减库存没有问题,当把网络请求发送出去之后,宕机了。这时候库存还是被扣减了,订单就会被rollback。

TCC解决库存扣减问题

image-20220405102438786

TCC事务管理器中内部事务开始之前都会写日志,下次启动的时候可以读取日志,继续没有执行的逻辑。

基于可靠消息最终一致性方案

image-20220405104011632

订单服务发送一个half消息,开始执行本地事务,如果成功就commit,失败就rollback。库存服务一直在监听是否又库存扣减的消息,进行扣减库存。

本质上解决了可靠消息,消费者应该保证消息一定会被消费。这就要求我们的库存服务一定要可靠,一定要执行成功。这个服务一般可以保证可靠。

但是由于是库存服务,如果没有库存了,扣减失败怎么办?

image-20220405111450549

  1. 在本地消息执行之前发送归还的half消息
  2. 调用库存服务,如果失败就不往下执行。如果成功。
  3. 开始执行本地事务,
    1. 成功之后rollback归还
    2. 失败就commit归还
  4. 发送订单超时的延时消息,库存服务一直监听延迟消息。
  5. 回查订单本地有没有订单信息,如果有就rollback,没有就commit

实现

生产者

  1type OrderListener struct{
  2    Code codes.Code
  3    Detail string
  4    ID int32
  5    OrderAmount float32
  6}
  7
  8func NewOrderListener() *OrderListener {
  9   return &OrderListener{}
 10}
 11
 12func (dl *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
 13    var orderInfo model.OrderInfo
 14    err:=json.Unmarshal(msg.Body,&orderInfo)
 15    if err!=nil{
 16        // 由于还没有执行,所以应该将之前的回宫
 17        return primitive.RollbackMessageState
 18    }
 19   // 没有选中商品
 20    if xxxx{
 21        dl.Code = code.InvalidArgument
 22        dl.Detail = "没有选中的商品"
 23        // 没有执行sell之前都要回滚
 24        return primitive.RollbackMessageState
 25    }
 26    
 27    // 跨服务调用商品微服务
 28    if xxx{
 29        dl.Code = code.Internal
 30        dl.Detail = "批量查询商品信息是啊比"
 31        // 没有执行sell之前都要回滚
 32        return primitive.RollbackMessageState
 33    }
 34    
 35    //跨微服务调用库存微服务,
 36    if xxx{
 37        // 如果因为网络问题,如何避免误判
 38        // sell 返回的状态码 是不是sell中列举出来的状态码就是网络的问题
 39        // todo 
 40        dl.Code = code.Internal
 41        dl.Detail = "库存扣减失败"
 42        // 没有执行sell之前都要回滚
 43        return primitive.RollbackMessageState
 44    }
 45    // 生成订单表
 46    if xxx{
 47        dl.Code = code.Internal
 48        dl.Detail = "库存扣减失败"
 49        // 订单创建失败就要归还库存
 50        return primitive.CommitMessageState
 51    }
 52    // 批量插入订单物品的信息
 53    if xxx{
 54        dl.Code = code.Internal
 55        dl.Detail = "插入订单信息失败"
 56        // 就要归还了、
 57        // 订单创建失败就要归还库存
 58        return primitive.CommitMessageState
 59    }
 60    // 发送延时消息
 61    p, _ := rocketmq.NewProducer(
 62		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
 63		producer.WithRetry(2),
 64	)
 65	err := p.Start()
 66	if err != nil {
 67		fmt.Printf("start producer error: %s", err.Error())
 68		os.Exit(1)
 69	}
 70	msg := primitive.NewMessage("order_timeout", msg.Body)
 71	msg.WithDelayTimeLevel(5) // 设置延迟的级别
 72	res, err := p.SendSync(context.Background(), msg)
 73
 74	if err != nil {
 75		fmt.Printf("send message error: %s\n", err)
 76        // rollback
 77         dl.Code = code.Internal
 78        dl.Detail = "发送延时消息失败"
 79        return primitive.RollbackMessageState
 80	}
 81	
 82	err = p.Shutdown()
 83	if err != nil {
 84		fmt.Printf("shutdown producer error: %s", err.Error())
 85	}
 86    
 87    
 88    // 本地事务提交 commit
 89    dl.Code = code.OK
 90   return primitive.RollbackMessageState
 91}
 92
 93func (dl *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
 94	fmt.Println("rocketMQ 的消息回查")
 95    var orderInfo model.OrderInfo
 96    err:=json.Unmarshal(msg.Body,&orderInfo)
 97    if err!=nil{
 98        // 由于还没有执行,所以应该将之前的回宫
 99        return primitive.RollbackMessageState
100    }
101    // 查询订单是否存在
102    if xxx{
103        // 如果订单找不到
104        // 本地事务执行失败了,就要归还库存
105        return primitive.CommitMessageState
106    }
107   time.Sleep(time.Second * 4)
108   return primitive.RollbackMessageState
109}
110....
111
112func (o *OrderServer)CreateOrder(ctx context.context req *proto.OrderRequest)(*proto.OrderInfoResponse ,error){
113    orderListener=NewOrderListener()
114    // 运行完就能拿到orderListener中的信息
115    p, err := rocketmq.NewTransactionProducer(
116		orderListener,
117        // 先写死
118		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.2:9876"})),
119		producer.WithRetry(1),
120	)
121    if err != nil {
122		global.Logger.Errorf("生成producer失败: %s\n", err.Error())
123        return nil,err
124	}
125	err = p.Start()
126	if err != nil {
127		global.Logger.Errorf("启动 producer 失败: %s\n", err.Error())
128        return nil,err
129	}
130    order:=model.OrderInfo{
131        OrderID:GenerateOrderID(req,UserID)
132        Address:...
133        ....
134    }
135    
136    jsonString,err:= json.Marshal(order)
137    if err != nil {
138		global.Logger.Errorf("序列化失败: %s\n", err.Error())
139        return nil,err
140	}
141	res, err := p.SendMessageInTransaction(
142        context.Background(),
143		primitive.NewMessage("order_reback", jsonString)
144	)
145
146	if err != nil {
147		global.Logger.Errorf("序列化失败: %s\n", err.Error())
148        return nil,status.Error(codes.Internal,"消息发送失败")
149	} 
150    if orderListener.Code!=codes.OK{
151        return nil,status.Error(orderListener.Code,"新建订单失败")
152    }
153    return &proto.OrderInfoResponse{...},ni;
154	// 回查
155	time.Sleep(5 * time.Minute)
156	err = p.Shutdown()
157	if err != nil {
158		fmt.Printf("shutdown producer error: %s", err.Error())
159	}
160}

order_srv/main.go

 1func main(){
 2    ...
 3    // 监听订单超时的topic
 4    c, _ := rocketmq.NewPushConsumer(
 5		// GroupName 多个实例负载均衡
 6		consumer.WithGroupName("testGroup"),
 7		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.2:9876"})),
 8	)
 9	err := c.Subscribe("order_timeout", consumer.MessageSelector{},OrderTimeout )
10	if err != nil {
11		fmt.Println(err.Error())
12	}
13	// Note: start after subscribe
14	err = c.Start()
15	if err != nil {
16		fmt.Println(err.Error())
17		os.Exit(-1)
18	}
19    ...
20}
21
22
23func OrderTimeout(ctx context.Context,
24	msgs ...*primitive.MessageExt,
25) (consumer.ConsumeResult, error) {
26		for i := range msgs {
27			//  查询支付状态,如果是未支付,就要归还换库存
28            // 归还库存我们不能直接又调用库存服务,但是我们可以模仿order发送一个消息到order_reback中去
29            // 发送失败 return RetryLater
30		}
31
32		return consumer.ConsumeSuccess, nil
33	}

消费者

在库存服务中,启动本地服务之后 监听归还topic

order_srv/main.go

 1
 2func main(){
 3.....
 4// 服务器有数据会推给回调
 5	c, _ := rocketmq.NewPushConsumer(
 6		// GroupName 多个实例负载均衡
 7		consumer.WithGroupName("inventory"),
 8		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.2:9876"})),
 9	)
10	err := c.Subscribe("order_reback", consumer.MessageSelector{}, AutoReback())
11	if err != nil {
12		fmt.Println(err.Error())
13	}
14	// Note: start after subscribe
15	err = c.Start()
16	if err != nil {
17		fmt.Println(err.Error())
18		os.Exit(-1)
19	}
20	time.Sleep(time.Hour)
21	err = c.Shutdown()
22	if err != nil {
23		fmt.Printf("shutdown Consumer error: %s", err.Error())
24	}
25    ....
26}
27
28
29func AutoReback(
30    ctx context.Context,
31	msgs ...*primitive.MessageExt,
32) (consumer.ConsumeResult, error) {
33    for msg:=rang msgs{
34        //既然要归还库存,就应该知道每件商品归还多少,但是有一个问题?重复归还
35        // 这个接口应该保证幂等性,不能因为消息的重复发送倒是一个订单的库存归还多次,没有扣减的库存你别归还
36        // 如何确保这些都没有问题,新建一张表,记录详细的订单扣减的细节,以及归还细节
37        var orderInfo OrderInfo
38        err:=json.Unmarshal(msgs[msg].Body,&orderInfo)
39        if err!=nil{
40            global.Logger.Errof("解析json失败")
41            return consumer.ConsumeSuccess,nil
42        }
43        // 去将inv的库存加回去,将selldetail的状态设为2,在事务中执行
44        if xxx{
45             return consumer.ConsumeSuccess,nil
46        }
47        // 逐个归还
48        ...
49        其中要是有问题
50        return consumer.ConsumeRetryLater,nil
51        //
52    }
53		return consumer.ConsumeSuccess, nil
54	}