在介绍分布式事务的时候我们介绍过MQ(消息队列),只是简单的提了一下,这篇文章会从mQ的基本概念、RocketMQ的概念,使用RocketMQ等几个方面介绍RocketMQ。

MQ使用场景

什么是MQ

消息队列是一种先进先出的数据结构

应用场景

其应用场景主要包含以下三个方面

应用解耦

系统的耦合性越高,容错性就越低。

以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。

使用消息队列解耦合,系统的耦合性就会提高了。比如物流系统发生故障,需要几分钟才能来修复,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统回复后,补充处理存在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。

流量削峰

应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验。

一般情况,为了保证系统的稳定性,如果系统负载超过阈值,就会阻止用户请求,这会影响用户体验,而如果使用消息队列将请求缓存起来,等待系统处理完毕后通知用户下单完毕,这样总不能下单体验要好。

出于经济考量目的:业务系统正常时段的QPS如果是1000,流量最高峰是10000,为了应对流量高峰配置高性能的服务器显然不划算,这时可以使用消息队列对峰值流量削峰。

数据分发

对与A系统,其他系统都要A系统的服务,在不使用MQ时,如果要新增一个系统来拿到A系统的信息,这时候就要改A系统的代码,让他能和E系统通信。如果其中的一个系统不想要A系统了,这时候还要删除与它相关的代码。

image-20220404152357287

而引入MQ之后,A系统将自己的产生的数据发送到MQ中,新系统想要数据就直接从MQ中消费,原来的系统如果不需要这些数据了,就可以不接受。

image-20220404152759967

缺点

缺点包含以下几点:

  • 系统可用性降低
    • 系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。
    • 如何保证MQ的高可用?
  • 系统复杂度提高
    • MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。
    • 如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
  • 一致性问题
    • A系统处理完业务,通过MQ给B、C、 D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败。
    • 如何保证消息数据处理的一致性?

RocketMQ的安装

新建配置文件

./data/brokerconf/broker.conf写入以下内容

 1# 所属集群名字
 2brokerClusterName=DefaultCluster
 3
 4# broker 名字注意此处不同的配置文件填写的不一样如果在 broker-a.properties 使用: broker-a,
 5#  broker-b.properties 使用: broker-b
 6brokerName=broker-a
 7
 8# 0 表示 Master> 0 表示 Slave
 9brokerId=0
10
11# nameServer地址分号分割
12# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
13
14# 启动IP,如果 docker  com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
15# 解决方式1 加上一句 producer.setVipChannelEnabled(false);解决方式2 brokerIP1 设置宿主机IP不要使用docker 内部IP
16brokerIP1=192.168.0.2 #如果没有报错就不用管
17
18# 在发送消息时自动创建服务器不存在的topic默认创建的队列数
19defaultTopicQueueNums=4
20
21# 是否允许 Broker 自动创建 Topic建议线下开启线上关闭 !!!这里仔细看是 falsefalsefalse
22autoCreateTopicEnable=true
23
24# 是否允许 Broker 自动创建订阅组建议线下开启线上关闭
25autoCreateSubscriptionGroup=true
26
27# Broker 对外服务的监听端口
28listenPort=10911
29
30# 删除文件时间点默认凌晨4点
31deleteWhen=04
32
33# 文件保留时间默认48小时
34fileReservedTime=120
35
36# commitLog 每个文件的大小默认1G
37mapedFileSizeCommitLog=1073741824
38
39# ConsumeQueue 每个文件默认存 30W 根据业务情况调整
40mapedFileSizeConsumeQueue=300000
41
42# destroyMapedFileIntervalForcibly=120000
43# redeleteHangedFileInterval=120000
44# 检测物理文件磁盘空间
45diskMaxUsedSpaceRatio=88
46# 存储路径
47# storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
48# commitLog 存储路径
49# storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
50# 消费队列存储
51# storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
52# 消息索引存储路径
53# storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
54# checkpoint 文件存储路径
55# storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
56# abort 文件存储路径
57# abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
58# 限制的消息大小
59maxMessageSize=65536
60
61# flushCommitLogLeastPages=4
62# flushConsumeQueueLeastPages=2
63# flushCommitLogThoroughInterval=10000
64# flushConsumeQueueThoroughInterval=60000
65
66# Broker 的角色
67# - ASYNC_MASTER 异步复制Master
68# - SYNC_MASTER 同步双写Master
69# - SLAVE
70brokerRole=ASYNC_MASTER
71
72# 刷盘方式
73# - ASYNC_FLUSH 异步刷盘
74# - SYNC_FLUSH 同步刷盘
75flushDiskType=ASYNC_FLUSH
76
77# 发消息线程池数量
78# sendMessageThreadPoolNums=128
79# 拉消息线程池数量
80# pullMessageThreadPoolNums=128

