JustNote
  • Introduction
  • DesignPattern
    • 七大原则
      • 开闭原则(OCP)
      • 依赖倒置原则(DIP)
      • 单一职责原则(SRP)
      • 接口隔离原则(ISP)
      • 迪米特法则(LoD)
      • 里氏代换原则(LSP)
      • 合成复用原则(CRP)
    • 创建型模式
      • 简单工厂模式
      • 工厂方法模式
      • 抽象工厂模式
      • 建造者模式
      • 单例模式
      • 原型模式
    • 结构型模式
      • 外观模式
      • 装饰模式
      • 适配器模式
      • 享元模式
      • 组合模式
      • 桥接模式
      • 代理模式
    • 行为型模式
      • 模板方法模式
      • 迭代器模式
      • 策略模式
      • 解释器模式
      • 观察者模式
      • 备忘录模式
      • 命令模式
      • 中介者模式
      • 责任链模式
      • 访问者模式
      • 状态模式
  • Java
    • Java Core
      • JVM 如何加载类
      • JVM 垃圾回收
      • JVM G1GC
      • JVM G1GC Q&A
      • JVM 与 Hbase
      • JVM ZGC Overview
      • JVM ZGC 内存管理
      • JVM ZGC 线程
      • JVM ZGC 垃圾回收
      • JVM ZGC 日志分析
      • JVM ZGC 参数调优
    • checkstyle
  • Golang
    • 源码阅读
      • Goroutines
      • Channel
    • gRPC
      • 1、快速开始
      • 2、什么是gRPC
      • 3、gRPC概念梳理
      • 4、基于Golang的gRPC入门
      • 5、gRPC组件ProtocolBuffers介绍
      • 6、gRPC组件Http 2.0
      • 7、错误处理和Debug
      • 8、gRPC身份验证
      • 9、服务注册与发现
      • 10、gRPC与gRPC Gateway
      • 11、gRPC与分布式链路追踪
  • Scala
    • 数据结构与算法
      • 数组
      • 队列
    • 函数式编程
      • 高阶函数
      • 偏函数
    • Immutable Collection
      • List
    • Mutable Collection
      • Array
    • 常见函数操作
      • A
        • aggregate
        • andThen
        • appended
        • appendedAll
      • C
        • chain
        • collect
        • collectFirst
        • combinations
        • compose
        • concat
        • cond
        • condOpt
        • const
        • contains
        • containsSlice
        • copyToArray
        • corresponds
        • count
        • curried
      • D
        • diff
        • distinct
        • distinctBy
        • drop
        • dropRight
        • dropWhile
      • E
        • empty(PartialFunction)
        • empty(collections)
        • endsWith
        • exists
      • F
        • fill
        • filter
        • filterKeys
        • filterNot
        • find
        • findLast
        • flatMap
        • flatten
        • fold
        • foldLeft
        • foldRight
        • forall
        • foreach
        • fromFunction
      • G
        • getOrElse (Map)
        • getOrElse (Option)
        • groupBy
        • groupMap
        • groupMapReduce
        • grouped
      • H
        • head
        • headOption
      • I
        • indexOf
        • indexOfSlice
        • indexWhere
        • indices
        • init
        • inits
        • intersect
        • isDefinedAt (Map)
        • isDefinedAt (Seq)
        • isEmpty
        • isTraversableAgain
      • K
        • keys
      • L
        • last
        • lastIndexOf
        • lastIndexOfSlice
        • lastIndexWhere
        • lastOption
        • length
        • lift
      • M
        • map
        • mapConserve
        • mapValues
        • max
        • maxBy
        • maxByOption
        • maxOption
        • min
        • minBy
        • minByOption
        • minOption
        • mkString
      • N
        • nonEmpty
      • O
        • orElse
      • P
        • padTo
        • par
        • partition
        • partitionMap
        • patch
        • permutations
        • prefixLength
        • prepended
        • prependedAll
        • product
      • R
        • range
        • reduce
        • reduceLeft
        • reduceLeftOption
        • reduceOption
        • reduceRight
        • reduceRightOption
        • reverse
        • reverseIterator
        • reverseMap
        • runWith
      • S
        • sameElements
        • scan
        • scanLeft
        • scanRight
        • search
        • segmentLength
        • size
        • slice
        • sliding
        • sortBy
        • sortWith
        • sorted
        • span
        • splitAt
        • startsWith
        • sum
      • T
        • tabulate
        • tail
        • tails
        • take
        • takeRight
        • takeWhile
        • transpose
        • tupled
      • U
        • unfold
        • union
        • unlift
        • untupled
        • unzip
        • unzip3
        • updated
      • V
        • values
        • view
      • W
        • withFilter
        • withDefault
        • withDefaultValue
      • Z
        • zip
        • zipAll
        • zipWithIndex
      • map
      • flatmap
      • filter
      • reduceLeft
      • foldLeft
    • Futures
      • Method with future as return type
      • Non blocking future result
      • Chain futures using flatMap
      • Chain futures using for comprehension
      • Future Option with for comprehension
      • Future Option with map
      • Composing Futures
      • Future Sequence
      • Future Traverse
      • Future foldLeft
      • Future reduceLeft
      • Future firstCompletedOf
      • Future zip
      • Future zipWith
      • Future andThen
      • Future configure threadpool
      • Future recover
      • Future recoverWith
      • Future fallbackTo
      • Future promise
    • Akka
  • Algorithm
  • Docker
  • Kubernetes
    • 二进制安装kubernetes
      • 00.从零开始
  • Architecture
    • Infrastructure
      • Opentracing
      • Jaeger && ZipKin
      • SkyWalking
      • Consul
      • Envoy
      • Service Mesh
      • Service Mesh: Istio 详解
      • Service Mesh: 基于 Istio 的落地实践(一)
    • CAS
      • CAS Server
      • CAS Service Management
      • CAS 集成LDAP
      • CAS 集成gitlab
      • CAS SSO & SLO
      • CAS Gitbook
    • xCAT
  • Netty
  • DDD
  • Reactive Programming
    • Reactor
      • Publisher
      • Subscriber
      • Subscription
      • Processor
    • WebFlux
  • Gitlab
    • Git Hook
  • CICD
    • Jenkins
      • Kubectl
