在分布式系统,尤其是微服务系统中,一次外部请求往往需要内部多个模块,多个中间件,多台机器的相互调用才能完成。在这一系列的调用中,可能有些是串行的,而有些是并行的。在这种情况下,我们如何才能确定这整个请求调用了哪些应用?哪些模块?哪些节点?以及它们的先后顺序和各部分的性能如何呢?

这就是涉及到链路追踪。

jaeger安装

1docker run -d --name jaeger   -e COLLECTOR_ZIPKIN_HOST_PORT=:9411   -p 5775:5775/udp   -p 6831:6831/udp   -p 6832:6832/udp   -p 5778:5778   -p 16686:16686   -p 14250:14250   -p 14268:14268   -p 14269:14269   -p 9411:9411 jaegertracing/all-in-one:1.32

api层添加链路追踪

链路追踪的起点在每次发起http请求的地方,这时候就需要一个拦截器来生成tracer

shop\api\user-api\middlewares\tracing.go

 1package middlewares
 2
 3import (
 4	"fmt"
 5
 6	"github.com/gin-gonic/gin"
 7	"github.com/uber/jaeger-client-go"
 8	jaegercfg "github.com/uber/jaeger-client-go/config"
 9	"go.uber.org/zap"
10
11	"github.com/jimyag/shop/api/user/global"
12)
13
14func Tracing() gin.HandlerFunc {
15	return func(ctx *gin.Context) {
16		cfg := jaegercfg.Configuration{
17			Sampler: &jaegercfg.SamplerConfig{
18				Type:  jaeger.SamplerTypeConst,
19				Param: 1, // 全部采样
20			},
21			Reporter: &jaegercfg.ReporterConfig{
22				LogSpans: true,
23				LocalAgentHostPort: fmt.Sprintf("%s:%d", 
24					global.ServerConfig.JaegerInfo.Host, // jaeger 位置
25					global.ServerConfig.JaegerInfo.Port, // 6831
26				),
27			},
28			ServiceName: global.ServerConfig.Name,
29		}
30		tracer, close, err := cfg.NewTracer(jaegercfg.Logger(jaeger.StdLogger))
31		if err != nil {
32			global.Logger.Fatal("创建 tracer 失败", zap.Error(err))
33		}
34		defer close.Close()
35		startSpan := tracer.StartSpan(ctx.Request.URL.Path)
36		defer startSpan.Finish()
37		ctx.Set("tracer", tracer)
38		ctx.Set("parentSpan", startSpan)
39		ctx.Next()
40	}
41}

将这个中间件配置到需要链路追踪的router上

shop\api\user-api\initialize\router.go全局都加

1router.Use(middlewares.Tracing())

由于我们使用了负载均衡,所以对于其他的grpc的链接要加一个拦截器,来将context加入到grpc服务中。

 1package initialize
 2
 3import (
 4	"fmt"
 5
 6	"github.com/hashicorp/consul/api"
 7	_ "github.com/mbobakov/grpc-consul-resolver"
 8	"github.com/opentracing/opentracing-go"
 9	"go.uber.org/zap"
10	"google.golang.org/grpc"
11
12	"github.com/jimyag/shop/api/user/global"
13	"github.com/jimyag/shop/api/user/proto"
14	"github.com/jimyag/shop/api/user/util/otgrpc"
15)
16
17func InitSrvConn() {
18	// consul
19	conn, err := grpc.Dial(
20		fmt.Sprintf("consul://%s:%d/%s?wait=14s",
21			global.ServerConfig.ConsulInfo.Host,
22			global.ServerConfig.ConsulInfo.Port,
23			global.ServerConfig.UserSrv.Name,
24		),
25		grpc.WithInsecure(),
26		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
27        // 添加的
28		grpc.WithUnaryInterceptor(
29			otgrpc.OpenTracingClientInterceptor(
30				opentracing.GlobalTracer(),
31			),
32		),
33        // 结束
34	)
35	if err != nil {
36		global.Logger.Fatal("用户服务发现错误", zap.Error(err))
37	}
38	global.UserSrvClient = proto.NewUserClient(conn)
39
40}

shop\api\user-api\util\otgrpc\client.go:31修改源码

 1func OpenTracingClientInterceptor(tracer opentracing.Tracer, optFuncs ...Option) grpc.UnaryClientInterceptor {
 2	otgrpcOpts := newOptions()
 3	otgrpcOpts.apply(optFuncs...)
 4	return func(
 5		ctx context.Context,
 6		method string,
 7		req, resp interface{},
 8		cc *grpc.ClientConn,
 9		invoker grpc.UnaryInvoker,
10		opts ...grpc.CallOption,
11	) error {
12		var err error
13		var parentCtx opentracing.SpanContext
14		// 从 context 提取 父span
15		if parent := opentracing.SpanFromContext(ctx); parent != nil {
16			parentCtx = parent.Context()
17		}
18        // 修改的
19		switch ctx.(type) {
20		case *gin.Context:
21			iTracer, ok := ctx.(*gin.Context).Get("tracer")
22			if ok {
23				tracer = iTracer.(opentracing.Tracer)
24			}
25
26			parentSpan, ok := ctx.(*gin.Context).Get("parentSpan")
27			if ok {
28				parentCtx = parentSpan.(*jaegerClient.Span).Context()
29			}
30
31		}
32
33		if otgrpcOpts.inclusionFunc != nil &&
34			!otgrpcOpts.inclusionFunc(parentCtx, method, req, resp) {
35			return invoker(ctx, method, req, resp, cc, opts...)
36		}
37		clientSpan := tracer.StartSpan(
38			method,
39			opentracing.ChildOf(parentCtx),
40			ext.SpanKindRPCClient,
41			gRPCComponentTag,
42		)
43		defer clientSpan.Finish()
44		// 使用metadata机制传递
45		ctx = injectSpanContext(ctx, tracer, clientSpan)
46		if otgrpcOpts.logPayloads {
47			clientSpan.LogFields(log.Object("gRPC request", req))
48		}
49		err = invoker(ctx, method, req, resp, cc, opts...)
50		if err == nil {
51			if otgrpcOpts.logPayloads {
52				clientSpan.LogFields(log.Object("gRPC response", resp))
53			}
54		} else {
55			SetSpanTags(clientSpan, err, true)
56			clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
57		}
58		if otgrpcOpts.decorator != nil {
59			otgrpcOpts.decorator(clientSpan, method, req, resp, err)
60		}
61		return err
62	}
63}

