11. gRPC and Distributed Tracing
Previous post: OpenTracing
Previous post: Jaeger && ZipKin
In the previous two articles we covered what OpenTracing is and gave a brief comparison of Jaeger and ZipKin. Next, let's walk through the official example, then peel back the layers and see how to implement tracing in our own code.
Setup MicroDonuts
First, the test environment needs the JDK and Maven installed; installation instructions are easy to find online.
git clone git@github.com:opentracing-contrib/java-opentracing-walkthrough.git
cd java-opentracing-walkthrough/microdonuts
mvn package exec:exec
Open http://127.0.0.1:10001 in a browser and you will see a simple page.

Choosing a distributed tracing tool
We choose Jaeger here.
Edit the microdonuts/tracer_config.properties file:
tracer=jaeger
jaeger.reporter_host=localhost
jaeger.reporter_port=5775
Run Jaeger in Docker:
docker run -d -p 5775:5775/udp -p 16686:16686 jaegertracing/all-in-one:latest
Open http://localhost:16686 in a browser and you will see the Jaeger UI.

Official example: Tracing Demo
Take OpenTracing for a HotROD ride
Integrating Jaeger with gRPC
In the earlier comparison we already went over the differences between Jaeger and ZipKin, as well as how each fits into the current cloud-native ecosystem, so Jaeger is the one we study here.
Next, let's look at how to integrate Jaeger into gRPC.
As the earlier Jaeger article showed, tracing every service requires not only embedding the Jaeger client in the code but also an agent, a collector, storage, a UI, and other components. Here we only demonstrate integrating the Jaeger client into the code; all the other components run in Docker.
docker run -d -p6831:6831/udp -p16686:16686 jaegertracing/all-in-one:latest
Then open http://127.0.0.1:16686/search in a browser and you will see the Jaeger UI. Since our code is not running yet, there is nothing to see in it for now.

What is inside the all-in-one container
Click jaeger-docker-compose to see what the all-in-one image is made of. The compose file is reproduced below.
version: '2'
services:
jaeger-collector:
image: jaegertracing/jaeger-collector
command: ["--cassandra.keyspace=jaeger_v1_dc1", "--cassandra.servers=cassandra", "--collector.zipkin.http-port=9411"]
ports:
- "14269"
- "14268:14268"
- "14267"
- "14250"
- "9411:9411"
restart: on-failure
depends_on:
- cassandra-schema
jaeger-query:
image: jaegertracing/jaeger-query
command: ["--cassandra.keyspace=jaeger_v1_dc1", "--cassandra.servers=cassandra"]
ports:
- "16686:16686"
- "16687"
restart: on-failure
depends_on:
- cassandra-schema
jaeger-agent:
image: jaegertracing/jaeger-agent
command: ["--reporter.grpc.host-port=jaeger-collector:14250"]
ports:
- "5775:5775/udp"
- "6831:6831/udp"
- "6832:6832/udp"
- "5778:5778"
restart: on-failure
depends_on:
- jaeger-collector
cassandra:
image: cassandra:3.9
cassandra-schema:
image: jaegertracing/jaeger-cassandra-schema
depends_on:
- cassandra
Even without much container experience, you can tell that the image runs four components: the agent, the query service, the collector, and Cassandra for storage. Clearly, the next step is to integrate jaeger-client into our gRPC example, and then we can trace requests across our microservices.
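Before wiring Jaeger into gRPC, here is a minimal smoke-test sketch (not part of the official example; the service name is just a placeholder) that reports a single span to the all-in-one agent started above. If the span shows up under tracing-smoke-test in the Jaeger UI, the 6831/udp port mapping is working:
package main

import (
	"time"

	opentracing "github.com/opentracing/opentracing-go"
	jaeger "github.com/uber/jaeger-client-go"
	jaegercfg "github.com/uber/jaeger-client-go/config"
)

func main() {
	cfg := jaegercfg.Configuration{
		ServiceName: "tracing-smoke-test", // placeholder service name
		Sampler:     &jaegercfg.SamplerConfig{Type: jaeger.SamplerTypeConst, Param: 1},
		Reporter: &jaegercfg.ReporterConfig{
			LogSpans:           true,
			LocalAgentHostPort: "127.0.0.1:6831", // matches the docker run port mapping above
		},
	}
	tracer, closer, err := cfg.NewTracer()
	if err != nil {
		panic(err)
	}
	defer closer.Close()
	opentracing.SetGlobalTracer(tracer)

	// Emit one span; it is flushed to the agent when closer.Close() runs.
	span := tracer.StartSpan("smoke-test")
	time.Sleep(100 * time.Millisecond) // pretend to do some work
	span.Finish()
}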
tracing carrier
According to the OpenTracing specification, tracing a call across process boundaries requires Inject and Extract operations, and a carrier must be specified. Only two carriers are provided out of the box: TextMapCarrier and HTTPHeadersCarrier.
// TextMapCarrier allows the use of regular map[string]string
// as both TextMapWriter and TextMapReader.
type TextMapCarrier map[string]string
// HTTPHeadersCarrier satisfies both TextMapWriter and TextMapReader.
//
// Example usage for server side:
//
// carrier := opentracing.HTTPHeadersCarrier(httpReq.Header)
// clientContext, err := tracer.Extract(opentracing.HTTPHeaders, carrier)
//
// Example usage for client side:
//
// carrier := opentracing.HTTPHeadersCarrier(httpReq.Header)
// err := tracer.Inject(
// span.Context(),
// opentracing.HTTPHeaders,
// carrier)
//
type HTTPHeadersCarrier http.Header
Of course, we can also write a custom carrier, but a custom carrier must implement the TextMapWriter and TextMapReader interfaces:
// TextMapWriter is the Inject() carrier for the TextMap builtin format. With
// it, the caller can encode a SpanContext for propagation as entries in a map
// of unicode strings.
type TextMapWriter interface {
// Set a key:value pair to the carrier. Multiple calls to Set() for the
// same key leads to undefined behavior.
//
// NOTE: The backing store for the TextMapWriter may contain data unrelated
// to SpanContext. As such, Inject() and Extract() implementations that
// call the TextMapWriter and TextMapReader interfaces must agree on a
// prefix or other convention to distinguish their own key:value pairs.
Set(key, val string)
}
// TextMapReader is the Extract() carrier for the TextMap builtin format. With it,
// the caller can decode a propagated SpanContext as entries in a map of
// unicode strings.
type TextMapReader interface {
// ForeachKey returns TextMap contents via repeated calls to the `handler`
// function. If any call to `handler` returns a non-nil error, ForeachKey
// terminates and returns that error.
//
// NOTE: The backing store for the TextMapReader may contain data unrelated
// to SpanContext. As such, Inject() and Extract() implementations that
// call the TextMapWriter and TextMapReader interfaces must agree on a
// prefix or other convention to distinguish their own key:value pairs.
//
// The "foreach" callback pattern reduces unnecessary copying in some cases
// and also allows implementations to hold locks while the map is read.
ForeachKey(handler func(key, val string) error) error
}
Next, let's implement both interfaces in a custom carrier backed by gRPC metadata.
// MDCarrier is a custom carrier backed by gRPC metadata.MD.
type MDCarrier struct {
metadata.MD
}
// ForeachKey conforms to the TextMapReader interface.
// The TextMapReader interface below must be implemented here:
// TextMapReader is the Extract() carrier for the TextMap builtin format. With it,
// the caller can decode a propagated SpanContext as entries in a map of
// unicode strings.
//type TextMapReader interface {
// // ForeachKey returns TextMap contents via repeated calls to the `handler`
// // function. If any call to `handler` returns a non-nil error, ForeachKey
// // terminates and returns that error.
// //
// // NOTE: The backing store for the TextMapReader may contain data unrelated
// // to SpanContext. As such, Inject() and Extract() implementations that
// // call the TextMapWriter and TextMapReader interfaces must agree on a
// // prefix or other convention to distinguish their own key:value pairs.
// //
// // The "foreach" callback pattern reduces unnecessary copying in some cases
// // and also allows implementations to hold locks while the map is read.
// ForeachKey(handler func(key, val string) error) error
//}
func (m MDCarrier) ForeachKey(handler func(key, val string) error) error {
for k, strs := range m.MD {
for _, v := range strs {
if err := handler(k, v); err != nil {
return err
}
}
}
return nil
}
// Set implements Set() of opentracing.TextMapWriter
// This interface must be implemented as well:
// TextMapWriter is the Inject() carrier for the TextMap builtin format. With
// it, the caller can encode a SpanContext for propagation as entries in a map
// of unicode strings.
//type TextMapWriter interface {
// // Set a key:value pair to the carrier. Multiple calls to Set() for the
// // same key leads to undefined behavior.
// //
// // NOTE: The backing store for the TextMapWriter may contain data unrelated
// // to SpanContext. As such, Inject() and Extract() implementations that
// // call the TextMapWriter and TextMapReader interfaces must agree on a
// // prefix or other convention to distinguish their own key:value pairs.
// Set(key, val string)
//}
func (m MDCarrier) Set(key, val string) {
m.MD[key] = append(m.MD[key], val)
}
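As a quick sanity check of the carrier (a sketch with illustrative names, not part of the example repo), the following function injects a span context into a metadata.MD via MDCarrier and extracts it back out, which is exactly the round trip the interceptors below perform across the client/server boundary:
// propagationRoundTrip lives alongside MDCarrier and assumes a tracer has
// already been created (for example by NewJaegerTracer below).
func propagationRoundTrip(tracer opentracing.Tracer) error {
	// "Client side": start a span and inject its context into outgoing metadata.
	clientSpan := tracer.StartSpan("round-trip-demo")
	defer clientSpan.Finish()

	md := metadata.New(nil)
	if err := tracer.Inject(clientSpan.Context(), opentracing.TextMap, MDCarrier{md}); err != nil {
		return err
	}

	// "Server side": extract the propagated context from incoming metadata.
	spanCtx, err := tracer.Extract(opentracing.TextMap, MDCarrier{md})
	if err != nil {
		return err
	}

	// A span started from the extracted context becomes a child of clientSpan.
	serverSpan := tracer.StartSpan("round-trip-demo-handler", ext.RPCServerOption(spanCtx))
	serverSpan.Finish()
	return nil
}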
Creating the GlobalTracer
The process of creating a tracer follows the official jaeger-client-go demo.
// NewJaegerTracer NewJaegerTracer for current service
func NewJaegerTracer(serviceName string, jagentHost string) (tracer opentracing.Tracer, closer io.Closer, err error) {
cfg := jaegercfg.Configuration{
ServiceName: serviceName,
Sampler: &jaegercfg.SamplerConfig{
Type: jaeger.SamplerTypeConst,
Param: 1,
},
Reporter: &jaegercfg.ReporterConfig{
LogSpans: true,
BufferFlushInterval: 1 * time.Second,
LocalAgentHostPort: jagentHost,
},
}
// Example logger and metrics factory. Use github.com/uber/jaeger-client-go/log
// and github.com/uber/jaeger-lib/metrics respectively to bind to real logging and metrics
// frameworks.
jLogger := jaegerlog.StdLogger
jMetricsFactory := metrics.NullFactory
// Initialize tracer with a logger and a metrics factory
tracer, closer, err = cfg.NewTracer(
jaegercfg.Logger(jLogger),
jaegercfg.Metrics(jMetricsFactory))
	if err != nil {
		grpclog.Errorf("Could not initialize jaeger tracer: %s", err.Error())
		return
	}
	// Only register the tracer globally after it has been created successfully.
	opentracing.SetGlobalTracer(tracer)
	return
}
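For reference, this is how the function above would typically be wired in at service startup (a sketch; the service name is a placeholder, and localhost:6831 matches the UDP port exposed by the all-in-one container):
// Sketch: typical startup wiring; sits next to NewJaegerTracer.
func initTracing() (opentracing.Tracer, io.Closer) {
	tracer, closer, err := NewJaegerTracer("hello-service", "localhost:6831")
	if err != nil {
		grpclog.Fatalf("NewJaegerTracer err: %v", err)
	}
	return tracer, closer
}
The caller should defer closer.Close() so that buffered spans are flushed to the agent on shutdown.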
gRPC Interceptor
gRPC provides interceptors that let us hook into calls on both the client and the server side, which saves us a great deal of trouble: if many methods on the server and client need to be monitored, do we really have to repeat the tracing logic in each one? Interceptors solve that problem for us.
See the official gRPC interceptor example for reference.
Let's look at how the client and server interceptors are defined.
// ClientInterceptor is the client-side unary interceptor.
// https://godoc.org/google.golang.org/grpc#UnaryClientInterceptor
func ClientInterceptor(tracer opentracing.Tracer) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, request, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		// The server-side span of an RPC call is a ChildOf the client-side span.
var parentCtx opentracing.SpanContext
parentSpan := opentracing.SpanFromContext(ctx)
if parentSpan != nil {
parentCtx = parentSpan.Context()
}
span := tracer.StartSpan(
method,
opentracing.ChildOf(parentCtx),
opentracing.Tag{Key: string(ext.Component), Value: "gRPC Client"},
ext.SpanKindRPCClient,
)
defer span.Finish()
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.New(nil)
} else {
md = md.Copy()
}
err := tracer.Inject(
span.Context(),
opentracing.TextMap,
			MDCarrier{md}, // our custom carrier
)
if err != nil {
log.Errorf("inject span error :%v", err.Error())
}
newCtx := metadata.NewOutgoingContext(ctx, md)
err = invoker(newCtx, method, request, reply, cc, opts...)
if err != nil {
log.Errorf("call error : %v", err.Error())
}
return err
}
}
// ServerInterceptor is the server-side unary interceptor.
func ServerInterceptor(tracer opentracing.Tracer) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.New(nil)
}
spanContext, err := tracer.Extract(
opentracing.TextMap,
MDCarrier{md},
)
if err != nil && err != opentracing.ErrSpanContextNotFound {
grpclog.Errorf("extract from metadata err: %v", err)
} else {
span := tracer.StartSpan(
info.FullMethod,
ext.RPCServerOption(spanContext),
opentracing.Tag{Key: string(ext.Component), Value: "gRPC Server"},
ext.SpanKindRPCServer,
)
defer span.Finish()
ctx = opentracing.ContextWithSpan(ctx, span)
}
return handler(ctx, req)
}
}
Integrating the Interceptor and Tracer into the gRPC Client and Server
Now we can wire the tracer and interceptors into our gRPC code.
We reuse the earlier example of registering gRPC services with Consul and add the tracer on top of it.
First, the client side:
func main() {
consul.Init()
	tracer, closer, err := intercepter.NewJaegerTracer(serviceName, jaegerAgent)
	if err != nil {
		log.Fatalf("NewJaegerTracer err: %v", err.Error())
	}
	defer closer.Close()
	// Set up a connection to the server.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
conn, err := grpc.DialContext(ctx, consulService, grpc.WithInsecure(), grpc.WithBalancerName("round_robin"), grpc.WithUnaryInterceptor(intercepter.ClientInterceptor(tracer)))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGopherClient(conn)
........
}
Then the server side:
func main() {
tracer, closer, err := intercepter.NewJaegerTracer(serviceName, jaegerAgent)
	if err != nil {
		log.Fatalf("NewJaegerTracer err: %v", err.Error())
	}
	defer closer.Close()
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer(grpc.UnaryInterceptor(intercepter.ServerInterceptor(tracer)))
pb.RegisterGopherServer(s, &server{})
grpc_health_v1.RegisterHealthServer(s, &HealthImpl{})
RegisterToConsul()
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
As the code above shows, the integration is very simple: when the client and server start up, just pass in our interceptors along with the tracer we created.
One thing to note: because we use interceptors, health checks are traced as well, which means you will also see health-check traces when browsing the Jaeger UI.
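If those health-check spans are unwanted noise, one option (a sketch, not from the example repo; the wrapper name is made up) is to short-circuit the server interceptor for the standard health-check method before any span is started:
// ServerInterceptorSkippingHealth wraps ServerInterceptor and passes
// health checks through untraced; it lives alongside ServerInterceptor.
func ServerInterceptorSkippingHealth(tracer opentracing.Tracer) grpc.UnaryServerInterceptor {
	traced := ServerInterceptor(tracer)
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		// FullMethod of the standard gRPC health service.
		if info.FullMethod == "/grpc.health.v1.Health/Check" {
			return handler(ctx, req)
		}
		return traced(ctx, req, info, handler)
	}
}
Register this wrapper with grpc.UnaryInterceptor in place of ServerInterceptor if you want the Jaeger UI to show only business calls.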
Run it
Start the server first, then the client; the two can then communicate and the calls are traced.

Clicking into a trace shows the span hierarchy and detailed information for each method; the data to be traced can be defined in even more detail if needed.

Traces can also be viewed as a call graph, which helps untangle a complex service architecture.

The example code for this article lives at rpc-examples.