Powered by GitBook
On this page
  • 尝鲜一下
  • Setup MicroDonuts
  • 选择一个分布式追踪工具
  • 官方示例 Tracing Demo
  • gRPC 集成 Jaeger
  • 查看一下容器内的构成
  • tracing carrier
  • 创建 GlobalTracer
  • gRPC Interceptor
  • gRPC Client 和 gRPC Server 中集成 Interceptor 和Tracer
  • 运行一下
  • 参考

Was this helpful?

  1. Golang
  2. gRPC

11、gRPC与分布式链路追踪

Previous10、gRPC与gRPC GatewayNextScala

Last updated 5 years ago

Was this helpful?

传送门

传送门

在前面的两篇文章中,我们已经普及了什么是OpenTracing,以及Jaeger和ZipKin的简单比较。接下来,就来按照官方的例子进行一个简单的尝试,然后再抽丝剥茧,看看如何在我们的工程代码中进行实现。

Setup MicroDonuts

首先,测试环境需要安装JDK,以及Maven,可以搜索如何安装。

git clone git@github.com:opentracing-contrib/java-opentracing-walkthrough.git
cd java-opentracing-walkthrough/microdonuts
mvn package exec:exec

选择一个分布式追踪工具

我们这里选择的是Jaeger.

修改 microdonuts/tracer_config.properties 文件:

tracer=jaeger
jaeger.reporter_host=localhost
jaeger.reporter_port=5775

在docker中运行:

docker run -d -p 5775:5775/udp -p 16686:16686 jaegertracing/all-in-one:latest

打开浏览器,访问 http://localhost:16686 就可以看到 Jaeger UI 了。

官方示例 Tracing Demo

gRPC 集成 Jaeger

在前面的对比中,我们已经大体上介绍过Jaeger 和 ZipKin 的区别,同时我们也介绍了二者在现有云原生生态中的发展。所以,我们这里选择了Jaeger来进行学习。

下面我们就介绍一下,如何在gRPC中集成Jaeger。

从我们之前介绍的Jaeger文章中,我们看到如果想要对每个服务进行tracing,除了需要在代码里面嵌入Jaeger Client 之外,还需要有Agent,collector,store,UI 等众多组件。我们这里只演示如何在代码中集成JaegerClient,至于其他组件,我们都在docker中运行。

docker run -d -p6831:6831/udp -p16686:16686 jaegertracing/all-in-one:latest

这时在浏览器中访问 http://127.0.0.1:16686/search就可以看到jaeger的页面了。不过现在代码还没有跑起来,看不到什么效果。

查看一下容器内的构成

version: '2'