这里修改源码是拿到context中的tracerparentSpan

grpc集成jaeger

在服务端还有子的过程

client拦截器的原理

从context拿到父亲的span

1// 通过parentSpan生成当前的span
2clientSpan := tracer.StartSpan(
3			method,
4			opentracing.ChildOf(parentCtx),
5			ext.SpanKindRPCClient,
6			gRPCComponentTag,
7		)
8		defer clientSpan.Finish()

通过metadata的机制,将它的内容写到metadata中去

1// 使用metadata机制传递
2		ctx = injectSpanContext(ctx, tracer, clientSpan)

然后通过shop\api\user-api\util\otgrpc\client.go:243

 1func injectSpanContext(ctx context.Context, tracer opentracing.Tracer, clientSpan opentracing.Span) context.Context {
 2	md, ok := metadata.FromOutgoingContext(ctx)
 3	if !ok {
 4		md = metadata.New(nil)
 5	} else {
 6		md = md.Copy()
 7	}
 8	mdWriter := metadataReaderWriter{md}
 9	// 将服务端想要的信息注入到metadata中
10	err := tracer.Inject(clientSpan.Context(), opentracing.HTTPHeaders, mdWriter)
11	// We have no better place to record an error than the Span itself :-/
12	if err != nil {
13		clientSpan.LogFields(log.String("event", "Tracer.Inject() failed"), log.Error(err))
14	}
15	return metadata.NewOutgoingContext(ctx, md)
16}

如何写到opentracing中去这是有一个标准,是由opentracing做的,如何提取也是由它来做的。

将服务端想要的信息注入到metadata中去,如果注入、拿数据我们不用关心。

在grpc服务端

1// For example:
2//
3//     s := grpc.NewServer(
4//         ...,  // (existing ServerOptions)
5//         grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)))

只要在new grpcserver的时候添加一个服务端的拦截器就行

shop\service\user_srv\main.go

 1// 初始化jaeger
 2	cfg := jaegercfg.Configuration{
 3		Sampler: &jaegercfg.SamplerConfig{
 4			Type:  jaeger.SamplerTypeConst,
 5			Param: 1, // 全部采样
 6		},
 7		Reporter: &jaegercfg.ReporterConfig{
 8			LogSpans: true,
 9			LocalAgentHostPort: fmt.Sprintf("%s:%d",
10				global.RemoteConfig.JaegerInfo.Host,
11				global.RemoteConfig.JaegerInfo.Port,
12			),
13		},
14		ServiceName: "user-srv",
15	}
16	// 初始化一jaeger
17	tracer, cl, err := cfg.NewTracer(jaegercfg.Logger(jaeger.StdLogger))
18	if err != nil {
19		global.Logger.Fatal("创建 tracer 失败", zap.Error(err))
20	}
21	opentracing.SetGlobalTracer(tracer)
22	// 注册服务
23	server := 		grpc.NewServer(grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)))

我们这边可以自己生成tracer,没有必要用服务端的tracer,我们只要处理好父子关系就好,当整个服务挂了之后cl.Close()

在grpc的服务中如何拿到tracer,

shop\service\user_srv\util\otgrpc\server.go:39从context中拿到span

1spanContext, err := extractSpanContext(ctx, tracer)
1func extractSpanContext(ctx context.Context, tracer opentracing.Tracer) (opentracing.SpanContext, error) {
2	md, ok := metadata.FromIncomingContext(ctx)
3	if !ok {
4		md = metadata.New(nil)
5	}
6    // 与之前的Inject对应
7	return tracer.Extract(opentracing.HTTPHeaders, metadataReaderWriter{md})
8}

在服务中使用:

D:\repository\shop\service\user_srv\handler\user.go

 1func (u *UserServer) GetUserList(ctx context.Context, req *proto.PageIngo) (*proto.UserListResponse, error) {
 2	// 省略之前的
 3    // 从context总拿到parentSpan
 4	parentSpan := opentracing.SpanFromContext(ctx)
 5    // 生成一个span并设置它的父亲
 6	getUserListSpan := opentracing.GlobalTracer().StartSpan("get user list form database", opentracing.ChildOf(parentSpan.Context()))
 7	users, err := u.Store.ListUsers(ctx, arg)
 8	if err != nil {
 9		return nil, status.Errorf(codes.Internal, "获得用户列表信息失败")
10	}
11	getUserListSpan.Finish()
12    // 追踪结束。
13    // 省略其他
14}