在分布式系统中,数据一致性是非常重要。在此项目中库存的增减也有同样的问题。
快速启动库存服务
复制之前的用户服务
的代码,
生成表结构
1$migrate create -ext sql -dir migration -seq init_schema_inventory
2D:\repository\shop\service\inventory_srv\db\migration\000001_init_schema_inventory.up.sql
3D:\repository\shop\service\inventory_srv\db\migration\000001_init_schema_inventory.down.sql
复制生成的表结构到up文件中
1CREATE TABLE "inventory"
2(
3 "id" bigserial PRIMARY KEY,
4 "created_at" timestamptz NOT NULL DEFAULT (now()),
5 "updated_at" timestamptz NOT NULL DEFAULT (now()),
6 "deleted_at" timestamptz DEFAULT null,
7 "goods" integer NOT NULL,
8 "sticks" integer NOT NULL,
9 "version" integer NOT NULL
10);
11
12CREATE INDEX ON "inventory" ("goods");
down
1DROP TABLE IF EXISTS "inventory";
创建inventory数据库
1docker run --name shop-inventory -p 35433:5432 -e POSTGRES_PASSWORD=postgres -e TZ=PRC -d postgres:14-alpine
23536ecfbb483a64fadaa9e3e253c50b1082dbd71b290f7b36ebd2276c8a74dce
创建数据库
1docker exec -it shop-inventory createdb --username=postgres --owner=postgres shop
删除数据库
1docker exec -it shop-inventory dropdb --username=postgres shop
数据库迁移
1migrate -path db/migration -database "postgresql://postgres:postgres@localhost:35433/shop?sslmode=disable" -verbose up
22022/03/30 22:27:06 Start buffering 1/u init_schema_inventory
32022/03/30 22:27:06 Read and execute 1/u init_schema_inventory
42022/03/30 22:27:06 Finished 1/u init_schema_inventory (read 15.2808ms, ran 22.4171ms)
52022/03/30 22:27:06 Finished after 61.9324ms
62022/03/30 22:27:06 Closing source and database
1migrate -path db/migration -database "postgresql://postgres:postgres@localhost:35433/shop?sslmode=disable" -verbose down
22022/03/30 22:26:59 Are you sure you want to apply all down migrations? [y/N]
3y
42022/03/30 22:27:01 Applying all down migrations
52022/03/30 22:27:01 Start buffering 1/d init_schema_inventory
62022/03/30 22:27:01 Read and execute 1/d init_schema_inventory
72022/03/30 22:27:01 Finished 1/d init_schema_inventory (read 9.3861ms, ran 15.3116ms)
82022/03/30 22:27:01 Finished after 1.9435943s
92022/03/30 22:27:01 Closing source and database
生成curd代码
在wsl中 初始化
1root@Jimyag:/mnt/c/Users/jimyag# docker run --rm -v /mnt/d/repository/shop/service/inventory_srv:/src -w /src kjconroy/sqlc init
写入如下
1version: 1
2packages:
3 - path: "./model" # 生成go 代码的位置
4 name: "model" # 生成 go package 的名字
5 engine: "postgresql" # 使用的数据库引擎
6 schema: "./db/migration/" # 迁移表的sql语句 我们使用migrate中的up文件
7 queries: "./db/query" # CRUD的sql
8 emit_json_tags: true # 添加json在生成的struct中
9 emit_prepared_queries: false
10 emit_interface: true # 生成接口
11 emit_exact_table_names: false # 表名是否带s
在./db/query
中写入
1-- name: CreateInventory :one
2INSERT INTO "inventory"(goods,
3 sticks,
4 version)
5VALUES ($1, $2, $3)
6returning *;
7
8-- name: GetInventoryByGoodsID :one
9SELECT *
10FROM "inventory"
11WHERE goods = $1
12LIMIT 1;
13
14
15-- name: UpdateInventory :one
16update "inventory"
17set updated_at = $1,
18 sticks = sticks + sqlc.arg(counts)
19where goods = $2
20returning *;
生成curd代码
1docker run --rm -v /mnt/d/repository/shop/service/inventory_srv:/src -w /src kjconroy/sqlc generate
protoc生成go代码
1syntax = "proto3";
2option go_package = ".;proto";
3
4service inventory{
5 rpc SetInv(GoodInvInfo) returns(Empty);// 设置库存
6 rpc InvDetail(GoodInvInfo) returns(GoodInvInfo);// 获取库存信息
7 rpc Sell(SellInfo)returns(Empty) ; // 库存扣减
8 rpc Rollback(SellInfo) returns(Empty);// 归还库存
9}
10message SellInfo{
11 repeated GoodInvInfo goodsInfo = 1;
12}
13message GoodInvInfo{
14 int32 goodsId = 1;
15 int32 num = 2;
16}
17message Empty {}
cd到.proto文件
1protoc -I . inventory.proto --go_out=plugins=grpc:.
封装数据库curd
shop\service\inventory_srv\model\store.go
1package model
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "time"
8
9 "go.uber.org/zap"
10 "google.golang.org/grpc/codes"
11 "google.golang.org/grpc/status"
12
13 "github.com/jimyag/shop/service/inventory/global"
14 "github.com/jimyag/shop/service/inventory/proto"
15)
16
17type Store interface {
18 SetInvTx(ctx context.Context, arg CreateInventoryParams) (Inventory, error)
19 SellTx(ctx context.Context, arg *proto.SellInfo) error
20 RollBackTx(ctx context.Context, arg *proto.SellInfo) error
21 Querier
22}
23
24type SqlStore struct {
25 *Queries
26 db *sql.DB
27}
28
29func NewSqlStore(db *sql.DB) Store {
30 return &SqlStore{
31 Queries: New(db),
32 db: db,
33 }
34}
35
36func (store *SqlStore) execTx(ctx context.Context, fn func(queries *Queries) error) error {
37 tx, err := store.db.BeginTx(ctx, nil)
38 if err != nil {
39 return err
40 }
41
42 q := New(tx)
43 err = fn(q)
44 if err != nil {
45 if rbErr := tx.Rollback(); rbErr != nil {
46 return fmt.Errorf("tx err: %v, rb err: %v", err, rbErr)
47 }
48 return err
49 }
50
51 return tx.Commit()
52}
53
54func (store *SqlStore) SetInvTx(ctx context.Context, arg CreateInventoryParams) (Inventory, error) {
55 var inventory Inventory
56 var err error
57 err = store.execTx(ctx, func(queries *Queries) error {
58 inventory, err = queries.GetInventoryByGoodsID(ctx, arg.Goods)
59 if err != nil {
60 if err == sql.ErrNoRows {
61 // 没有找到
62 inventory, err = queries.CreateInventory(ctx, arg)
63 return nil
64 } else {
65 global.Logger.Error("", zap.Error(err))
66 return status.Error(codes.Internal, "内部错误")
67 }
68 }
69 global.Logger.Info("", zap.Any("", inventory))
70 // 找到了
71
72 updateArg := UpdateInventoryParams{
73 UpdatedAt: time.Now(),
74 Goods: inventory.Goods,
75 Counts: arg.Sticks,
76 }
77 inventory, err = queries.UpdateInventory(ctx, updateArg)
78 return err
79 })
80
81 return inventory, err
82}
83
84func (store *SqlStore) SellTx(ctx context.Context, arg *proto.SellInfo) error {
85 // 本地事务 要不都卖,要不都不卖
86 // 拿到所有的商品,
87 // 判断是否有库存
88 // 判断库存是否够
89 // 扣减库存 - 库存 会出现数据不一致的问题
90 err := store.execTx(ctx, func(queries *Queries) error {
91 var inventory Inventory
92 var err error
93 for _, info := range arg.GetGoodsInfo() {
94 inventory, err = queries.GetInventoryByGoodsID(ctx, info.GoodsId)
95 if err != nil {
96 if err == sql.ErrNoRows {
97 return status.Error(codes.NotFound, "没有该货物")
98 } else {
99 return status.Error(codes.Internal, "内部错误")
100 }
101 }
102 if inventory.Sticks < info.Num {
103 return status.Error(codes.InvalidArgument, "货物不够")
104 }
105 updateArg := UpdateInventoryParams{}
106 updateArg.Goods = info.GoodsId
107 updateArg.Counts = -info.Num // 这边应该时负数
108 updateArg.UpdatedAt = time.Now()
109 inventory, err = queries.UpdateInventory(ctx, updateArg)
110 if err != nil {
111 return err
112 }
113 }
114 return nil
115 })
116 return err
117}
118
119func (store *SqlStore) RollBackTx(ctx context.Context, arg *proto.SellInfo) error {
120 err := store.execTx(ctx, func(queries *Queries) error {
121 var err error
122 for _, info := range arg.GetGoodsInfo() {
123 _, err = queries.GetInventoryByGoodsID(ctx, info.GoodsId)
124 if err != nil {
125 if err == sql.ErrNoRows {
126 return status.Error(codes.NotFound, "没有该货物")
127 } else {
128 return status.Error(codes.Internal, "内部错误")
129 }
130 }
131 updateArg := UpdateInventoryParams{}
132 updateArg.Goods = info.GoodsId
133 updateArg.Counts = info.Num // 这边应该时正数
134 updateArg.UpdatedAt = time.Now()
135 _, err = queries.UpdateInventory(ctx, updateArg)
136 if err != nil {
137 return err
138 }
139 }
140 return nil
141 })
142 return err
143}
测试
shop\service\inventory_srv\handler\inventory_test.go
1package handler
2
3import (
4 "context"
5 "testing"
6
7 "github.com/stretchr/testify/require"
8
9 "github.com/jimyag/shop/service/inventory/proto"
10)
11
12func TestSetInv(t *testing.T) {
13 in := proto.GoodInvInfo{
14 GoodsId: 1,
15 Num: 10,
16 }
17 inventory, err := inventoryClient.SetInv(context.Background(), &in)
18 require.NoError(t, err)
19 require.NotNil(t, inventory)
20}
21
22func TestInvDetail(t *testing.T) {
23 in := proto.GoodInvInfo{
24 GoodsId: 1,
25 Num: 10,
26 }
27 inventory, err := inventoryClient.InvDetail(context.Background(), &in)
28 require.NoError(t, err)
29 require.NotNil(t, inventory)
30 require.Equal(t, in.Num, inventory.Num)
31 require.Equal(t, in.GoodsId, inventory.GoodsId)
32}
33
34func TestSell(t *testing.T) {
35 in := proto.GoodInvInfo{
36 GoodsId: 5,
37 Num: 1,
38 }
39 inventory, err := inventoryClient.SetInv(context.Background(), &in)
40 require.NoError(t, err)
41 require.NotNil(t, inventory)
42
43 ins := proto.SellInfo{
44 GoodsInfo: []*proto.GoodInvInfo{
45 {
46 GoodsId: 1,
47 Num: 10,
48 },
49 {
50 GoodsId: 4,
51 Num: 100,
52 },
53 },
54 }
55 inventory, err = inventoryClient.Sell(context.Background(), &ins)
56 require.Error(t, err)
57 require.Nil(t, inventory)
58
59}
60
61func TestRollBack(t *testing.T) {
62 in := proto.SellInfo{
63 GoodsInfo: []*proto.GoodInvInfo{
64 {
65 GoodsId: 1,
66 Num: 10,
67 },
68 },
69 }
70 inventory, err := inventoryClient.Rollback(context.Background(), &in)
71 require.NoError(t, err)
72 require.NotNil(t, inventory)
73
74 in = proto.SellInfo{
75 GoodsInfo: []*proto.GoodInvInfo{
76 {
77 GoodsId: 1,
78 Num: 10,
79 },
80 {
81 GoodsId: 10000,
82 Num: 1,
83 },
84 },
85 }
86 inventory, err = inventoryClient.Rollback(context.Background(), &in)
87 require.Error(t, err)
88 require.Nil(t, inventory)
89}
库存服务的锁问题
对于减少库存也就是出售商品
,多个携程同时对数据库进行修改,虽然我们使用了事务,但是这也不能避免。
1graph TB
2 subgraph g1
3 goroutinue1 --> 查询服务1 --> 判断逻辑1 --> 业务逻辑1 --> 更新数据1
4 end
5
6 subgraph g2
7 goroutinue2 --> 查询服务 --> 判断逻辑 --> 业务逻辑 --> 更新数据
8 end
9
两个goroutine同时对数据库进行curd,在开始我们执行事务,这是g1拿到查询的值为100,g2拿到查询的值为100,g1进行更新数据更新为99,g2更新为99,两个事务提交。这时候发现数据已经不一致了。这时候我们给这个事务加锁,是可以解决这个问题的。但是这样的性能太低了,如果我只修改某一个商品的库存,那么所有的库存都要被加锁,以及我们如果有多个服务,一个系统锁只能管住一个服务实例,有多个实例即一个分布式系统时,我们就需要分布式锁。
基于mysql的悲观锁,乐观锁
悲观锁
悲观锁是一种思想,一种互斥锁,串行化了
mysql 的 for update
@@autocommit
要关闭auto commit
1select * form test where goods=12 for update
2-- 只会锁住满足条件的数据 只有 goods 是索引的话才会这样 如果where的条件没有索引,行锁会升级成表锁
3-- 只锁更新的语句
4-- 如果没有满足条件的结果,不会缩表, where 字段为索引
5-- 如果不满足条件不是索引还是会缩表
乐观锁
用到version
查询数据的时候查询版本号,
1update inv set stocks = stocks-2 version = version +1 where goods =421 and version =version
同时过来只有一个可以成功,但是其余的都要重试
基于redis的分布式锁
实现原理
实现原理 判断某个key是否存在且为1,不存在 设置key为1
- 判断421是否为1
- 如果没有,设置为1
- 如果有,就轮询等待
- 业务逻辑做完,就删除这个key
1和2应该是一起的是一个原子操作,redis实现了一个命令setnx如果key不存在,设置指定的值,上述过程就变为
- setnx(421,1)
- 业务逻辑
- 删除这个key
如果服务挂掉了?业务逻辑挂了,没有完成,就不会删除这个key,其他的都一直在等待这个。
解决方案:
- 设置过期时间(过期之后自动删除)
如果设置了过期时间,比如是8秒
,我的业务需要10秒中才能完成,怎么办?
- 在过期之前刷新一下过期时间
- 但是需要自己启动一个携程刷新
- 延时的接口可能会带来负面影响–。如果某一个服务hung住了,本来是2s就能执行完,但是你hung住(由于各种原因,比如硬件问题)就会一直申请延长锁,导致别人永远获取不到锁,
分布式锁解决的问题 –基于lua脚本去做
- 互斥性 -setnx
- 死锁
- 安全性
- 锁只能被持有该锁的用户删除,不能被其他用户删除,
- 设置的value是多少,只有当是的gor才能直到,
- 在删除的时候取出redis中的值和当前自己保存下来的值对比,如果一样删除
- 锁只能被持有该锁的用户删除,不能被其他用户删除,
多节点redis实现的分布式锁算法(RedLock):有效防止单点故障
在一个系统中,我们有多个redis的实例,有一个是redis的master节点,其余的都是redis的slaver节点当一个服务向redis的某个节点拿到锁之后,reids的集群会自动同步所有的锁的状况,这里的同步我们先不做关心。
在这个时候,master宕机了,他们之间的同步服务用不了了??,这时候应该怎么办?
红锁的原理:
有5台redis的实例,在获取锁(setnx)的时候应该在5台实例上都获得锁,这五台都是相同级别的,
在获得锁的时候,如果两个服务同时获得锁,一个获得了一部分,一个获得了另一部分,如果要求全部设置上的话,就都会失败,那么重试的话,就不会成功。这时应该拿到多数的台数就算成功。5台的话谁先拿到3台就成功,如果有三个服务分别拿了221,那么就重试,直到有人拿到一半以上。这里拿锁是同时去拿,同时开gor去拿锁,如果某个服务没有拿到多数的锁就应该释放当前的锁。
别人总结的
假设有5个完全独立的redis主服务器
- 获取当前时间戳
- client尝试按照顺序使用相同的key,value获取所有redis服务的锁,在获取锁的过程中的获取时间比锁过期时间短 很多,这是为了不要过长时间等待已经关闭的redis服务。且试着获取下一个redis实例。 比如: TTL为5s,设置获取锁最多用1s,所以如果一秒内无法获取锁, 就放弃获取这个锁,从而尝试获取下个锁
- client通过获取所有能获取的锁后的时间减去第一步的时间, 这个时间差要小于TTL时间并且至少有3个redis实例 成功获取锁,才算真正的获取锁成功
- 如果成功获取锁,则锁的真正有效时间是TTL减去第三步的时间差的时间;比如: TTL是5s,获取所有锁用了2s, 则真正锁有效时间为3s(其实应该再减去时钟漂移);
- 如果客户端由于某些原因获取锁失败,便会开始解锁所有redis实例;因为可能已经获取了小于3个锁,必须释 放,否则影响其他client获取锁
什么是时钟漂移
如果redis服务器的机器时钟发生了向前跳跃,就会导致这个key过早超时失效,比如说客户端1拿到锁后,key的过 期时间是12:02分,但redis服务器本身的时钟比客户端快了2分钟,导致key在12:00的时候就失效了,这时候,如 果客户端1还没有释放锁的话,就可能导致多个客户端同时持有同-把锁的问题。
RedLock算法是否是异步算法? ?
可以看成是同步算法;因为即使进程间(多个电脑间)没有同步时钟,但是每个进程时间流速大致相同;并且时 钟漂移相对于TTL叫小,可以忽略,所以可以看成同步算法; (不够严谨, 算法上要算上时钟漂移,因为如果两个 电脑在地球两端,则时钟漂移非常大)
RedLock失败重试
当client不能获取锁时,应该在随机时间后重试获取锁;且最好在同-时刻并发的把set命令发送给所有redis实 例;而且对于已经获取锁的client在完成任务后要及时释放锁,这是为了节省时间;
RedLock释放锁
由于释放锁时会判断这个锁的value是不是自己设置的,如果是才删除;所以在释放锁时非常简单,只要向所有实 例都发出释放锁的命令,不用考虑能否成功释放锁;
RedLock注意点(Safety arguments) :
1.先假设client获取所有实例,所有实例包含相同的key和过期时间(TTL) ,但每个实例set命令时间不同导致不能同时 过期,第一个set命令之前是T1,最后一个set命令后为T2,则此client有效获取锁的最小时间为TTL-(T2-T1)-时钟漂移; 2.对于以N/2+ 1(也就是一半以,上)的方式判断获取锁成功,是因为如果小于一半判断为成功的话,有可能出现多 个client都成功获取锁的情况,从而使锁失效 3.-个client锁定大多数事例耗费的时间大于或接近锁的过期时间,就认为锁无效,并且解锁这个redis实例(不执行 业务) ;只要在TTL时间内成功获取一半以上的锁便是有效锁:否则无效
系统有活性的三个特征
1.能够自动释放锁 2.在获取锁失败(不到一半以上),或任务完成后 能够自动释放锁,不用等到其自动过期 3.在client重试获取哦锁前(第-次失败到第二 次重试时间间隔) 大于第一次获取锁消耗的时间; 4.重试获取锁要有-定次数限制
RedLock性能及崩溃恢复的相关解决方法
1.如果redis没有持久化功能,在clientA获取锁成功后,所有redis重启,clientB能够再次获取到锁,这样违法了锁 的排他互斥性; 2.如果启动AOF永久化存储,事情会好些,举例:当我们重启redis后, 由于redis过期机制是按照unix时间戳走的, 所以在重启后,然后会按照规定的时间过期,不影响业务;但是由于AOF同步到磁盘的方式默认是每秒-次,如果在 一秒内断电, 会导致数据丢失,立即重启会造成锁互斥性失效;但如果同步磁盘仿式使用Always(每一个写命令 都同 步到硬盘)造成性能急剧下降;所以在锁完全有效性和性能方面要有所取舍; 3.有效解决既保证锁完全有效性及性能高效及即使断电情况的方法是redis同步到磁盘方式保持默认的每秒,在 redis无论因为什么原因停掉后要等待TTL时间后再重启(学名:延迟重启) ;缺点是在TTL时间内服务相当于暂停状态;
总结
1.TTL时长要大于正常业务执行的时间+获取所有redis服务消耗时间+时钟漂移 2.获取redis所有服务消耗时间要远小于TTL时间,并且获取成功的锁个数要在总数的一般以上:N/2+1 3.尝试获取每个redis实例锁时的时间要远小于TTL时间 4.尝试获取所有锁失败后重新尝试一定要有一定次数限制 5.在redis崩溃后(无论-个还是所有),要延迟TTL 时间重启redis 6.在实现多redis节点时要结合单节点分布式锁算法共同实现
在项目中使用
1package main
2
3import (
4 "fmt"
5 "sync"
6 "time"
7
8 goredislib "github.com/go-redis/redis/v8"
9 "github.com/go-redsync/redsync/v4"
10 "github.com/go-redsync/redsync/v4/redis/goredis/v8"
11)
12
13func main() {
14 // Create a pool with go-redis (or redigo) which is the pool redisync will
15 // use while communicating with Redis. This can also be any pool that
16 // implements the `redis.Pool` interface.
17 client := goredislib.NewClient(&goredislib.Options{
18 Addr: "localhost:36379",
19 })
20 pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)
21
22 // Create an instance of redisync to be used to obtain a mutual exclusion
23 // lock.
24 rs := redsync.New(pool)
25
26 // Obtain a new mutex by using the same name for all instances wanting the
27 // same lock.
28
29 gNum := 2
30 mutexname := "my-global-mutex"
31 var wg sync.WaitGroup
32 wg.Add(gNum)
33 for i := 0; i < gNum; i++ {
34
35 mutex := rs.NewMutex(mutexname)
36 fmt.Println("开始获取锁")
37 if err := mutex.Lock(); err != nil {
38 panic(err)
39 }
40 fmt.Println("获取锁成功")
41 time.Sleep(time.Second * 5)
42 fmt.Println("开始释放锁")
43 if ok, err := mutex.Unlock(); !ok || err != nil {
44 panic("unlock failed")
45 }
46 fmt.Println("释放锁成功")
47 wg.Done()
48 }
49 wg.Wait()
50}
-
设置全局的
rs = redsync.New(pool)
-
初始化全局的rs
-
在需要用到的地方
1mutex := rs.NewMutex(mutexname) 2fmt.Println("开始获取锁") 3if err := mutex.Lock(); err != nil { 4 panic(err) 5 //错误处理 6} 7// 处理业务代码 8... 9// 处理业务结束 10if ok, err := mutex.Unlock(); !ok || err != nil { 11 panic("unlock failed") 12 // 处理错误 13}