tuleisM >>= Random.nextBlog
Posted 01 Nov 2021 by Linh Nguyen

OpenTelemetry Distributed Tracing with ZIO

Introduction

This post collects some quick notes on using ZIO and zio-telemetry to implement OpenTelemetry distributed tracing in Scala applications. The source code is available here.

This is not an introduction to any of these technologies, but here are a few good reads:

Initial implementation

For demonstration purposes, we will manually instrument a modified version of zio-grpc's helloworld example, in which we incorporate both gRPC and HTTP communication:

  • Original: hello-client sends a HelloRequest with name x and hello-server returns a HelloResponse with message Hello, x.
  • Modified: in addition to the original behavior, client sends an optional integer field guess and server performs an HTTP request to HTTPBin based on its value.

Initial Diagram

Add the new field guess:

helloworld.proto
// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
  google.protobuf.Int32Value guess = 2;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

Add zio-grpc dependency

project/plugins.sbt
resolvers += Resolver.sonatypeRepo("snapshots")

addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.3")

addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.4")

val zioGrpcVersion = "0.5.1+12-93cdbe22-SNAPSHOT"

libraryDependencies ++= Seq(
  "com.thesamet.scalapb.zio-grpc" %% "zio-grpc-codegen" % zioGrpcVersion,
  "com.thesamet.scalapb"          %% "compilerplugin"   % "0.11.5"
)

Set up build.sbt

  • Generate Scala code from helloworld.proto.
  • Depend on sttp for HTTP client.
build.sbt
val grpcVersion = "1.41.0"
val sttpVersion = "3.3.15"

val scalaPBRuntime = "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion

val grpcRuntimeDeps = Seq(
  "io.grpc"      % "grpc-netty" % grpcVersion,
  scalaPBRuntime,
  scalaPBRuntime % "protobuf"
)

val sttpZioDeps = Seq(
  "com.softwaremill.sttp.client3" %% "async-http-client-backend-zio" % sttpVersion
)

lazy val root = Project("opentelemetry-distributed-tracing-zio", file(".")).aggregate(zio)

lazy val zio = commonProject("zio").settings(
  Compile / PB.targets := Seq(
    scalapb.gen(grpc = true)          -> (Compile / sourceManaged).value,
    scalapb.zio_grpc.ZioCodeGenerator -> (Compile / sourceManaged).value
  ),
  libraryDependencies ++= grpcRuntimeDeps ++ sttpZioDeps
)

Client implementation

  • Create a gRPC client pointing to localhost:9000.
  • Send a single HelloRequest.
  • Send 5 HelloRequests in parallel.
  • Send a single HelloRequest with an invalid guess.
  • Print "Done" and exit.
ZClient.scala
object ZClient extends zio.App {
  private val clientLayer = GreeterClient.live(
    ZManagedChannel(
      ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext()
    )
  )

  private val singleHello = GreeterClient.sayHello(HelloRequest("World"))

  private val multipleHellos = ZIO.collectAllParN(5)(
    List(
      GreeterClient.sayHello(HelloRequest("1", Some(1))),
      GreeterClient.sayHello(HelloRequest("2", Some(2))),
      GreeterClient.sayHello(HelloRequest("3", Some(3))),
      GreeterClient.sayHello(HelloRequest("4", Some(4))),
      GreeterClient.sayHello(HelloRequest("5", Some(5)))
    )
  )

  private val invalidHello = GreeterClient.sayHello(HelloRequest("Invalid", Some(-1))).ignore

  private def myAppLogic =
    singleHello *> multipleHellos *> invalidHello *> putStrLn("Done")

  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    myAppLogic.provideCustomLayer(clientLayer).exitCode
}

Server implementation

  • Fail the request if guess is less than 0.
  • Based on the value of guess, delay for some time and then send a request to HTTPBin.
type ZGreeterEnv = Clock with Random with SttpClient
ZGreeterImpl.scala
object ZGreeterImpl extends RGreeter[ZGreeterEnv] {

  def sayHello(request: HelloRequest): ZIO[ZGreeterEnv, Status, HelloReply] = {
    val guess = request.guess.getOrElse(0)
    for {
      _      <- ZIO.fail(Status.INVALID_ARGUMENT).when(guess < 0)
      code   <- ???
      delayMs = ???
      _      <- httpRequest(code)
                  .delay(delayMs.millis)
                  .mapError(ex => Status.INTERNAL.withCause(ex))
    } yield HelloReply(s"Hello, ${request.name}")
  }

  def httpRequest(code: Int): RIO[SttpClient, Unit] =
    send(basicRequest.get(uri"https://httpbin.org/status/$code")).unit
}
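
The first step of sayHello (rejecting a negative guess) can be sketched without ZIO as a pure function. This is only an illustration, not code from the repository: GuessValidation and validate are hypothetical names, and the error string stands in for Status.INVALID_ARGUMENT.

```scala
// Hypothetical, dependency-free sketch of the guess check performed by sayHello.
object GuessValidation {
  // A missing guess is treated as 0, mirroring request.guess.getOrElse(0).
  def validate(guess: Option[Int]): Either[String, Int] = {
    val value = guess.getOrElse(0)
    if (value < 0) Left("INVALID_ARGUMENT") // stands in for Status.INVALID_ARGUMENT
    else Right(value)
  }
}
```

With this shape, the client's invalidHello request (guess = -1) ends up on the Left side, while a request without a guess is accepted as 0.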

Run it

  • To run the server:
$ sbt "zio/runMain com.github.tuleism.ZServer"

[info] running (fork) com.github.tuleism.ZServer
[info] Server is running. Press Ctrl-C to stop.
  • To run the client:
$ sbt "zio/runMain com.github.tuleism.ZClient"

[info] running (fork) com.github.tuleism.ZClient
[info] Done
[success] Total time: 12 s

At this point, we only know that it takes roughly 12 seconds for the client to initialize and finish its work.

Let's add distributed tracing to gain more insights into this.

Common tracing requirements

  • For both client and server, we need to acquire a Tracer, an object responsible for creating and managing Spans.
  • Tracing data is sent to Jaeger, which acts as a standalone collector.

Instrumented Diagram

Add new dependencies

build.sbt
val openTelemetryVersion = "1.6.0"
val zioConfigVersion     = "1.0.10"
val zioMagicVersion      = "0.3.9"
val zioTelemetryVersion  = "0.8.2"

val openTelemetryDeps = Seq(
  "io.opentelemetry" % "opentelemetry-exporter-jaeger"    % openTelemetryVersion,
  "io.opentelemetry" % "opentelemetry-sdk"                % openTelemetryVersion,
  "io.opentelemetry" % "opentelemetry-extension-noop-api" % s"$openTelemetryVersion-alpha"
)

val zioConfigDeps = Seq(
  "dev.zio" %% "zio-config"          % zioConfigVersion,
  "dev.zio" %% "zio-config-magnolia" % zioConfigVersion,
  "dev.zio" %% "zio-config-typesafe" % zioConfigVersion
)

val zioMagicDeps = Seq(
  "io.github.kitlangton" %% "zio-magic" % zioMagicVersion
)

val zioTelemetryDeps = Seq(
  "dev.zio"                       %% "zio-opentelemetry"                   % zioTelemetryVersion,
  "com.softwaremill.sttp.client3" %% "zio-telemetry-opentelemetry-backend" % sttpVersion
)

Add a config layer

application.conf
tracing {
  enable = false
  enable = ${?TRACING_ENABLE}
  endpoint = "http://127.0.0.1:14250"
  endpoint = ${?JAEGER_ENDPOINT}
}
TracingConfig.scala
case class AppConfig(tracing: TracingConfig)

case class TracingConfig(enable: Boolean, endpoint: String)

object AppConfig {
  private val configDescriptor = descriptor[AppConfig]

  val live: Layer[ReadError[String], Has[AppConfig]] = TypesafeConfig.fromDefaultLoader(configDescriptor)
}
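
In application.conf, a line such as enable = ${?TRACING_ENABLE} is an optional substitution: it overrides the preceding default only when the environment variable is set. The fallback logic can be sketched in plain Scala as follows. TracingSettings, Settings and resolve are hypothetical names, and the boolean parsing is a simplification (anything other than "true" is treated as false).

```scala
// Hypothetical sketch of "default value, optionally overridden by an environment variable".
object TracingSettings {
  final case class Settings(enable: Boolean, endpoint: String)

  // The environment is passed in explicitly to keep the function pure;
  // a real program could pass sys.env.
  def resolve(env: Map[String, String]): Settings =
    Settings(
      // Simplification: only the string "true" (any case) enables tracing.
      enable = env.get("TRACING_ENABLE").exists(_.equalsIgnoreCase("true")),
      endpoint = env.getOrElse("JAEGER_ENDPOINT", "http://127.0.0.1:14250")
    )
}
```

Calling TracingSettings.resolve(sys.env) without either variable set yields the same defaults as the configuration file: tracing disabled and the local Jaeger endpoint.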

Add a Tracer layer

  • Depending on the configuration, we create either a noop Tracer or one that sends data to Jaeger.
  • Once we have it, we can construct a Tracing layer, which gives us access to many useful operations in zio-telemetry.
ZTracer.scala
object ZTracer {
  private val InstrumentationName = "com.github.tuleism"

  private def managed(serviceName: String, endpoint: String) = {
    val resource = Resource.builder().put(ResourceAttributes.SERVICE_NAME, serviceName).build()
    for {
      spanExporter   <- ZManaged.fromAutoCloseable(
                          Task(JaegerGrpcSpanExporter.builder().setEndpoint(endpoint).build())
                        )
      spanProcessor  <- ZManaged.fromAutoCloseable(UIO(SimpleSpanProcessor.create(spanExporter)))
      tracerProvider <- UIO(
                          SdkTracerProvider.builder().addSpanProcessor(spanProcessor).setResource(resource).build()
                        ).toManaged_
      openTelemetry  <- UIO(OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build()).toManaged_
      tracer         <- UIO(openTelemetry.getTracer(InstrumentationName)).toManaged_
    } yield tracer
  }

  def live(serviceName: String): RLayer[Has[TracingConfig], Has[Tracer]] =
    (
      for {
        config <- ZIO.service[TracingConfig].toManaged_
        tracer <- if (!config.enable) {
                    Task(NoopOpenTelemetry.getInstance().getTracer(InstrumentationName)).toManaged_
                  } else {
                    managed(serviceName, config.endpoint)
                  }
      } yield tracer
    ).toLayer
}

New server

Instrument the HTTP client

SttpTracing.scala
object SttpTracing {
  private val wrapper = new ZioTelemetryOpenTelemetryTracer {
    def before[T](request: Request[T, Nothing]): RIO[Tracing, Unit] =
      Tracing.setAttribute(SemanticAttributes.HTTP_METHOD.getKey, request.method.method) *>
        Tracing.setAttribute(SemanticAttributes.HTTP_URL.getKey, request.uri.toString()) *>
        ZIO.unit

    def after[T](response: Response[T]): RIO[Tracing, Unit] =
      Tracing.setAttribute(SemanticAttributes.HTTP_STATUS_CODE.getKey, response.code.code) *>
        ZIO.unit
  }

  val live = AsyncHttpClientZioBackend.layer().flatMap { hasBackend =>
    ZIO
      .service[Tracing.Service]
      .map { tracing =>
        ZioTelemetryOpenTelemetryBackend(hasBackend.get, tracing, wrapper)
      }
      .toLayer
  }
}

Instrument the gRPC server

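We can add tracing without changing our server implementation by using a ZTransform. For each request, the transform:

  • Extracts the propagated trace context from the gRPC Metadata.
  • Starts a SERVER Span named after the full method name.
  • Records the gRPC semantic attributes, including the status code.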

GrpcTracing.scala
object GrpcTracing {
  private val propagator: TextMapPropagator = W3CTraceContextPropagator.getInstance()
  private val metadataGetter: TextMapGetter[Metadata] = new TextMapGetter[Metadata] {
    override def keys(carrier: Metadata): java.lang.Iterable[String] =
      carrier.keys()

    override def get(carrier: Metadata, key: String): String =
      carrier.get(
        Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)
      )
  }

  private def withSemanticAttributes[R, A](effect: ZIO[R, Status, A]): ZIO[Tracing with R, Status, A] =
    Tracing.setAttribute(SemanticAttributes.RPC_SYSTEM.getKey, "grpc") *>
      effect
        .tapBoth(
          status =>
            Tracing.setAttribute(
              SemanticAttributes.RPC_GRPC_STATUS_CODE.getKey,
              status.getCode.value()
            ),
          _ =>
            Tracing.setAttribute(SemanticAttributes.RPC_GRPC_STATUS_CODE.getKey, Status.OK.getCode.value())
        )

  def serverTracingTransform[R]: ZTransform[R, Status, R with Tracing with Has[RequestContext]] =
    new ZTransform[R, Status, R with Tracing with Has[RequestContext]] {

      def effect[A](io: ZIO[R, Status, A]): ZIO[R with Tracing with Has[RequestContext], Status, A] =
        for {
          rc       <- ZIO.service[RequestContext]
          metadata <- rc.metadata.wrap(identity)
          result   <- withSemanticAttributes(io)
                        .spanFrom(
                          propagator,
                          metadata,
                          metadataGetter,
                          rc.methodDescriptor.getFullMethodName,
                          SpanKind.SERVER,
                          { case _ => StatusCode.ERROR }
                        )
        } yield result

      def stream[A](io: ZStream[R, Status, A]): ZStream[R with Tracing with Has[RequestContext], Status, A] =
        ???
    }
}
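
The status code attribute recorded by withSemanticAttributes is numeric, because status.getCode.value() returns the canonical gRPC code. When reading Spans in Jaeger, a small lookup like the one below can help. It is a sketch covering only a subset of codes, and GrpcStatusCodes and describe are hypothetical names.

```scala
// Hypothetical helper for reading numeric gRPC status codes in trace attributes.
object GrpcStatusCodes {
  // Subset of the canonical gRPC status codes; the numeric values are fixed by gRPC.
  private val names: Map[Int, String] = Map(
    0  -> "OK",
    2  -> "UNKNOWN",
    3  -> "INVALID_ARGUMENT",
    4  -> "DEADLINE_EXCEEDED",
    13 -> "INTERNAL",
    14 -> "UNAVAILABLE"
  )

  // Codes outside the subset are rendered as "code <n>".
  def describe(code: Int): String = names.getOrElse(code, s"code $code")
}
```

In this example, a request with a negative guess would show up as 3 (INVALID_ARGUMENT), and a failed HTTP call as 13 (INTERNAL).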

Update Server Main

  • Add required layers for Tracing.
  • Transform the original ZGreeterImpl.
ZServer.scala
import zio.magic._

object ZServer extends ServerMain {
  private val requirements =
    ZLayer
      .wire[ZEnv with ZGreeterEnv](
        ZEnv.live,
        AppConfig.live.narrow(_.tracing),
        ZTracer.live("hello-server"),
        Tracing.live,
        SttpTracing.live
      )
      .orDie

  def services: ServiceList[Any] =
    ServiceList
      .add(ZGreeterImpl.transform[ZGreeterEnv, Has[RequestContext]](GrpcTracing.serverTracingTransform))
      .provideLayer(requirements)
}

New client

Inject current context into gRPC Metadata for context propagation

GrpcTracing.scala
object GrpcTracing {

  ...

  private val metadataSetter: TextMapSetter[Metadata] = (carrier, key, value) =>
    carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value)

  val contextPropagationClientInterceptor: ZClientInterceptor[Tracing] = ZClientInterceptor.headersUpdater {
    (_, _, metadata) =>
      metadata.wrapM(Tracing.inject(propagator, _, metadataSetter))
  }

  ...
  
}
ZClient.scala
object ZClient extends zio.App {
  private val clientLayer = GreeterClient.live(
    ZManagedChannel(
      ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext(),
      Seq(GrpcTracing.contextPropagationClientInterceptor)
    )
  )
  
  ...
}
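
To make the propagation step more concrete: the W3C Trace Context propagator carries the context in a traceparent header of the form version-traceid-spanid-flags. The sketch below mimics the setter and getter sides over a plain Map instead of gRPC Metadata. TraceParent, Context, inject and extract are hypothetical names, and the sketch ignores the tracestate header and the rule that all-zero IDs are invalid.

```scala
// Hypothetical, dependency-free sketch of W3C traceparent injection and extraction.
object TraceParent {
  final case class Context(traceId: String, spanId: String, flags: String)

  // Version 00: 32 hex chars of trace id, 16 of span id, 2 of flags.
  private val Pattern = "00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})".r

  // Setter side (client): write the header into the carrier.
  def inject(ctx: Context, carrier: Map[String, String]): Map[String, String] =
    carrier + ("traceparent" -> s"00-${ctx.traceId}-${ctx.spanId}-${ctx.flags}")

  // Getter side (server): read the header back, if present and well formed.
  def extract(carrier: Map[String, String]): Option[Context] =
    carrier.get("traceparent").collect {
      case Pattern(traceId, spanId, flags) => Context(traceId, spanId, flags)
    }
}
```

The real metadataSetter and metadataGetter play the same two roles, with Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER) taking the place of the Map key.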

Start a Span for each request

  • Use ZTransform to record the relevant gRPC attributes.
GrpcTracing.scala
object GrpcTracing {

  ...

  def clientTracingTransform[R]: ZTransform[R, Status, R with Tracing] =
    new ZTransform[R, Status, R with Tracing] {
      def effect[A](io: ZIO[R, Status, A]): ZIO[R with Tracing, Status, A] = withSemanticAttributes(io)
      def stream[A](io: ZStream[R, Status, A]): ZStream[R with Tracing, Status, A] = ???
    }
}
  • Unlike the server, we don't have access to a RequestContext object, so we have to set the method name manually.
  • We also start additional Spans.
ZClient.scala
object ZClient extends zio.App {
  
  ...
  
  
  private def errorToStatusCode[E]: PartialFunction[E, StatusCode] = { case _ => StatusCode.ERROR }

  private def sayHello(request: HelloRequest) =
    GreeterClient
      .sayHello(request)
      .span(
        GreeterGrpc.METHOD_SAY_HELLO.getFullMethodName,
        SpanKind.CLIENT,
        errorToStatusCode
      )

  private val singleHello = sayHello(HelloRequest("World"))
    .span("singleHello", toErrorStatus = errorToStatusCode)

  private val multipleHellos = ZIO
    .collectAllParN(5)(
      List(
        sayHello(HelloRequest("1", Some(1))),
        sayHello(HelloRequest("2", Some(2))),
        sayHello(HelloRequest("3", Some(3))),
        sayHello(HelloRequest("4", Some(4))),
        sayHello(HelloRequest("5", Some(5)))
      )
    )
    .span("multipleHellos", toErrorStatus = errorToStatusCode)

  private val invalidHello = sayHello(HelloRequest("Invalid", Some(-1))).ignore
    .span("invalidHello", toErrorStatus = errorToStatusCode)
}

Add required layers

ZClient.scala
object ZClient extends zio.App {

  ...
  
  private val requirements = ZLayer
    .wire[ZEnv with Tracing](
      ZEnv.live,
      AppConfig.live.narrow(_.tracing),
      ZTracer.live("hello-client"),
      Tracing.live
    ) >+> clientLayer

  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    myAppLogic.provideCustomLayer(requirements).exitCode
}

Showtime

Run Jaeger through Docker

$ docker run --rm --name jaeger \
  -p 16686:16686 \
  -p 14250:14250 \
  jaegertracing/all-in-one:1.25

Start the server

$ TRACING_ENABLE=true sbt "zio/runMain com.github.tuleism.ZServer"

[info] running (fork) com.github.tuleism.ZServer
[info] Server is running. Press Ctrl-C to stop.

Start the client

$ TRACING_ENABLE=true sbt "zio/runMain com.github.tuleism.ZClient"

[info] running (fork) com.github.tuleism.ZClient
[info] Done
[success] Total time: 12 s

Distributed Tracing in action!

  • Now we can see the details for multipleHellos:

multipleHellos Span

  • And which guess is causing the longest delay.

bad guess

Integration with Logging

Add logging dependency

build.sbt
val izumiVersion         = "1.0.8"

val loggingDeps = Seq(
  "io.7mind.izumi" %% "logstage-core"          % izumiVersion,
  "io.7mind.izumi" %% "logstage-adapter-slf4j" % izumiVersion
)

Set up logging

  • Add trace_id, span_id and trace_flags to the logging context if the current span context is valid.
Logging.scala
object Logging {
  private def baseLogger = IzLogger()

  val live: ZLayer[Has[Tracing.Service], Nothing, Has[LogZIO.Service]] =
    (
      for {
        tracing <- ZIO.service[Tracing.Service]
      } yield LogZIO.withDynamicContext(baseLogger)(
        Tracing.getCurrentSpanContext
          .map(spanContext =>
            if (spanContext.isValid)
              CustomContext(
                "trace_id"    -> spanContext.getTraceId,
                "span_id"     -> spanContext.getSpanId,
                "trace_flags" -> spanContext.getTraceFlags.asHex()
              )
            else
              CustomContext.empty
          )
          .provide(Has(tracing))
      )
    ).toLayer
}
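
The decision made inside withDynamicContext can be sketched without LogStage or zio-telemetry. In the sketch below, LogContext, SpanCtx, fields and render are hypothetical names, and isValid is a simplification of the OpenTelemetry rule (it only checks that neither ID is all zeros, not length or hex format).

```scala
// Hypothetical, dependency-free sketch of "attach trace fields only for a valid span context".
object LogContext {
  final case class SpanCtx(traceId: String, spanId: String, traceFlags: String) {
    // Simplified validity check: neither ID may consist only of zeros.
    def isValid: Boolean = traceId.exists(_ != '0') && spanId.exists(_ != '0')
  }

  // Fields added to the log context; empty when there is no valid span.
  def fields(ctx: SpanCtx): List[(String, String)] =
    if (ctx.isValid)
      List("trace_id" -> ctx.traceId, "span_id" -> ctx.spanId, "trace_flags" -> ctx.traceFlags)
    else Nil

  // Render the fields as key=value pairs separated by ", ".
  def render(ctx: SpanCtx): String =
    fields(ctx).map { case (k, v) => s"$k=$v" }.mkString(", ")
}
```

Log statements issued outside any Span therefore carry no trace fields at all, rather than a row of zeros.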

Add a few log messages

  • E.g. for singleHello.
ZClient.scala
object ZClient extends zio.App {

  ...

  private val singleHello = (
    for {
      _ <- log.info("singleHello")
      _ <- sayHello(HelloRequest("World"))
    } yield ()
  ).span("singleHello", toErrorStatus = errorToStatusCode)
  
}

Sample Logs

[info] running (fork) com.github.tuleism.ZClient
[info] I 2021-11-01T22:59:10.881 (ZClient.scala:37)  …tuleism.ZClient.singleHello [24:zio-default-async-11] trace_id=9c8a7ebb87381293bc8937a5f7673cb9, span_id=cb7c9a440472e1be, trace_flags=01 singleHello
[info] I 2021-11-01T22:59:14.064 (ZClient.scala:44)  …eism.ZClient.multipleHellos [21:zio-default-async-8 ] trace_id=fe405246fbaa5f876c19f14fa649a99f, span_id=bef19494bef4106e, trace_flags=01 multipleHellos
[info] I 2021-11-01T22:59:18.171 (ZClient.scala:60)  …uleism.ZClient.invalidHello [26:zio-default-async-13] trace_id=be5ccd425e0cfb01fd97274abd0c4d72, span_id=ea6499fb9a7c8d28, trace_flags=01 invalidHello
[info] I 2021-11-01T22:59:18.272 (ZClient.scala:66)  ….tuleism.ZClient.myAppLogic [15:zio-default-async-2 ] Done
[success] Total time: 12 s

Extra notes

  • If we receive an HTTP 5xx response, we should set the Span status to error according to the semantic convention. However, this is currently not possible with zio-telemetry.
  • We need a better way to implement tracing for zio-grpc client.
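
For the first note, the mapping we would like to apply on the HTTP client side can be sketched as a pure function. This assumes the client-side reading of the HTTP semantic convention, in which 4xx and 5xx responses map to an error status and everything else is left unset. HttpSpanStatus, SpanStatus and forClient are hypothetical names and are not part of zio-telemetry or sttp.

```scala
// Hypothetical sketch of mapping an HTTP response code to a span status (client side).
object HttpSpanStatus {
  sealed trait SpanStatus
  case object Unset extends SpanStatus
  case object Error extends SpanStatus

  // Assumption: for CLIENT spans, 4xx and 5xx responses are errors; other codes leave the status unset.
  def forClient(httpStatus: Int): SpanStatus =
    if (httpStatus >= 400 && httpStatus <= 599) Error else Unset
}
```

Something like HttpSpanStatus.forClient(response.code.code) would be the natural thing to call from the after hook, if the hook were able to set the Span status.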