Post

Distributed Context Propagation with otel4s

Distributed Context Propagation with otel4s

Distributed systems allow for building highly scalable and fault tolerant applications. Methodologies like event sourcing (ES) and command query responsibility segregation (CQRS) are continuing to grow in popularity as the defacto method for implementing distributed systems. However distributed systems come at the cost of being more complicated to implement, and harder to debug due to the fact that an error can span over multiple network boundaries.

Distributed tracing was introduced to help mitigate these debugging issues. OpenTelemetry is a set of open specifications and tools that allow for distributed tracing and metric collection to be accomplished in a standardized and pluggable way. According to the OpenTelemetry website it is defined as “High-quality, ubiquitous, and portable telemetry to enable effective observability”. Within the typelevel ecosystem the otel4s library was developed to “fully and faithfully implement the OpenTelemetry Specification atop Cats Effect.”

In this blog post we will create a mock distributed application and instrument it with distributed tracing using otel4s. This application will be made up of a http server, a gRPC server, the Kafka message broker and multiple data stores. By doing this we will show how we can trace one logical request across multiple network boundaries, and how this can be used for debugging and performance optimization.

The Github repo associated with this demo can be found here

System Design

Below shows the overall design of the system. system design

These services take a message as input, do some fake processing modeled by a call to IO.sleep, and then send the message downstream. The http server takes a single POST request located at url path api/v1/push_message which expects a single query param message. The gRPC server exposes a single rpc which takes a BrokerRequest and returns a BrokerResponse The shape of the the broker request and response can be seen below

1
2
3
4
5
6
7
message BrokerRequest {
    string message = 1;
}

message BrokerResponse {
    string message = 1;
}

The same message that is returned by the gRPC service is also pushed to a kafka topic called preprocessed_messages. The elastic, postgres and cassandra consumers all subscribe to the preprocessed_messages topic and pull the messages, do some processing of their own, and then persist it to their respective data store.

Important Concepts

Traces, Spans

Opentelemetry defines three types of signals that can be collected: traces, metrics and logs. In this demo we will only focus on traces and logs

Traces are made up of a tree of spans, which can be thought of as the period of time taken to perform a specific routine. Within each span there can be child spans, events and logs. A log is what you would expect, a text based side effect emitted by the program which is meant to be consumed by the application developer as opposed to the end user. An event is similar except that it has a timestamp that anchors it within a span.

The best way to understand tracing is through an example. Consider the pseudo-code function below. It is made up of four sub-routines. routine_a() takes somewhere between one and two seconds to complete. routine_b() takes exactly half a second to complete and always emits an event with a message attribute equal to “hello world!”. routine_c() takes between two and three seconds to complete and routine_d() takes between half a second and five seconds to complete. routines c and d are run in parallel.

1
2
3
4
def root_routine():
	routine_a()
	routine_b()
	run_parallel(routine_c(), routine_d())

The diagram below represents a visualization of an invocations of root_routine(). We can see spans representing the time taken by each routine as well as any events that occur during said routine. This allows us to clearly see where the performance bottlenecks are within our program.

tracing example

Context Propagation

In distributed systems traces need to persist across multiple network boundaries. Most transport protocols have some mechanism for storing key-value pairs that hold metadata specific to that request, that should be extracted and acted upon before the actual business logic is performed. HTTP has headers, kafka records have headers and gRPC has metadata. OpenTelemetry creates an abstraction over these key-value stores called a carrier and that is where important data necessary for context propagation is stored. The most important attribute to be stored in the carrier is the traceparent which is an attribute that has the id of the span which initiated the cross-boundary call. The system that receives this traceparent will be able to create a child span appropriately.

The use of traceparent within a carrier is a WC3 standard, and should be followed by all tracing solutions.

Simply put, if a client of some sort (HTTP, kafka producer, etc.) makes a request with the traceparent set within the carrier the application that consumes said request (HTTP, kafka consumer, etc) will be able to pick up the traceparent from that carrier and create a span with the appropriate parent. There exists tooling within the OpenTelemetry/java ecosystem which can automatically instrument applications with context propagation, however these tools are base on JVM threads. Cats-effect uses fibers as its concurrency primitive so these tools do not work. We must instead use tools that use otel4s or build our own.

Context Propagation in Action

Now that we know the basics of context propagation we can start working on the code. Our mock application has three types of network boundaries: HTTP, gRPC and kafka. HTTP context propagation cn be handled by the http4s-otel middleware. However at the time of writing their are no middleware solutions for gRPC and Kafka (that I could find, if I missed something please let me know!). We will write a generic utility for propagating context, centered around the existence of a carrier.