brokerIP1是本机ip

新建docker-compost.yml

 1version: '3.5'
 2services:
 3  rmqnamesrv:
 4    image: foxiswho/rocketmq:server
 5    container_name: rmqnamesrv
 6    ports:
 7      - 9876:9876
 8    volumes:
 9      - ./data/logs:/opt/logs
10      - ./data/store:/opt/store
11    networks:
12        rmq:
13          aliases:
14            - rmqnamesrv
15
16  rmqbroker:
17    image: foxiswho/rocketmq:broker
18    container_name: rmqbroker
19    ports:
20      - 10909:10909
21      - 10911:10911
22    volumes:
23      - ./data/logs:/opt/logs
24      - ./data/store:/opt/store
25      - ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf
26    environment:
27        NAMESRV_ADDR: "rmqnamesrv:9876"
28        JAVA_OPTS: " -Duser.home=/opt"
29        JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
30    command: mqbroker -c /etc/rocketmq/broker.conf
31    depends_on:
32      - rmqnamesrv
33    networks:
34      rmq:
35        aliases:
36          - rmqbroker
37
38  rmqconsole:
39    image: styletang/rocketmq-console-ng
40    container_name: rmqconsole
41    ports:
42      - 8080:8080
43    environment:
44        JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
45    depends_on:
46      - rmqnamesrv
47    networks:
48      rmq:
49        aliases:
50          - rmqconsole
51
52networks:
53  rmq:
54    name: rmq
55    driver: bridge

启动rocketmq

1docker-compose up

测试

在rocket-consle中发送消息。

浏览器中输入http://localhost:8080

选择Topic->ADD/UPDATE->选择默认的Cluster和BrokerName,输入自己的topic name–test->commit。右上角出现Success就是topic已经建立成功。

找到自己新建的topic名称,->SEND MAGGAGE->设置tag、key和内容->commit

在Message中通过topic筛选消息,可以看到已经发布的消息了。

RocketMQ的基本概念

image-20220404170309043

Producer:消息的发送者,消息的生产者。举例发信人 Consumer: 消息接收者,消息的消费者。举例:收信者 Broker: 暂存和传输消息。如果要拿具体的信息首先要从nameserver中拿具体的broker信息,再到具体的broker中获取信息;举例:邮局 NameServer: 管理Broker,类似于注册中心,同步存数据的各个结点,但是这个集群不用做数据同步。原因:??; 举例:各个邮局的管理机构 Topic: 区分消息的种类;一个发送者可以发送消息给一 个或者多个Topic;一-个消息的接收者可以订阅一个或者多个Topic消息。类似你可以把你写的文章放到不同的平台,看文章的人可以从多个平台看文章 Message Queue:相当于是Topic的分区;用于并行发送和接收消息

RocketMQ的消息类型

按照发送的特点分

同步发送

a.同步发送,线程阻塞,投递completes阻塞结束I b.如果发送失败,会在默认的超时时间3秒内进行重试,最多重试2次 c.投递completes不代表投递成功,要check SendResult.sendStatus来判断是否投递成功 d. SendResult里面有发送状态的枚举: SendStatus, 同步的消息投递有一个状态返回值的

1public enum SendStatus {
2	SEND_ OK; // 成功
3	FLUSH_ DISK_ TIMEOUT;
4    FLUSH_ SLAVE TIMEOUT;
5	SLAVE_ NOT_ AVAILABLE;
6}

e. retry的实现原理:只有ack的SendStatus=SEND_ OK才会停止retry 注意事项:发送同步消息且Ack为SEND. OK,只代表该消息成功的写入了MQ当中,并不代表该消息成功的被Consumer消费了。

异步发送

a.异步调用的话,当前线程一定要等待异步线程回调结束再关闭producer啊,因为是异步的,不会阻塞提前关闭producer会导致未回调链接就断开了。 b.异步消息不retry,投递失败回调onException()方法,只有同步消息才会retry。 C.异步发送一般用于链路耗时较长,对RT响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

单向发送

a.消息不可靠,性能高,只负责往服务器发送一条消息, 不会重试也不关心是否发送成功 b.此方式发送消息的过程耗时非常短, 一般在微秒级别

下表概括了三者的特点和主要区别