services:
    jaeger-collector:
      image: jaegertracing/jaeger-collector
      command: ["--cassandra.keyspace=jaeger_v1_dc1", "--cassandra.servers=cassandra", "--collector.zipkin.http-port=9411"]
      ports:
        - "14269"
        - "14268:14268"
        - "14267"
        - "14250"
        - "9411:9411"
      restart: on-failure
      depends_on:
        - cassandra-schema

    jaeger-query:
      image: jaegertracing/jaeger-query
      command: ["--cassandra.keyspace=jaeger_v1_dc1", "--cassandra.servers=cassandra"]
      ports:
        - "16686:16686"
        - "16687"
      restart: on-failure
      depends_on:
        - cassandra-schema

    jaeger-agent:
      image: jaegertracing/jaeger-agent
      command: ["--reporter.grpc.host-port=jaeger-collector:14250"]
      ports:
        - "5775:5775/udp"
        - "6831:6831/udp"
        - "6832:6832/udp"
        - "5778:5778"
      restart: on-failure
      depends_on:
        - jaeger-collector

    cassandra:
      image: cassandra:3.9

    cassandra-schema:
      image: jaegertracing/jaeger-cassandra-schema
      depends_on:
        - cassandra

即便对容器不是很熟悉的话,也能够看出,容器内运行的程序 包含了四个部分,分别是 agent,query,collector,还有存储cassandra。很明显,接下来,我们在 gRPC-example中,再集成 jaeger-client,然后就可以进行微服务的链路追踪了。

tracing carrier

// TextMapCarrier allows the use of regular map[string]string
// as both TextMapWriter and TextMapReader.
type TextMapCarrier map[string]string

// HTTPHeadersCarrier satisfies both TextMapWriter and TextMapReader.
//
// Example usage for server side:
//
//     carrier := opentracing.HTTPHeadersCarrier(httpReq.Header)
//     clientContext, err := tracer.Extract(opentracing.HTTPHeaders, carrier)
//
// Example usage for client side:
//
//     carrier := opentracing.HTTPHeadersCarrier(httpReq.Header)
//     err := tracer.Inject(
//         span.Context(),
//         opentracing.HTTPHeaders,
//         carrier)
//
type HTTPHeadersCarrier http.Header
// TextMapWriter is the Inject() carrier for the TextMap builtin format. With
// it, the caller can encode a SpanContext for propagation as entries in a map
// of unicode strings.
type TextMapWriter interface {
    // Set a key:value pair to the carrier. Multiple calls to Set() for the
    // same key leads to undefined behavior.
    //
    // NOTE: The backing store for the TextMapWriter may contain data unrelated
    // to SpanContext. As such, Inject() and Extract() implementations that
    // call the TextMapWriter and TextMapReader interfaces must agree on a
    // prefix or other convention to distinguish their own key:value pairs.
    Set(key, val string)
}

// TextMapReader is the Extract() carrier for the TextMap builtin format. With it,
// the caller can decode a propagated SpanContext as entries in a map of
// unicode strings.
type TextMapReader interface {
    // ForeachKey returns TextMap contents via repeated calls to the `handler`
    // function. If any call to `handler` returns a non-nil error, ForeachKey
    // terminates and returns that error.
    //
    // NOTE: The backing store for the TextMapReader may contain data unrelated
    // to SpanContext. As such, Inject() and Extract() implementations that
    // call the TextMapWriter and TextMapReader interfaces must agree on a
    // prefix or other convention to distinguish their own key:value pairs.
    //
    // The "foreach" callback pattern reduces unnecessary copying in some cases
    // and also allows implementations to hold locks while the map is read.
    ForeachKey(handler func(key, val string) error) error
}

我们接下来来实现一下两个接口,采用一个自定义的carrier。

// MDCarrier custome carrier
type MDCarrier struct {
    metadata.MD
}

// ForeachKey conforms to the TextMapReader interface.
// 这里必须要实现这个 TextMapReader 这个接口
// TextMapReader is the Extract() carrier for the TextMap builtin format. With it,
// the caller can decode a propagated SpanContext as entries in a map of
// unicode strings.
//type TextMapReader interface {
//    // ForeachKey returns TextMap contents via repeated calls to the `handler`
//    // function. If any call to `handler` returns a non-nil error, ForeachKey
//    // terminates and returns that error.
//    //
//    // NOTE: The backing store for the TextMapReader may contain data unrelated
//    // to SpanContext. As such, Inject() and Extract() implementations that
//    // call the TextMapWriter and TextMapReader interfaces must agree on a
//    // prefix or other convention to distinguish their own key:value pairs.
//    //
//    // The "foreach" callback pattern reduces unnecessary copying in some cases
//    // and also allows implementations to hold locks while the map is read.
//    ForeachKey(handler func(key, val string) error) error
//}
func (m MDCarrier) ForeachKey(handler func(key, val string) error) error {
    for k, strs := range m.MD {
        for _, v := range strs {
            if err := handler(k, v); err != nil {
                return err
            }
        }
    }
    return nil
}

