在分布式系统,尤其是微服务系统中,一次外部请求往往需要内部多个模块,多个中间件,多台机器的相互调用才能完成。在这一系列的调用中,可能有些是串行的,而有些是并行的。在这种情况下,我们如何才能确定这整个请求调用了哪些应用?哪些模块?哪些节点?以及它们的先后顺序和各部分的性能如何呢?
这就是涉及到链路追踪。
jaeger安装
1docker run -d --name jaeger -e COLLECTOR_ZIPKIN_HOST_PORT=:9411 -p 5775:5775/udp -p 6831:6831/udp -p 6832:6832/udp -p 5778:5778 -p 16686:16686 -p 14250:14250 -p 14268:14268 -p 14269:14269 -p 9411:9411 jaegertracing/all-in-one:1.32
api层添加链路追踪
链路追踪的起点在每次发起http请求的地方,这时候就需要一个拦截器来生成tracer
shop\api\user-api\middlewares\tracing.go
1package middlewares
2
3import (
4 "fmt"
5
6 "github.com/gin-gonic/gin"
7 "github.com/uber/jaeger-client-go"
8 jaegercfg "github.com/uber/jaeger-client-go/config"
9 "go.uber.org/zap"
10
11 "github.com/jimyag/shop/api/user/global"
12)
13
14func Tracing() gin.HandlerFunc {
15 return func(ctx *gin.Context) {
16 cfg := jaegercfg.Configuration{
17 Sampler: &jaegercfg.SamplerConfig{
18 Type: jaeger.SamplerTypeConst,
19 Param: 1, // 全部采样
20 },
21 Reporter: &jaegercfg.ReporterConfig{
22 LogSpans: true,
23 LocalAgentHostPort: fmt.Sprintf("%s:%d",
24 global.ServerConfig.JaegerInfo.Host, // jaeger 位置
25 global.ServerConfig.JaegerInfo.Port, // 6831
26 ),
27 },
28 ServiceName: global.ServerConfig.Name,
29 }
30 tracer, close, err := cfg.NewTracer(jaegercfg.Logger(jaeger.StdLogger))
31 if err != nil {
32 global.Logger.Fatal("创建 tracer 失败", zap.Error(err))
33 }
34 defer close.Close()
35 startSpan := tracer.StartSpan(ctx.Request.URL.Path)
36 defer startSpan.Finish()
37 ctx.Set("tracer", tracer)
38 ctx.Set("parentSpan", startSpan)
39 ctx.Next()
40 }
41}
将这个中间件配置到需要链路追踪的router上
shop\api\user-api\initialize\router.go
全局都加
1router.Use(middlewares.Tracing())
由于我们使用了负载均衡
,所以对于其他的grpc的链接要加一个拦截器,来将context加入到grpc服务中。
1package initialize
2
3import (
4 "fmt"
5
6 "github.com/hashicorp/consul/api"
7 _ "github.com/mbobakov/grpc-consul-resolver"
8 "github.com/opentracing/opentracing-go"
9 "go.uber.org/zap"
10 "google.golang.org/grpc"
11
12 "github.com/jimyag/shop/api/user/global"
13 "github.com/jimyag/shop/api/user/proto"
14 "github.com/jimyag/shop/api/user/util/otgrpc"
15)
16
17func InitSrvConn() {
18 // consul
19 conn, err := grpc.Dial(
20 fmt.Sprintf("consul://%s:%d/%s?wait=14s",
21 global.ServerConfig.ConsulInfo.Host,
22 global.ServerConfig.ConsulInfo.Port,
23 global.ServerConfig.UserSrv.Name,
24 ),
25 grpc.WithInsecure(),
26 grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
27 // 添加的
28 grpc.WithUnaryInterceptor(
29 otgrpc.OpenTracingClientInterceptor(
30 opentracing.GlobalTracer(),
31 ),
32 ),
33 // 结束
34 )
35 if err != nil {
36 global.Logger.Fatal("用户服务发现错误", zap.Error(err))
37 }
38 global.UserSrvClient = proto.NewUserClient(conn)
39
40}
shop\api\user-api\util\otgrpc\client.go:31
修改源码
1func OpenTracingClientInterceptor(tracer opentracing.Tracer, optFuncs ...Option) grpc.UnaryClientInterceptor {
2 otgrpcOpts := newOptions()
3 otgrpcOpts.apply(optFuncs...)
4 return func(
5 ctx context.Context,
6 method string,
7 req, resp interface{},
8 cc *grpc.ClientConn,
9 invoker grpc.UnaryInvoker,
10 opts ...grpc.CallOption,
11 ) error {
12 var err error
13 var parentCtx opentracing.SpanContext
14 // 从 context 提取 父span
15 if parent := opentracing.SpanFromContext(ctx); parent != nil {
16 parentCtx = parent.Context()
17 }
18 // 修改的
19 switch ctx.(type) {
20 case *gin.Context:
21 iTracer, ok := ctx.(*gin.Context).Get("tracer")
22 if ok {
23 tracer = iTracer.(opentracing.Tracer)
24 }
25
26 parentSpan, ok := ctx.(*gin.Context).Get("parentSpan")
27 if ok {
28 parentCtx = parentSpan.(*jaegerClient.Span).Context()
29 }
30
31 }
32
33 if otgrpcOpts.inclusionFunc != nil &&
34 !otgrpcOpts.inclusionFunc(parentCtx, method, req, resp) {
35 return invoker(ctx, method, req, resp, cc, opts...)
36 }
37 clientSpan := tracer.StartSpan(
38 method,
39 opentracing.ChildOf(parentCtx),
40 ext.SpanKindRPCClient,
41 gRPCComponentTag,
42 )
43 defer clientSpan.Finish()
44 // 使用metadata机制传递
45 ctx = injectSpanContext(ctx, tracer, clientSpan)
46 if otgrpcOpts.logPayloads {
47 clientSpan.LogFields(log.Object("gRPC request", req))
48 }
49 err = invoker(ctx, method, req, resp, cc, opts...)
50 if err == nil {
51 if otgrpcOpts.logPayloads {
52 clientSpan.LogFields(log.Object("gRPC response", resp))
53 }
54 } else {
55 SetSpanTags(clientSpan, err, true)
56 clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
57 }
58 if otgrpcOpts.decorator != nil {
59 otgrpcOpts.decorator(clientSpan, method, req, resp, err)
60 }
61 return err
62 }
63}
这里修改源码是拿到context中的tracer
和parentSpan
grpc集成jaeger
在服务端还有子的过程
client拦截器的原理
从context拿到父亲的span
1// 通过parentSpan生成当前的span
2clientSpan := tracer.StartSpan(
3 method,
4 opentracing.ChildOf(parentCtx),
5 ext.SpanKindRPCClient,
6 gRPCComponentTag,
7 )
8 defer clientSpan.Finish()
通过metadata的机制,将它的内容写到metadata中去
1// 使用metadata机制传递
2 ctx = injectSpanContext(ctx, tracer, clientSpan)
然后通过shop\api\user-api\util\otgrpc\client.go:243
1func injectSpanContext(ctx context.Context, tracer opentracing.Tracer, clientSpan opentracing.Span) context.Context {
2 md, ok := metadata.FromOutgoingContext(ctx)
3 if !ok {
4 md = metadata.New(nil)
5 } else {
6 md = md.Copy()
7 }
8 mdWriter := metadataReaderWriter{md}
9 // 将服务端想要的信息注入到metadata中
10 err := tracer.Inject(clientSpan.Context(), opentracing.HTTPHeaders, mdWriter)
11 // We have no better place to record an error than the Span itself :-/
12 if err != nil {
13 clientSpan.LogFields(log.String("event", "Tracer.Inject() failed"), log.Error(err))
14 }
15 return metadata.NewOutgoingContext(ctx, md)
16}
如何写到opentracing中去这是有一个标准,是由opentracing做的,如何提取也是由它来做的。
将服务端想要的信息注入到metadata中去,如果注入、拿数据我们不用关心。
在grpc服务端
1// For example:
2//
3// s := grpc.NewServer(
4// ..., // (existing ServerOptions)
5// grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)))
只要在new grpcserver的时候添加一个服务端的拦截器就行
shop\service\user_srv\main.go
1// 初始化jaeger
2 cfg := jaegercfg.Configuration{
3 Sampler: &jaegercfg.SamplerConfig{
4 Type: jaeger.SamplerTypeConst,
5 Param: 1, // 全部采样
6 },
7 Reporter: &jaegercfg.ReporterConfig{
8 LogSpans: true,
9 LocalAgentHostPort: fmt.Sprintf("%s:%d",
10 global.RemoteConfig.JaegerInfo.Host,
11 global.RemoteConfig.JaegerInfo.Port,
12 ),
13 },
14 ServiceName: "user-srv",
15 }
16 // 初始化一jaeger
17 tracer, cl, err := cfg.NewTracer(jaegercfg.Logger(jaeger.StdLogger))
18 if err != nil {
19 global.Logger.Fatal("创建 tracer 失败", zap.Error(err))
20 }
21 opentracing.SetGlobalTracer(tracer)
22 // 注册服务
23 server := grpc.NewServer(grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)))
我们这边可以自己生成tracer,没有必要用服务端的tracer,我们只要处理好父子关系就好,当整个服务挂了之后cl.Close()
在grpc的服务中如何拿到tracer,
shop\service\user_srv\util\otgrpc\server.go:39
从context中拿到span
1spanContext, err := extractSpanContext(ctx, tracer)
1func extractSpanContext(ctx context.Context, tracer opentracing.Tracer) (opentracing.SpanContext, error) {
2 md, ok := metadata.FromIncomingContext(ctx)
3 if !ok {
4 md = metadata.New(nil)
5 }
6 // 与之前的Inject对应
7 return tracer.Extract(opentracing.HTTPHeaders, metadataReaderWriter{md})
8}
在服务中使用:
D:\repository\shop\service\user_srv\handler\user.go
1func (u *UserServer) GetUserList(ctx context.Context, req *proto.PageIngo) (*proto.UserListResponse, error) {
2 // 省略之前的
3 // 从context总拿到parentSpan
4 parentSpan := opentracing.SpanFromContext(ctx)
5 // 生成一个span并设置它的父亲
6 getUserListSpan := opentracing.GlobalTracer().StartSpan("get user list form database", opentracing.ChildOf(parentSpan.Context()))
7 users, err := u.Store.ListUsers(ctx, arg)
8 if err != nil {
9 return nil, status.Errorf(codes.Internal, "获得用户列表信息失败")
10 }
11 getUserListSpan.Finish()
12 // 追踪结束。
13 // 省略其他
14}