发送方式 发送TPS(性能测试指标) 发送结果反馈 可靠性
同步发送 不丢失
异步发送 不丢失
单向发送 最快 没有 可能会丢失

按照使用功能特点分

普通消息(订阅)

普通消息是我们在业务开发中用到的最多的消息类型,生产者需要关注消息发送成功即可,消费者消费到消息即可,不需要保证消息的顺序,所以消息可以大规模并发地发送和消费,吞吐量很高,适合大部分场景。不能够保证顺序。

image-20220404192826869

顺序消息

顺序消息分为分区顺序消息和全局顺序消息。

全局顺序消息比较容易理解,也就是哪条消息先进入,哪条消息就会先被消费,符合我们的FIFO。

很多时候全局消息的实现代价很大,所以就出现了分区顺序消息。

我们通过对消息的key,进行hash,相同hash的消息会被分配到同一个分区里面,当然如果要做全局顺序消息,我们的分区只需要-一个即可,所以全局顺序消息的代价是比较大的。

延时消息

主要用来订单超时库存归还。

延迟的机制是在服务端实现的,也就是Broker收到了消息,但是经过一段时间以后才发送服务器按照1-N定义了如下级别:“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”。

若要发送定时消息,在应用层初始化Message消息对象之后,调用Message.setDelayTimeLevel(int level)方法来设置延迟级别,按照序列取相应的延迟级别,例如level=2,则延迟为5s。

1msg.setDelayLevel(2);
2SendResult sendResult = producer.send(msg)

实现原理: a.发送消息的时候如果消息设置了DelayTimeLevel,那么该消息会被丢到ScheduleMessageService.SCHEDULE_ TOPIC这个Topic里面 b.根据DelayTimeLevel选择对应的queue C.再把真实的topic和queue信息封装起来, set到msg里面 d.然后每个SCHEDULE TOPIC_ XXX的每个DelayTimeLevelQueue,有定时任务去刷新,是否有待投递的消息 e.每10s定时持久化发送进度

事务消息

消息队列RocketMQ版提供的分布式事务消息适用于所有对数据最终一致性有 强需求的场景。

概念介绍

事务消息:消息队列RocketMQ版提供类似X或Open XA的分布式事务功能,通过消息队列RocketMQ版事务消息能达到分布式事务的最终一致。

半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列RocketMQ版服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。

消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列RocketMQ版服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback),该询问过程即消息回查。

分布式事务消息的优势

消息队列RocketMQ版分布式事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性。 同时,传统的大事务可以被拆分为小事务,不仅能提升效率,还不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。在极端情况下,如果关联的某一个应用始终无法处理成功,也只需对当前应用进行补偿或数据订正处理,而无需对整体业务进行回滚。

Go操作RocketMQ

RocketMQ发送普通消息

 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"os"
 7	"strconv"
 8
 9	"github.com/apache/rocketmq-client-go/v2"
10	"github.com/apache/rocketmq-client-go/v2/primitive"
11	"github.com/apache/rocketmq-client-go/v2/producer"
12)
13
14// Package main implements a simple producer to send message.
15func main() {
16	p, err := rocketmq.NewProducer(
17		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.2:9876"})),
18		producer.WithRetry(2),
19	)
20
21	if err != nil {
22		fmt.Printf("new producer error: %s", err.Error())
23		os.Exit(1)
24	}
25	err = p.Start()
26	if err != nil {
27		fmt.Printf("start producer error: %s", err.Error())
28		os.Exit(1)
29	}
30	topic := "hellomq"
31
32	for i := 0; i < 10; i++ {
33		msg := &primitive.Message{
34			Topic: topic,
35			Body:  []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i)),
36		}
37		res, err := p.SendSync(context.Background(), msg)
38
39		if err != nil {
40			fmt.Printf("send message error: %s\n", err)
41		} else {
42			fmt.Printf("send message success: result=%s\n", res.String())
43		}
44	}
45	err = p.Shutdown()
46	if err != nil {
47		fmt.Printf("shutdown producer error: %s", err.Error())
48	}
49}

RocketMQ消费消息

 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"os"
 7	"time"
 8
 9	"github.com/apache/rocketmq-client-go/v2"