For the sake of brevity we will not go though the build.sbt file, but instead only mention the modules and their purpose. First is the core module. The core module holds the following methods:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
object Core:

  def randomSleep[F[_]: Async](min: Int, max: Int): F[String] =
    for
      rand <- Random.scalaUtilRandom[F]
      millisecondsToWait <- rand
        .betweenInt(min, max)
        .map(_.milliseconds)
      _ <- Async[F].sleep(millisecondsToWait)
      message = s"waited for ${millisecondsToWait}"
    yield message

  def otelResource[F[_]: Async: LiftIO]: Resource[F, Otel4s[F]] =
    Resource
      .eval(Sync[F].delay(GlobalOpenTelemetry.get))
      .evalMap(OtelJava.forAsync[F])

  def fromTracingCarrier[
      F[_]: Tracer: Concurrent,
      C: Monoid: TextMapGetter,
      O
  ](carrier: C, spanName: String)(body: Span[F] => F[O]): F[O] =
    MonadCancelThrow[F].uncancelable: poll =>
      Tracer[F].joinOrRoot(carrier):
        Tracer[F]
          .spanBuilder(spanName)
          .withSpanKind(SpanKind.Server)
          .build
          .use: span =>
            poll(body(span)).guaranteeCase:
              case Outcome.Succeeded(fa) =>
                span.addAttribute(Attribute("exit.case", "succeeded"))
              case Outcome.Errored(e) =>
                span.recordException(e) >>
                  span.addAttribute(Attribute("exit.case", "errored"))
              case Outcome.Canceled() =>
                span.addAttributes(
                  Attribute("exit.case", "canceled"),
                  Attribute("canceled", true),
                  Attribute("error", true)
                )

  def withTracingCarrier[
      F[_]: Tracer: Concurrent,
      C: TextMapUpdater: Monoid,
      O
  ](spanName: String)(
      body: C => F[O]
  ): F[O] =
    MonadCancelThrow[F].uncancelable: poll =>
      val carrier = Monoid[C].empty
      Tracer[F]
        .spanBuilder(spanName)
        .withSpanKind(SpanKind.Client)
        .build
        .use: span =>
          for
            traceHeaders <- Tracer[F].propagate(carrier)
            resp <- poll(body(traceHeaders)).guaranteeCase:
              case Outcome.Succeeded(fa) =>
                span.addAttribute(Attribute("exit.case", "succeeded"))
              case Outcome.Errored(e) =>
                span.recordException(e) >>
                  span.addAttribute(Attribute("exit.case", "errored"))
              case Outcome.Canceled() =>
                span.addAttributes(
                  Attribute("exit.case", "canceled"),
                  Attribute("canceled", true)
                )
          yield resp

randomSleep and otelResource are used for simulating work, and initializing otel4s respectively. The fromTracingCarrier and withTracingCarrier are used to pick up a traceparent id from a request and propagate a traceparent id respectively. These methods are adapted from the otel4s http-middleware but are modified to center around a carrier type C instead of http headers and Kleisli. The logic of each method is surrounded by an uncancelable block so that no matter what happens when either sending or handling a request, we will be able to set attributes for our span properly. All that our carrier type C needs is a Monoid typeclass so we can create empty carriers, and TextMapUpdater and TextMapGetter typeclasses. The latter two typeclasses essentially allow us to treat the carrier like it is a Map[String, String] so that we can set and get the traceparent value.

The grpc and kafka modules depend on the core module and implement the monoid, TextMapGetter and TextMapSetter typeclasses for gRPC meta and kafka header respectively. This means we now have all the functionality we need to perform context propagation across all our network boundaries.

We can now implement our various services. We will only show the routes class we use in our http service for brevity.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final class MyRoutes(
      brokerPreprocessor: BrokerPreprocessorFs2Grpc[IO, Metadata]
  )(using Tracer[IO])
      extends Http4sDsl[IO]:

    object Message extends QueryParamDecoderMatcher[String]("message")

    val routes = HttpRoutes.of[IO] {

      case POST -> Root / "api" / "v1" / "push_message" :? Message(message) =>
        withTracingCarrier[IO, Metadata, BrokerResponse]("grpc client") {
          metaCarrier =>
            brokerPreprocessor
              .processAndPushToBroker(
                BrokerRequest(message = message),
                ctx = metaCarrier
              )
        }.flatMap(br => Ok(br.message))
    }

It takes a gRPC client built for us by fs2-grpc, as an argument and defines a single POST request. Within that POST request it uses the withTracingCarrier method we defined in the core module to prepare a gRPC metadata object with a traceparent value that has been properly set, and then makes a request to the gRPC server using said carrier.

The following code is the implementation of the gRPC server:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def brokerPreprocessor(
      producer: PartitionsFor[IO, String, String]
  )(using tracer: Tracer[IO]) =
    new BrokerPreprocessorFs2Grpc[IO, Metadata]:
      override def processAndPushToBroker(
          request: BrokerRequest,
          ctx: Metadata
      ): IO[BrokerResponse] =
        fromTracingCarrier(ctx, "grpc server"): s => 
          for
            message <- randomSleep[IO](500, 2500)
            _ <-
              withTracingCarrier[IO, Headers, ProducerResult[String, String]](
                "push to broker"
              ): carrier =>
                producer
                  .produceOne(
                    ProducerRecord(
                      topic_name,
                      "message",
                      s"gRPC preprocessor: $message\t"
                    )
                      .withHeaders(carrier)
                  )
                  .flatten
          yield (BrokerResponse(message = message))

