== Kafka Streams Binding Capabilities of Spring Cloud Stream

=== Usage
For using the Kafka Streams binder, you just need to add it to your Spring Cloud Stream application, using the following
Maven coordinates:
[source,xml]
----
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
----
=== Kafka Streams Binder Overview

Spring Cloud Stream's Apache Kafka support also includes a binder implementation designed explicitly for Apache Kafka
Streams binding. With this native integration, a Spring Cloud Stream "processor" application can directly use the
https://kafka.apache.org/documentation/streams/developer-guide[Apache Kafka Streams] APIs in the core business logic.

The Kafka Streams binder implementation builds on the foundation provided by the http://docs.spring.io/spring-kafka/reference/html/_reference.html#kafka-streams[Kafka Streams support in Spring Kafka]
project.

As part of this native integration, the high-level https://docs.confluent.io/current/streams/developer-guide/dsl-api.html[Streams DSL]
provided by the Kafka Streams API is available for use in the business logic, too.
An early version of the https://docs.confluent.io/current/streams/developer-guide/processor-api.html[Processor API]
support is available as well.

Kafka Streams support in Spring Cloud Stream is strictly only available for use in the processor model: messages are read
from an inbound topic, business logic can be applied, and the transformed messages are written to an outbound topic.
It can also be used in processor applications with no outbound destination.

==== Streams DSL

The following application consumes data from a Kafka topic (e.g., `words`), computes the word count for each unique word
in a 5-second time window, and sends the computed results to a downstream topic (e.g., `counts`) for further processing.
[source]
----
@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class WordCountProcessorApplication {

	// WordCount is a simple POJO holding the word, its count, and the window start/end times (omitted for brevity).
	@StreamListener("input")
	@SendTo("output")
	public KStream<?, WordCount> process(KStream<?, String> input) {
		return input
				.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
				.groupBy((key, value) -> value)
				.windowedBy(TimeWindows.of(5000))
				.count(Materialized.as("WordCounts"))
				.toStream()
				.map((key, value) -> new KeyValue<>(null,
						new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
	}
}
----
Once built as an uber-jar (e.g., `wordcount-processor.jar`), you can run the above example as follows.
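For example (a sketch, assuming the default `input` and `output` binding names of `KStreamProcessor` and the topic names
used above):

[source]
----
java -jar wordcount-processor.jar --spring.cloud.stream.bindings.input.destination=words --spring.cloud.stream.bindings.output.destination=counts
----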
This application will consume messages from the Kafka topic `words`, and the computed results are published to an output
topic `counts`.
Spring Cloud Stream will ensure that the messages from both the incoming and outgoing topics are automatically bound as
KStream objects. As a developer, you can exclusively focus on the business aspects of the code, i.e. writing the logic
required in the processor. Setting up the Streams DSL specific configuration required by the Kafka Streams infrastructure
is automatically handled by the framework.
==== Multiple Input Bindings
The Kafka Streams binder also allows users to write applications with multiple input bindings.
There are use cases in which you may want to have multiple incoming KStream objects or a combination of KStream and KTable objects.
Both of these flavors are supported; see the sketch below for an example of the latter.
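A minimal sketch of a processor with one `KStream` input and one `KTable` input (the binding names, payload types, and
join logic here are illustrative, not prescriptive):

[source]
----
interface KStreamKTableProcessor {

	@Input("input")
	KStream<?, ?> input();

	@Input("inputTable")
	KTable<?, ?> inputTable();

	@Output("output")
	KStream<?, ?> output();
}

@EnableBinding(KStreamKTableProcessor.class)
public class JoiningApplication {

	@StreamListener
	@SendTo("output")
	public KStream<String, Long> process(@Input("input") KStream<String, Long> clicks,
			@Input("inputTable") KTable<String, String> regions) {
		// Join each click count against the latest region for the same key, keeping the count as the value.
		return clicks.leftJoin(regions, (clickCount, region) -> clickCount);
	}
}
----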
=== Configuration Options

This section contains the configuration options used by the Kafka Streams binder.
For common configuration options and properties pertaining to the binder, refer to the <<binding-properties,core documentation>>.
==== Kafka Streams Properties

The following properties are available at the binder level and must be prefixed with the `spring.cloud.stream.kafka.streams.binder.`
literal.
configuration::
Map with a key/value pair containing properties pertaining to the Apache Kafka Streams API.
This property must be prefixed with `spring.cloud.stream.kafka.streams.binder.`.
An example of using this property is shown after this list.
For more information about all the properties that may go into the streams configuration, see the `StreamsConfig` JavaDocs in the
Apache Kafka Streams docs.
brokers::
Broker URL
+
Default: `localhost`
zkNodes::
Zookeeper URL
+
Default: `localhost`
serdeError::
Deserialization error handler type.
Possible values are `logAndContinue`, `logAndFail`, or `sendToDlq`.
+
Default: `logAndFail`
applicationId::
Application ID for all the stream configurations in the current application context.
You can override the application id for an individual `StreamListener` method using the `group` property on the binding.
You have to ensure that you are using the same group name for all input bindings in the case of multiple inputs on the same method.
+
Default: `default`
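As an example of the `configuration` property described above, the following sets the default serdes and the commit
interval (the values shown are illustrative):

[source]
----
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
----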
The following properties are _only_ available for Kafka Streams producers and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.`
literal.
keySerde::
key serde to use
+
Default: `none`.
valueSerde::
value serde to use
+
Default: `none`.
useNativeEncoding::
flag to enable native encoding
+
Default: `false`.
The following properties are _only_ available for Kafka Streams consumers and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding name>.consumer.`
literal.
keySerde::
key serde to use
+
Default: `none`.
valueSerde::
value serde to use
+
Default: `none`.
materializedAs::
state store to materialize when using incoming KTable types
+
Default: `none`.
useNativeDecoding::
flag to enable native decoding
+
Default: `false`.
dlqName::
DLQ topic name.
+
Default: `none`.
==== TimeWindow Properties

Windowing is an important concept in stream processing applications. The following properties are available to configure
time-window computations.
==== State Stores

When an application is written as a sink, i.e. there are no output bindings, the application has to decide concerning
downstream processing. When you write applications in this style, you might want to send the information downstream or
store it in a state store.
In the case of an incoming KTable, if you want to materialize the computations to a state store, you have to express it
through the `materializedAs` consumer property, as shown in the sketch below.
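A minimal sketch (assuming the input binding is named `input` and the desired state store name is `incoming-store`):

[source]
----
spring.cloud.stream.kafka.streams.bindings.input.consumer.materializedAs: incoming-store
----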
==== Branching

Kafka Streams allows outbound data to be split into multiple topics based on some predicates. The Kafka Streams binder provides
support for this feature without compromising the programming model exposed through `StreamListener` in the end user application.
You can write the application in the usual way, as demonstrated above in the word count example. However, when using the
branching feature, you are required to do a few things. First, you need to make sure that your return type is `KStream[]`
instead of a regular `KStream`. Second, you need to use the `SendTo` annotation containing the output bindings in order
(see the example below). For each of these output bindings, you need to configure destination, content-type, and so on, complying
with the standard Spring Cloud Stream expectations.
Here is an example:
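The sketch below branches word counts into three output bindings; the `WordCount` type is the POJO from the earlier
example, and the language predicates are purely illustrative:

[source]
----
@EnableBinding(KStreamProcessorWithBranches.class)
public class WordCountProcessorApplication {

	@StreamListener("input")
	@SendTo({"output1", "output2", "output3"})
	public KStream<?, WordCount>[] process(KStream<Object, String> input) {

		Predicate<Object, WordCount> isEnglish = (k, v) -> v.getWord().equals("english");
		Predicate<Object, WordCount> isFrench = (k, v) -> v.getWord().equals("french");
		Predicate<Object, WordCount> isSpanish = (k, v) -> v.getWord().equals("spanish");

		return input
				.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
				.groupBy((key, value) -> value)
				.windowedBy(TimeWindows.of(5000))
				.count(Materialized.as("WordCounts"))
				.toStream()
				.map((key, value) -> new KeyValue<>(null,
						new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
				.branch(isEnglish, isFrench, isSpanish);
	}

	interface KStreamProcessorWithBranches {

		@Input("input")
		KStream<?, ?> input();

		@Output("output1")
		KStream<?, ?> output1();

		@Output("output2")
		KStream<?, ?> output2();

		@Output("output3")
		KStream<?, ?> output3();
	}
}
----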
=== Message Conversion
Similar to message-channel-based binder applications, the Kafka Streams binder adapts to the out-of-the-box content-type
conversions without any compromise.
It is typical for Kafka Streams operations to need to know the type of SerDes used to transform the key and value correctly.
Therefore, it may be more natural to rely on the SerDe facilities provided by the Apache Kafka Streams library itself for
the inbound and outbound conversions, rather than using the content-type conversions offered by the framework.
On the other hand, you might already be familiar with the content-type conversion patterns provided by the framework and
want to continue using them for inbound and outbound conversions.
Both options are supported in the Kafka Streams binder implementation.
==== Outbound serialization
If native encoding is disabled (which is the default), then the framework will convert the message using the contentType
set by the user (otherwise, the default `application/json` will be applied). It will ignore any SerDe set on the outbound
in this case for outbound serialization.
Here is the property to set the contentType on the outbound.
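For example (assuming the output binding is named `output`):

[source]
----
spring.cloud.stream.bindings.output.contentType: application/json
----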
Here is the property to enable native encoding.
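Again assuming an output binding named `output`, this sketch uses the `useNativeEncoding` producer property listed above:

[source]
----
spring.cloud.stream.kafka.streams.bindings.output.producer.useNativeEncoding: true
----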
If native encoding is enabled on the output binding (the user has to enable it explicitly, as above), then the framework will
skip any form of automatic message conversion on the outbound. In that case, it will switch to the SerDe set by the user.
The `valueSerde` property set on the actual output binding will be used. Here is an example.
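For example (assuming the output binding is named `output`; the serde class is illustrative):

[source]
----
spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
----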
If this property is not set, then it will use the "default" SerDe: `spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde`.
It is worth mentioning that the Kafka Streams binder does not serialize the keys on the outbound - it simply relies on Kafka itself.
Therefore, you either have to specify the `keySerde` property on the binding, or it will default to the application-wide common
`keySerde` set on the streams configuration.
Then, if you have a `SendTo` like `@SendTo({"output1", "output2", "output3"})`, the `KStream[]` from the branches are
applied with the proper SerDe objects as defined above. If you are not enabling `nativeEncoding`, you can then set different
contentType values on the output bindings as below. In that case, the framework will use the appropriate message converter
to convert the messages before sending to Kafka.
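For example (the binding names follow the `SendTo` above; the content types are illustrative):

[source]
----
spring.cloud.stream.bindings.output1.contentType: application/json
spring.cloud.stream.bindings.output2.contentType: text/plain
spring.cloud.stream.bindings.output3.contentType: application/json
----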
==== Inbound deserialization

Similar rules apply to data deserialization on the inbound.
If native decoding is disabled (which is the default), then the framework will convert the message using the contentType
set by the user (otherwise, the default `application/json` will be applied). It will ignore any SerDe set on the inbound
in this case for inbound deserialization.
Here is the property to set the contentType on the inbound.
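For example (assuming the input binding is named `input`):

[source]
----
spring.cloud.stream.bindings.input.contentType: application/json
----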
Here is the property to enable native decoding.
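Again assuming an input binding named `input`, this sketch uses the `useNativeDecoding` consumer property listed above:

[source]
----
spring.cloud.stream.kafka.streams.bindings.input.consumer.useNativeDecoding: true
----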
If native decoding is enabled on the input binding (the user has to enable it explicitly, as above), then the framework will
skip doing any message conversion on the inbound. In that case, it will switch to the SerDe set by the user. The `valueSerde`
property set on the actual input binding will be used. Here is an example.
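For example (assuming the input binding is named `input`; the serde class is illustrative):

[source]
----
spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
----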
If this property is not set, it will use the default SerDe: `spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde`.
It is worth mentioning that the Kafka Streams binder does not deserialize the keys on the inbound - it simply relies on Kafka itself.
Therefore, you either have to specify the `keySerde` property on the binding, or it will default to the application-wide common
`keySerde` set on the streams configuration.
As in the case of KStream branching on the outbound, the benefit of setting the value SerDe per binding is that, if you have
multiple input bindings (multiple `KStream` objects) and they all require separate value SerDes, then you can configure
them individually. If you use the common configuration approach, then this feature won't be applicable.
=== Error Handling
Apache Kafka Streams provides the capability for natively handling exceptions from deserialization errors.
For details on this support, see https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers[KIP-161].
Out of the box, Apache Kafka Streams provides two kinds of deserialization exception handlers: `logAndContinue` and `logAndFail`.
As the names indicate, the former will log the error and continue processing the next records, and the latter will log the
error and fail. `LogAndFail` is the default deserialization exception handler.
==== Handling Deserialization Exceptions
The Kafka Streams binder supports a selection of these exception handlers through the following property.
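For example, to log and continue on deserialization errors:

[source]
----
spring.cloud.stream.kafka.streams.binder.serdeError: logAndContinue
----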
In addition to the above two deserialization exception handlers, the binder also provides a third one for sending the erroneous
records (poison pills) to a DLQ topic. Here is how you enable this DLQ exception handler.
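A sketch (the DLQ topic name `foo-dlq` and the `input` binding name are illustrative):

[source]
----
spring.cloud.stream.kafka.streams.binder.serdeError: sendToDlq
spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqName: foo-dlq
----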
If this is set, then the error records are sent to the topic `foo-dlq`. If this is not set, then it will create a DLQ
topic with the name `error.<input-topic-name>.<group-name>`.
A couple of things to keep in mind when using the exception handling feature in the Kafka Streams binder:

* The property `spring.cloud.stream.kafka.streams.binder.serdeError` is applicable for the entire application. This implies
that if there are multiple `StreamListener` methods in the same application, this property is applied to all of them.
* The exception handling for deserialization works consistently with both native deserialization and framework-provided message
conversion.
==== Handling Non-Deserialization Exceptions
Other kinds of error handling are currently limited in Apache Kafka Streams, and it is up to the end user application to
handle any such application-level errors.
As a side effect of providing a DLQ for deserialization exception handlers, the Kafka Streams binder provides a way to get
access to the DLQ-sending bean directly from your application.
Once you get access to that bean, you can programmatically send any exception records from your application to the DLQ.
Here is an example of how you may do that.
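A sketch using the low-level Processor API. The `DlqDispatcher` bean and its `sendToDlq(topic, key, value, partition)`
method are hypothetical stand-ins for the binder's DLQ-sending bean, and the "bad record" check and topic name are
purely illustrative:

[source]
----
// Hypothetical handle to the binder's DLQ-sending bean.
@Autowired
private DlqDispatcher dlqDispatcher;

@StreamListener("input")
@SendTo("output")
public KStream<?, String> process(KStream<Object, String> input) {

	// Inspect each record with the low-level Processor API and route bad ones to the DLQ.
	input.process(() -> new AbstractProcessor<Object, String>() {

		@Override
		public void process(Object key, String value) {
			if (value != null && value.startsWith("poison")) { // illustrative "bad record" check
				dlqDispatcher.sendToDlq("words-dlq", key, value, context().partition());
			}
		}
	});

	// Continue with the regular processing for the remaining records.
	return input.filter((key, value) -> value == null || !value.startsWith("poison"));
}
----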
Keep in mind that this approach only works out of the box when you use the low-level Processor API in your application, as above.
It remains hard to achieve the same using the high-level DSL, since Kafka Streams does not natively provide error-handling
support there yet, but the example above provides some hints for working around it.
When you materialize data into a state store (for example, through the `materializedAs` property described earlier), you
can then retrieve the data that you stored in this store during the execution of your application.