10	"github.com/apache/rocketmq-client-go/v2/consumer"
11	"github.com/apache/rocketmq-client-go/v2/primitive"
12)
13
14func main() {
15	// 服务器有数据会推给回调
16	c, _ := rocketmq.NewPushConsumer(
17		// GroupName 多个实例负载均衡
18		consumer.WithGroupName("testGroup"),
19		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.2:9876"})),
20	)
21	err := c.Subscribe("hellomq", consumer.MessageSelector{}, func(ctx context.Context,
22		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
23		for i := range msgs {
24			fmt.Printf("subscribe callback: %v \n", msgs[i])
25		}
26
27		return consumer.ConsumeSuccess, nil
28	})
29	if err != nil {
30		fmt.Println(err.Error())
31	}
32	// Note: start after subscribe
33	err = c.Start()
34	if err != nil {
35		fmt.Println(err.Error())
36		os.Exit(-1)
37	}
38	time.Sleep(time.Hour)
39	err = c.Shutdown()
40	if err != nil {
41		fmt.Printf("shutdown Consumer error: %s", err.Error())
42	}
43}

RocketMQ发送延迟消息

 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"os"
 7
 8	"github.com/apache/rocketmq-client-go/v2"
 9	"github.com/apache/rocketmq-client-go/v2/primitive"
10	"github.com/apache/rocketmq-client-go/v2/producer"
11)
12
13func main() {
14	p, _ := rocketmq.NewProducer(
15		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
16		producer.WithRetry(2),
17	)
18	err := p.Start()
19	if err != nil {
20		fmt.Printf("start producer error: %s", err.Error())
21		os.Exit(1)
22	}
23	for i := 0; i < 10; i++ {
24		msg := primitive.NewMessage("hellomq", []byte("Hello RocketMQ Go Client!"))
25		msg.WithDelayTimeLevel(2) // 设置延迟的级别
26		res, err := p.SendSync(context.Background(), msg)
27
28		if err != nil {
29			fmt.Printf("send message error: %s\n", err)
30		} else {
31			fmt.Printf("send message success: result=%s\n", res.String())
32		}
33	}
34	err = p.Shutdown()
35	if err != nil {
36		fmt.Printf("shutdown producer error: %s", err.Error())
37	}
38
39}

我们在下单之后,超时就要取消库存 定时执行逻辑 轮询的问题是多久执行一次,这里我们假设超时的时间是半个小时。 程序在12:00执行了一次,我在12.01下单了,12:30又执行,下次执行就是13:00,本来12:31就超时了,该回收了,但却等到了13:00才超时,在这29分钟内这是库存就不能被别人买。 如果一分钟轮询一次,无用功太多了。 我们使用延迟消息时间一到就执行,消息中包含了订单编号,只用查询这种编号就行了,不用轮询也减少数据库的压力。

RocketMQ事务消息

 1package main
 2
 3import (
 4   "context"
 5   "fmt"
 6   "os"
 7   "strconv"
 8   "time"
 9
10   "github.com/apache/rocketmq-client-go/v2"
11   "github.com/apache/rocketmq-client-go/v2/primitive"
12   "github.com/apache/rocketmq-client-go/v2/producer"
13)
14
15type DemoListener struct {
16}
17
18func NewDemoListener() *DemoListener {
19   return &DemoListener{}
20}
21
22func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
23   fmt.Println("开始执行本地逻辑")
24   time.Sleep(time.Second * 3)
25   fmt.Println("执行本地逻辑成功")
26   //return primitive.CommitMessageState
27   //return primitive.RollbackMessageState
28   // 本地执行逻辑无缘无故失败 代码异常、宕机
29   return primitive.UnknowState
30}
31
32func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
33   fmt.Println("rocketMQ 的消息回查")
34   time.Sleep(time.Second * 4)
35   return primitive.CommitMessageState
36}
37
38func main() {
39   p, _ := rocketmq.NewTransactionProducer(
40      NewDemoListener(),
41      producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.0.2:9876"})),
42      producer.WithRetry(1),
43   )
44   err := p.Start()
45   if err != nil {
46      fmt.Printf("start producer error: %s\n", err.Error())
47      os.Exit(1)
48   }
49
50   for i := 0; i < 1; i++ {
51      res, err := p.SendMessageInTransaction(context.Background(),
52         primitive.NewMessage("transaction", []byte("Hello RocketMQ again "+strconv.Itoa(i))))
53
54      if err != nil {
55         fmt.Printf("send message error: %s\n", err)
56      } else {
57         fmt.Printf("send message success: result=%s\n", res.String())
58      }
59   }
60   // 回查
61   time.Sleep(5 * time.Minute)
62   err = p.Shutdown()
63   if err != nil {
64      fmt.Printf("shutdown producer error: %s", err.Error())
65   }
66}

参考

docker-compose安装RocketMQ

rocketmq-client-go/examples at master · apache/rocketmq-client-go (github.com)