It takes a Kafka producer as and argument and implements an anonymous class of BrokerPreprocessorFs2Grpc to handle gRPC requests. Again take notice of the fromTracingCarrier and withTracingCarrier methods. The former takes in the gRPC metadata class provided by the request handler, and gives us a span that will be a child of the span referenced by the traceparent. After calling randomSleep to simulate some “work” we then call withTracingCarrier to get a kafka headers class that has automatically had context propagation performed on it. We can then use this headers object to push our message to Kafka.

Next we will examine the snippet of code from our elastic consumer that pulls messages from kafka, simulates more “work”, and pushes to them to Elasticsearch. We will not show the code for the postgres and cassandra consumers as they are fairly similar to the elastic consumer.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
KafkaConsumer
  .stream(consumerSettings)
  .subscribeTo("preprocessed_messages")
  .records
  .mapAsync(25) { committable =>
    fromTracingCarrier(
      committable.record.headers,
      "elastic consumer"
    ): s =>
      val (key, value) = processRecord(committable.record)
      randomSleep[IO](500, 2500).flatMap: message =>
        trace
          .spanBuilder("persist to elastic search")
          .withParent(s.context)
          .build
          .surround(
            client.execute:
              indexInto("elastic_messages")
                .fields(
                  key -> s"$value elastic consumer: $message"
                )
                .refresh(RefreshPolicy.Immediate)
          )
  }
  .compile
  .drain

We pull records from our topic and for each record we pass the header to fromTracingCarrier so that way we continue with our trace. We again simulate some work by sleeping for a random amount of time, and then we use the span provided to us by fromTracingCarrier to surround our call to Elasticsearch so we can record exactly how long it took.

Running, Viewing Data and Viewing Traces

We containerize each of our microservices with docker so all of our services can be started using a simple call to docker compose up -d. Give all the services a few seconds to start up and then make a request to the system using the following command:

1
curl -X POST localhost:8080/api/v1/push_message?message=hello!

you should get a response like:

1
waited for 704 milliseconds

Querying our data stores

Next we will query all of our data stores to make sure our message persisted. We will start with Elasticsearch. Navigate to the elasticsearch dev tools console by entering http://localhost:5601/app/dev_tools#/console into a browser. In the console run the following query:

1
2
3
4
5
6
GET elastic_messages/_search
{
  "query": {
    "match_all": {}
  }
}

You should see a entries with a string value containing the original message sent in the post request, the time spent sleeping in the gRPC server and the time spent sleeping in the elastic consumer.

You can do the same by logging into the postgres server using any database tool (I use dbeaver) and running the following query:

1
select  * from postgres_messages;

Again you should see the message followed by the two sleep times.

Finally to log into cassandra and view the contents of the table. Start a CQL shell using docker

1
docker run --rm -it --network oteldemo-network nuvo/docker-cqlsh cqlsh cassandra 9042 --cqlversion='3.4.6'

and run the following query

1
select * from store.cassandra_messages;

Visualizing the Request

To access the jaeger dashboard navigate to http://localhost:16686/search. There should be a service called otel-demo, select it and click “find traces”. Assuming there is only one trace select it (There will be more if you made more requests to the http server). It should look similar to what is below. traces

If you have never seen a trace before it may seem like a lot to take in. A good place to start it the Services & Operation tab on the left side. It starts with a root span at the http request. A child span is created, by the withTracingCarrier function we created, when the gRPC client sends a request. Similarly a child span is created when the gRPC server receives the request. Again a new span is created when the gRPC server pushes to Kafka. Until now each span had only one child. Note that when each consumer pulls from kafka they are creating child spans from the same parent span. You can prove this by collapsing spans and seeing which child spans close with them.

By looking at the spans associated with each data store consumer we can see a visual representation of the actual time to persist to each of our datastores. Simply through visual inspection we can see that cassandra was the fastest to persist by a noticeable margin.

Drawbacks to this Approach

Up to this point we have shown how you can use otel4s to instrument services written using the Typelevel stack. Overall it has been a pleasant experience but there is an issue with our approach: withTracingCarrier and fromTracingCarrier pollute our code. It may not seem to matter that much when our applications only have one endpoint, but as our application grows we will have to constantly make calls to these methods. This is repeated code that obfuscates our business logic. A better pattern is the middleware pattern. http4s supports this pattern and that is why we did not have to write any code specific to it, just wrap any servers or clients in the middleware. gRPC and Kafka both have a concept of “interceptors” which serve more or less the same purpose as a middleware does to a web server. Unfortunately, at the time of writing interceptor functionality is not implemented for either fs2-grpc or fs2-kafka.

This post is licensed under CC BY 4.0 by the author.