Document Kafka Streams and Sleuth integration
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1102
This commit is contained in:
@@ -1870,6 +1870,69 @@ When there are multiple bindings present on a single function, invoking these op
|
||||
This is because all the bindings on a single function are backed by the same `StreamsBuilderFactoryBean`.
|
||||
Therefore, for the function above, either `function-in-0` or `function-out-0` will work.
|
||||
|
||||
=== Tracing using Spring Cloud Sleuth
|
||||
|
||||
When Spring Cloud Sleuth is on the classpath of a Spring Cloud Stream Kafka Streams binder based application, both its consumer and producer are automatically instrumented with tracing information.
|
||||
However, in order to trace any application specific operations, those need to be explicitly instrumented by the user code.
|
||||
This can be done by injecting the `KafkaStreamsTracing` bean from Spring Cloud Sleuth in the application and then invoke various Kafka Streams operations through this injected bean.
|
||||
Here are some examples of using it.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> clicks(KafkaStreamsTracing kafkaStreamsTracing) {
|
||||
return (userClicksStream, userRegionsTable) -> (userClicksStream
|
||||
.transformValues(kafkaStreamsTracing.peek("span-1", (key, value) -> LOG.info("key/value: " + key + "/" + value)))
|
||||
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
|
||||
"UNKNOWN" : region, clicks),
|
||||
Joined.with(Serdes.String(), Serdes.Long(), null))
|
||||
.transform(kafkaStreamsTracing.map("span-2", (key, value) -> {
|
||||
LOG.info("Click Info: " + value.getRegion() + "/" + value.getClicks());
|
||||
return new KeyValue<>(value.getRegion(),
|
||||
value.getClicks());
|
||||
}))
|
||||
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
|
||||
.reduce(Long::sum, Materialized.as(CLICK_UPDATES))
|
||||
.toStream());
|
||||
}
|
||||
```
|
||||
|
||||
In the example above, there are two places where it adds explicit tracing instrumentation.
|
||||
First, we are logging the key/value information from the incoming `KStream`.
|
||||
When this information is logged, the associated span and trace IDs get logged as well so that a monitoring system can track them and correlate with the same span id.
|
||||
Second, when we call a `map` operation, instead of calling it directly on the `KStream` class, we wrap it inside a `transform` operation and then call `map` from `KafkaStreamsTracing`.
|
||||
In this case also, the logged message will contain the span ID and trace ID.
|
||||
|
||||
Here is another example, where we use the low-level transformer API for accessing the various Kafka Streams headers.
|
||||
When spring-cloud-sleuth is on the classpath, all the tracing headers can also be accessed like this.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public Function<KStream<String, String>, KStream<String, String>> process(KafkaStreamsTracing kafkaStreamsTracing) {
|
||||
return input -> input.transform(kafkaStreamsTracing.transformer(
|
||||
"transformer-1",
|
||||
() -> new Transformer<String, String, KeyValue<String, String>>() {
|
||||
ProcessorContext context;
|
||||
|
||||
@Override
|
||||
public void init(ProcessorContext context) {
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
@Override
|
||||
public KeyValue<String, String> transform(String key, String value) {
|
||||
LOG.info("Headers: " + this.context.headers());
|
||||
LOG.info("K/V:" + key + "/" + value);
|
||||
// More transformations, business logic execution, etc. go here.
|
||||
return KeyValue.pair(key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
}));
|
||||
}
|
||||
```
|
||||
|
||||
=== Configuration Options
|
||||
|
||||
This section contains the configuration options used by the Kafka Streams binder.
|
||||
|
||||
Reference in New Issue
Block a user