// Set implements Set() of opentracing.TextMapWriter
// 这里也必须要实现
// TextMapWriter is the Inject() carrier for the TextMap builtin format. With
// it, the caller can encode a SpanContext for propagation as entries in a map
// of unicode strings.
//type TextMapWriter interface {
//    // Set a key:value pair to the carrier. Multiple calls to Set() for the
//    // same key leads to undefined behavior.
//    //
//    // NOTE: The backing store for the TextMapWriter may contain data unrelated
//    // to SpanContext. As such, Inject() and Extract() implementations that
//    // call the TextMapWriter and TextMapReader interfaces must agree on a
//    // prefix or other convention to distinguish their own key:value pairs.
//    Set(key, val string)
//}
func (m MDCarrier) Set(key, val string) {
    m.MD[key] = append(m.MD[key], val)
}

创建 GlobalTracer

// NewJaegerTracer NewJaegerTracer for current service
func NewJaegerTracer(serviceName string, jagentHost string) (tracer opentracing.Tracer, closer io.Closer, err error) {
    cfg := jaegercfg.Configuration{
        ServiceName: serviceName,
        Sampler: &jaegercfg.SamplerConfig{
            Type:  jaeger.SamplerTypeConst,
            Param: 1,
        },
        Reporter: &jaegercfg.ReporterConfig{
            LogSpans:            true,
            BufferFlushInterval: 1 * time.Second,
            LocalAgentHostPort:  jagentHost,
        },
    }
    // Example logger and metrics factory. Use github.com/uber/jaeger-client-go/log
    // and github.com/uber/jaeger-lib/metrics respectively to bind to real logging and metrics
    // frameworks.
    jLogger := jaegerlog.StdLogger
    jMetricsFactory := metrics.NullFactory

    // Initialize tracer with a logger and a metrics factory
    tracer, closer, err = cfg.NewTracer(
        jaegercfg.Logger(jLogger),
        jaegercfg.Metrics(jMetricsFactory))

    opentracing.SetGlobalTracer(tracer)
    if err != nil {
        grpclog.Errorf("Could not initialize jaeger tracer: %s", err.Error())
        return
    }
    return
}

gRPC Interceptor

gRPC 提供了 拦截器,让我们可以在Clent端和server端对方法进行拦截处理,这样可以节省我们很大的麻烦。因为我们如果在server端和client端分别有很多的方法需要监控,难道我们每个方法都要去实现一遍 tracer定义?interceptor帮助我们解决了这个问题。

下面我们看下如何定义的Client和Server Interceptor.

// ClientInterceptor 客户端拦截器
// https://godoc.org/google.golang.org/grpc#UnaryClientInterceptor
func ClientInterceptor(tracer opentracing.Tracer) grpc.UnaryClientInterceptor {
    return func(ctx context.Context, method string, request, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {

        //一个RPC调用的服务端的span,和RPC服务客户端的span构成ChildOf关系
        var parentCtx opentracing.SpanContext
        parentSpan := opentracing.SpanFromContext(ctx)
        if parentSpan != nil {
            parentCtx = parentSpan.Context()
        }
        span := tracer.StartSpan(
            method,
            opentracing.ChildOf(parentCtx),
            opentracing.Tag{Key: string(ext.Component), Value: "gRPC Client"},
            ext.SpanKindRPCClient,
        )

        defer span.Finish()
        md, ok := metadata.FromOutgoingContext(ctx)
        if !ok {
            md = metadata.New(nil)
        } else {
            md = md.Copy()
        }

        err := tracer.Inject(
            span.Context(),
            opentracing.TextMap,
            MDCarrier{md}, // 自定义 carrier
        )

        if err != nil {
            log.Errorf("inject span error :%v", err.Error())
        }

        newCtx := metadata.NewOutgoingContext(ctx, md)
        err = invoker(newCtx, method, request, reply, cc, opts...)

        if err != nil {
            log.Errorf("call error : %v", err.Error())
        }
        return err
    }
}


// ServerInterceptor Server 端的拦截器
func ServerInterceptor(tracer opentracing.Tracer) grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
        md, ok := metadata.FromIncomingContext(ctx)
        if !ok {
            md = metadata.New(nil)
        }
        spanContext, err := tracer.Extract(
            opentracing.TextMap,
            MDCarrier{md},
        )

        if err != nil && err != opentracing.ErrSpanContextNotFound {
            grpclog.Errorf("extract from metadata err: %v", err)
        } else {
            span := tracer.StartSpan(
                info.FullMethod,
                ext.RPCServerOption(spanContext),
                opentracing.Tag{Key: string(ext.Component), Value: "gRPC Server"},
                ext.SpanKindRPCServer,
            )
            defer span.Finish()

            ctx = opentracing.ContextWithSpan(ctx, span)
        }

        return handler(ctx, req)

    }

}

gRPC Client 和 gRPC Server 中集成 Interceptor 和Tracer

接下来,就可以在gRPC的代码中进行tracer 和 interceptor的集成了。

我们还是使用 前面 gRPC 注册consul 的例子,在这个例子的基础上,添加 tracer。

首先我们来看下 Clinet端

func main() {
    consul.Init()

    tracer, closer, err := intercepter.NewJaegerTracer(serviceName, jaegerAgent)
    defer closer.Close()
    if err != nil {
        log.Printf("NewJaegerTracer err:", err.Error())
    }
    // Set up a connection to the server.
    ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
    conn, err := grpc.DialContext(ctx, consulService, grpc.WithInsecure(), grpc.WithBalancerName("round_robin"), grpc.WithUnaryInterceptor(intercepter.ClientInterceptor(tracer)))
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    defer conn.Close()
    c := pb.NewGopherClient(conn)

    ........
}

然后我们看下 Server 端

func main() {

    tracer, closer, err := intercepter.NewJaegerTracer(serviceName, jaegerAgent)
    defer closer.Close()
    if err != nil {
        log.Printf("NewJaegerTracer err: %v", err.Error())
    }
    lis, err := net.Listen("tcp", port)
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    s := grpc.NewServer(grpc.UnaryInterceptor(intercepter.ServerInterceptor(tracer)))
    pb.RegisterGopherServer(s, &server{})
    grpc_health_v1.RegisterHealthServer(s, &HealthImpl{})
    RegisterToConsul()
    if err := s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

从上面的代码来看,我们的实现,非常简单。只要在client和server端启动时将我们的interceptor传入,同时传入创建好的trace就可以了。

但是有一点需要注意,因为我们使用的是interceptor,所以,在进行健康性检查的时候,也会被trace到。也就是说,我们在 jaeger UI 上查看时也能够看到 health check 的trace 信息。

运行一下

先运行 Server端,再启动Client端,就可以进行通信以及链路追踪了。

点开之后可以很详细的看到层级关系以及每个方法的信息,需要tracing的信息,还可以进行更详细的定义。

还可以按照 调用链 的形式来进行查看。这有利于我们梳理清复杂的服务架构。

参考

在浏览器中访问 就可以看到一个简单的页面了。

可以点击 去查看一下这个容器内的构成。 下面把代码贴一下。

根据 Opentracing 的官方定义在进行,跨进程追踪调用的时候,需要进行 。并且需要指定carrier。

而官方指定的carrier 只有两种。

当然,我们也可以进行 自定义的carrier,但是如果要实现自定义的carrier,就必须要实现 接口

创建 tracer 的过程可以参考 官方的demo示例

可以参考gRPC 官方

本文的示例 代码地址

分布式链路追踪
尝鲜一下
Setup MicroDonuts
选择一个分布式追踪工具
官方示例 Tracing Demo
gRPC 集成 Jaeger
查看一下容器内的构成
tracing carrier
创建 GlobalTracer
gRPC Interceptor
gRPC Client 和 gRPC Server 中集成 Interceptor 和Tracer
运行一下
参考
Opentracing
Jayger && ZipKin
尝鲜一下
官方DEMO
http://127.0.0.1:10001
Take OpenTracing for a HotROD ride
jaeger-docker-compose
Inject and Extract
TextMapCarrier 和 HTTPHeadersCarrier
TextMapWriter & TextMapReader
jaeger-client-go
Example Interceptor
rpc-examples
grpc-jaeger
jaeger client libraries
opentracing-tutorial
Tracing HTTP request latency in Go with OpenTracing
cross process tracing
Lesson 3 - Tracing RPC Requests
Opentracing Inject Extract
tracing demo
Jaeger UI
Jaeger UI
gRPC Tracing
gRPC Tracing
gRPC Tracing