Cleanup in Kafka Streams docs

This commit is contained in:
Soby Chacko
2019-11-07 18:07:27 -05:00
parent 8e26d5e170
commit fefd9a3bd6


@@ -111,7 +111,7 @@ public class WordCountProcessorApplication {
}
----
Here again, this is a complete Spring Boot application. The difference here from the first application is that the bean method is of type `java.util.function.Function`.
The first parameterized type for the `Function` is for the input `KStream` and the second one is for the output.
In the method body, a lambda expression of type `Function` is provided, and its implementation contains the actual business logic.
Similar to the previously discussed Consumer based application, the input binding here is named `process-in-0` by default. For the output, the binding name is automatically set to `process-out-0`.
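Since the full listing sits outside this hunk, here is a minimal sketch of such a `Function` based processor; the topology shown is an assumption modeled on the word-count theme of this section, not the exact original listing.
[source]
----
import java.util.Arrays;
import java.util.function.Function;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class WordCountProcessorApplication {

    @Bean
    public Function<KStream<Object, String>, KStream<String, Long>> process() {
        // split each line into words, re-key by word, count, and stream the counts out
        return input -> input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, value) -> new KeyValue<>(value, value))
                .groupByKey()
                .count()
                .toStream();
    }
}
----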
@@ -120,7 +120,7 @@ Once built as a uber-jar (e.g., `wordcount-processor.jar`), you can run the abov
[source]
----
java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts
----
This application will consume messages from the Kafka topic `words` and the computed results are published to an output
@@ -128,7 +128,7 @@ topic `counts`.
Spring Cloud Stream will ensure that the messages from both the incoming and outgoing topics are automatically bound as
KStream objects. As a developer, you can exclusively focus on the business aspects of the code, i.e. writing the logic
required in the processor. Setting up Kafka Streams specific configuration required by the Kafka Streams infrastructure
is automatically handled by the framework.
The two examples we saw above have a single `KStream` input binding. In both cases, the bindings received the records from a single topic.
@@ -165,15 +165,14 @@ public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String,
}
----
Here again, the basic theme is the same as in the previous examples, but here we have two inputs.
Java's `BiFunction` support is used to bind the inputs to the desired destinations.
The default binding names generated by the binder for the inputs are `process-in-0` and `process-in-1` respectively. The default output binding is `process-out-0`.
In this example, the first parameter of `BiFunction` is bound as a `KStream` for the first input and the second parameter is bound as a `KTable` for the second input.
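As a hedged sketch (only the signature above comes from the document; the join logic shown is hypothetical), such a `BiFunction` could join the stream against the table:
[source]
----
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    // join each stream record with the co-keyed table entry; the joiner below
    // simply keeps the stream's value and ignores the table side
    return (stream, table) -> stream.leftJoin(table, (streamValue, tableValue) -> streamValue);
}
----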
====== BiConsumer in Kafka Streams Binder
If there are two inputs, but no outputs, you can use `java.util.function.BiConsumer` as shown below.
[source]
----
@@ -183,6 +182,8 @@ public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
}
----
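The body of the listing above is elided in this diff; a minimal sketch of such a `BiConsumer` (the side effects shown are stand-ins) might look like this:
[source]
----
@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
    return (stream, table) -> {
        // no output binding exists, so the processor ends in side effects
        stream.foreach((key, value) -> System.out.println(key + " -> " + value));
        table.toStream().foreach((key, value) -> System.out.println(key + " -> " + value));
    };
}
----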
====== Beyond two inputs
There are situations in which you need more than two inputs. In that case, the binder allows you to chain partial functions.
In functional programming jargon, this technique is generally known as currying.
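As an illustration, here is a sketch of a curried three-input processor, assuming the `Order`/`Customer`/`Product` domain types used later in this section:
[source]
----
@Bean
public Function<KStream<Long, Order>,
        Function<GlobalKTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> process() {
    // each partial function binds one more input; the innermost function returns the output KStream
    return orders -> customers -> products ->
            orders.join(customers,
                        (orderId, order) -> order.getCustomerId(),
                        (order, customer) -> new CustomerOrder(customer, order))
                    .join(products,
                        (orderId, customerOrder) -> customerOrder.productId(),
                        (customerOrder, product) -> {
                            EnrichedOrder enrichedOrder = new EnrichedOrder();
                            enrichedOrder.setProduct(product);
                            enrichedOrder.setCustomer(customerOrder.customer);
                            enrichedOrder.setOrder(customerOrder.order);
                            return enrichedOrder;
                        });
}
----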
@@ -264,8 +265,9 @@ public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {
The programming model remains the same; however, the outbound parameterized type is `KStream[]`.
The default output binding names are `process-out-0`, `process-out-1`, `process-out-2` respectively.
The binder generates three output bindings because it detects the length of the returned `KStream` array.
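For instance, a hedged sketch of such a branching processor (the predicates and the `WordCount` construction here are hypothetical):
[source]
----
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {
    // one predicate per output binding; records that match none are dropped
    Predicate<Object, WordCount> isEnglish = (key, value) -> value.getWord().startsWith("en");
    Predicate<Object, WordCount> isFrench = (key, value) -> value.getWord().startsWith("fr");
    Predicate<Object, WordCount> isSpanish = (key, value) -> value.getWord().startsWith("es");

    return input -> input
            .mapValues(value -> new WordCount(value, 1L))
            .branch(isEnglish, isFrench, isSpanish);
}
----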
===== Summary of Function based Programming Styles for Kafka Streams
In summary, the following table shows the various options that can be used in the functional paradigm.
@@ -395,26 +397,26 @@ Finally, here is the `StreamListener` equivalent of the application with three i
...
...
@StreamListener
@SendTo("output")
public KStream<Long, EnrichedOrder> process(
        @Input("input-1") KStream<Long, Order> ordersStream,
        @Input("input-2") GlobalKTable<Long, Customer> customers,
        @Input("input-3") GlobalKTable<Long, Product> products) {

    KStream<Long, CustomerOrder> customerOrdersStream = ordersStream.join(
            customers, (orderId, order) -> order.getCustomerId(),
            (order, customer) -> new CustomerOrder(customer, order));

    return customerOrdersStream.join(products,
            (orderId, customerOrder) -> customerOrder.productId(),
            (customerOrder, product) -> {
                EnrichedOrder enrichedOrder = new EnrichedOrder();
                enrichedOrder.setProduct(product);
                enrichedOrder.setCustomer(customerOrder.customer);
                enrichedOrder.setOrder(customerOrder.order);
                return enrichedOrder;
            });
}
interface CustomGlobalKTableProcessor {
@@ -436,7 +438,7 @@ Finally, here is the `StreamListener` equivalent of the application with three i
You might notice that the above two examples are even more verbose since, in addition to providing `EnableBinding`, you also need to write your own custom binding interface.
Using the functional model, you can avoid all those ceremonial details.
Before we move on from looking at the general programming model offered by Kafka Streams binder, here is the `StreamListener` version of multiple output bindings.
[source]
----
@@ -515,12 +517,12 @@ public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, St
```
In this case, the binder will create 3 separate Kafka Streams objects with different application IDs (more on this below).
However, if you have more than one processor in the application, you have to tell Spring Cloud Stream which functions need to be activated.
Here is how you activate the functions.
`spring.cloud.stream.function.definition: process;anotherProcess;yetAnotherProcess`
If you want certain functions to be not activated right away, you can remove them from this list.
This is also true when you have a single Kafka Streams processor and other types of `Function` beans in the same application that are handled through a different binder (e.g., a function bean based on the regular Kafka message channel binder).
@@ -591,16 +593,16 @@ and
`spring.cloud.stream.kafka.streams.bindings.anotherInput.applicationId`
For the function based model also, this approach of setting the application ID at the binding level will work.
However, setting it per function at the binder level, as we have seen above, is much easier if you are using the functional model.
For production deployments, it is highly recommended to explicitly specify the application ID through configuration.
This is especially critical if you are auto-scaling your application, in which case you need to make sure that you are deploying each instance with the same application ID.
If the application does not provide an application ID, then the binder will auto-generate a static application ID for you.
This is convenient in development scenarios as it avoids the need for explicitly providing the application ID.
The generated application ID in this manner will be static over application restarts.
In the case of the functional model, the generated application ID will be the function bean name followed by the literal `applicationID`, e.g. `process-applicationID` if `process` is the function bean name.
In the case of `StreamListener`, instead of using the function bean name, the generated application ID will use the containing class name followed by the method name, followed by the literal `applicationId`.
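For example, assuming a function bean named `process`, the per function application ID can be provided at the binder level as below (the ID value is a placeholder):

`spring.cloud.stream.kafka.streams.binder.functions.process.applicationId: my-process-application-id`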
====== Summary of setting Application ID
@@ -628,7 +630,7 @@ public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String,
----
The binder will generate bindings with the names `process-in-0`, `process-in-1` and `process-out-0`.
Now, if you want to change them to something else completely, maybe more domain specific binding names, then you can do so as below.
`spring.cloud.stream.function.bindings.process-in-0=users`
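Once renamed, binding properties are set on the new name; for example, a hypothetical destination for the `users` binding:

`spring.cloud.stream.bindings.users.destination=users-topic`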
@@ -665,8 +667,8 @@ Lets look at some details.
Keys are always deserialized by native Serdes.
For values, by default, deserialization on the inbound is natively performed by Kafka.
Please note that this is a major change on default behavior from previous versions of Kafka Streams binder where the deserialization was done by the framework.
Kafka Streams binder will try to infer matching `Serde` types by looking at the type signature of `java.util.function.Function|Consumer` or `StreamListener`.
Here is the order in which it matches Serdes.
@@ -694,7 +696,7 @@ Here are the Serde types that the binder will try to match from Kafka Streams.
* If none of the Serdes provided by Kafka Streams match the types, then it will use the `JsonSerde` provided by Spring Kafka. In this case, the binder assumes that the types are JSON friendly.
This is useful if you have multiple value objects as inputs, since the binder will internally infer them to the correct Java types.
Before falling back to the `JsonSerde` though, the binder consults the default `Serde` set in the Kafka Streams configuration to see if it can be matched with the incoming KStream's types.
If none of the above strategies work, then the application must provide the `Serde`s through configuration.
This can be configured in two ways - binding or default.
@@ -717,6 +719,8 @@ spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=Custom
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
```
NOTE: If you provide a `Serde` as above on a per input binding basis, then that takes higher precedence and the binder will stay away from any `Serde` inference.
If you want the default key/value Serdes to be used for inbound deserialization, you can do so at the binder level.
```
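# The listing here is elided in this diff. As an assumption-labeled sketch, the
# binder-level defaults pass through the standard Kafka Streams configuration keys:
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
```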
@@ -731,7 +735,7 @@ For e.g. if you have the same BiFunction processor as above, then `spring.cloud.
You need to disable native decoding for all the inputs individually. Otherwise, native decoding will still be applied for those you don't disable.
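As a hedged example, disabling native decoding for both inputs of the `BiFunction` above looks like this:
```
spring.cloud.stream.bindings.process-in-0.consumer.useNativeDecoding: false
spring.cloud.stream.bindings.process-in-1.consumer.useNativeDecoding: false
```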
By default, Spring Cloud Stream will use `application/json` as the content type and use an appropriate json message converter.
You can use custom message converters by using the following property and an appropriate `MessageConverter` bean.
```
spring.cloud.stream.bindings.process-in-0.contentType
```
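As a sketch (the converter and the content type shown are hypothetical, not a prescribed implementation), such a bean might look like this:
```
@Bean
public MessageConverter customMessageConverter() {
    // hypothetical converter for a made-up content type "application/my-type"
    return new AbstractMessageConverter(new MimeType("application", "my-type")) {

        @Override
        protected boolean supports(Class<?> clazz) {
            return String.class.isAssignableFrom(clazz);
        }

        @Override
        protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
            return new String((byte[]) message.getPayload());
        }
    };
}
```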
@@ -747,8 +751,8 @@ If it can't infer the type of the key, then that needs to be specified using con
Value serdes are inferred using the same rules used for inbound deserialization.
First it matches to see if the outbound type is from a provided bean in the application.
If not, it checks to see if it matches with a `Serde` exposed by Kafka such as `Integer`, `Long`, `Short`, `Double`, `Float`, `byte[]`, `UUID` and `String`.
If that doesn't work, then it falls back to the `JsonSerde` provided by the Spring Kafka project, but first looks at the default `Serde` configuration to see if there is a match.
Keep in mind that all of this happens transparently to the application.
If none of these work, then the user has to provide the `Serde` to use by configuration.
@@ -774,7 +778,7 @@ For e.g. if you have the same BiFunction processor as above, then `spring.cloud.
You need to disable native encoding for all the outputs individually in the case of branching. Otherwise, native encoding will still be applied for those you don't disable.
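Analogously to the inbound side, a hedged example of disabling native encoding for one of the branched outputs:
```
spring.cloud.stream.bindings.process-out-0.producer.useNativeEncoding: false
```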
When conversion is done by Spring Cloud Stream, by default, it will use `application/json` as the content type and use an appropriate json message converter.
You can use custom message converters by using the following property and a corresponding `MessageConverter` bean.
```
spring.cloud.stream.bindings.process-out-0.contentType
```
@@ -864,7 +868,7 @@ conversion.
=== State Store
State stores are created automatically by Kafka Streams when the high level DSL is used and appropriate calls that trigger a state store are made.
If you want to materialize an incoming `KTable` binding as a named state store, then you can do so by using the following strategy.
@@ -888,29 +892,29 @@ spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs:
You can define custom state stores as beans in your application and those will be detected and added to the Kafka Streams builder by the binder.
Especially when the processor API is used, you need to register a state store manually.
In order to do so, you can create the `StateStore` as a bean in the application.
Here are examples of defining such beans.
[source]
----
@Bean
public StoreBuilder myStore() {
    return Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
            Serdes.Long());
}

@Bean
public StoreBuilder otherStore() {
    return Stores.windowStoreBuilder(
            Stores.persistentWindowStore("other-store",
                    1L, 3, 3L, false), Serdes.Long(),
            Serdes.Long());
}
----
These state stores can then be accessed by the applications directly.
During the bootstrap, the above beans will be processed by the binder and passed on to the Streams builder object.
Accessing the state store:
[source]
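----
// The original listing is elided in this diff view. A minimal sketch: inside a
// Transformer or Processor implementation, retrieve the store registered above
// by its name in init().
WindowStore<Long, Long> state;

@Override
public void init(ProcessorContext processorContext) {
    // "other-store" matches the StoreBuilder bean registered above
    this.state = (WindowStore<Long, Long>) processorContext.getStateStore("other-store");
}
----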
@@ -993,11 +997,11 @@ The health indicator requires the dependency `spring-boot-starter-actuator`. For
</dependency>
----
Spring Cloud Stream Kafka Streams Binder provides a health indicator to check the state of the underlying streams threads.
Spring Cloud Stream defines a property `management.health.binders.enabled` to enable the health indicator. See the
https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_health_indicator[Spring Cloud Stream documentation].
The health indicator provides the following details for each stream thread's metadata:
* Thread name
* Thread state: `CREATED`, `RUNNING`, `PARTITIONS_REVOKED`, `PARTITIONS_ASSIGNED`, `PENDING_SHUTDOWN` or `DEAD`
@@ -1033,7 +1037,7 @@ For example, if the application ID of the first processor is `processor-1`, then
You can either programmatically access the Micrometer `MeterRegistry` in the application and then iterate through the available gauges or use Spring Boot actuator to access the metrics through a REST endpoint.
When accessing through the Boot actuator endpoint, make sure to add `metrics` to the property `management.endpoints.web.exposure.include`.
Then you can access `/actuator/metrics` to get a list of all the available metrics, which can then be individually accessed through the same URI (`/actuator/metrics/<metric-name>`).
Anything beyond the info level metrics available through `KafkaStreams#metrics()` (e.g., the debug level metrics) is still only available through JMX after you set `metrics.recording.level` to `DEBUG`.
Kafka Streams, by default, sets this level to `INFO`.
@@ -1125,21 +1129,21 @@ Here is the output binding destination:
spring.cloud.stream.bindings.process-out-0.destination: outputTopic
```
If the topic `outputTopic` has 4 partitions and you don't provide a partitioning strategy, Kafka Streams will use its default partitioning strategy, which may not be the outcome you want depending on the particular use case.
Let's say you want to send any key that matches `spring` to partition 0, `cloud` to partition 1, `stream` to partition 2, and everything else to partition 3.
This is what you need to do in the application.
```
@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
    return (t, k, v, n) -> {
        if (k.equals("spring")) {
            return 0;
        }
        else if (k.equals("cloud")) {
            return 1;
        }
        else if (k.equals("stream")) {
            return 2;
        }
        else {
            return 3;
        }
    };
}
```
This is a rudimentary implementation; however, you have access to the key/value of the record, the topic name and the total number of partitions.
Therefore, you can implement complex partitioning strategies if need be.
You also need to provide this bean name along with the application configuration.
@@ -1163,8 +1167,8 @@ Each output topic in the application needs to be configured separately like this
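The exact listing is elided in this diff; as an assumption, the partitioner bean name is supplied per output binding roughly along these lines (property name hedged, not verified against the source):
```
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner
```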
=== StreamsBuilderFactoryBean customizer
It is often required to customize the `StreamsBuilderFactoryBean` that creates the `KafkaStreams` objects.
Based on the underlying support provided by Spring Kafka, the binder allows you to customize the `StreamsBuilderFactoryBean`.
You can use the `StreamsBuilderFactoryBeanCustomizer` to customize the `StreamsBuilderFactoryBean` itself.
Then, once you get access to the `StreamsBuilderFactoryBean` through this customizer, you can customize the corresponding `KafkaStreams` using `KafkaStreamsCustomizer`.
Both of these customizers are part of the Spring for Apache Kafka project.
@@ -1205,7 +1209,7 @@ public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer()
`KafkaStreamsCustomizer` will be called by the `StreamsBuilderFactoryBean` right before the underlying `KafkaStreams` gets started.
There can only be one `StreamsBuilderFactoryBeanCustomizer` in the entire application.
Then how do we account for multiple Kafka Streams processors, as each of them is backed by an individual `StreamsBuilderFactoryBean` object?
In that case, if the customization needs to be different for those processors, then the application needs to apply some filter based on the application ID.
For example:
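A hedged sketch (assuming the factory bean exposes its streams configuration, and using a hypothetical generated application ID `processor1-applicationID`):
```
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return factoryBean -> {
        // look up this factory bean's application ID and customize selectively
        String applicationId = (String) factoryBean.getStreamsConfiguration()
                .get(StreamsConfig.APPLICATION_ID_CONFIG);
        if ("processor1-applicationID".equals(applicationId)) {
            factoryBean.setKafkaStreamsCustomizer(kafkaStreams ->
                    // hypothetical per-processor customization
                    kafkaStreams.setUncaughtExceptionHandler((thread, throwable) -> {
                    }));
        }
    };
}
```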
@@ -1231,10 +1235,10 @@ public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer()
=== Timestamp extractor
Kafka Streams allows you to control the processing of the consumer records based on various notions of timestamp.
By default, Kafka Streams extracts the timestamp metadata embedded in the consumer record.
You can change this default behavior by providing a different `TimestampExtractor` implementation per input binding.
Here are some details on how that can be done.
```
@Bean
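public TimestampExtractor timestampExtractor() {
    // the original listing is elided in this diff; Kafka Streams ships a
    // WallclockTimestampExtractor, which can stand in as the custom extractor
    return new WallclockTimestampExtractor();
}
```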
@@ -1293,7 +1297,7 @@ spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
```
Things become a bit more complex if you have the same application as above, but it is dealing with two different Kafka clusters, e.g. the regular `process` is acting upon both Kafka cluster 1 and cluster 2 (receiving data from cluster-1 and sending to cluster-2) and the Kafka Streams processor is acting upon Kafka cluster 2.
Then you have to use the https://cloud.spring.io/spring-cloud-stream/reference/html/spring-cloud-stream.html#multiple-binders[multibinder] facilities provided by Spring Cloud Stream.
Here is how your configuration may change in that scenario.
@@ -1307,7 +1311,6 @@ spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the appropriate IP of the cluster
spring.cloud.stream.function.definition=process;kstreamProcess
# From cluster 1 to cluster 2 with regular process function
@@ -1324,7 +1327,7 @@ spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3
```
Pay attention to the above configuration.
We have two kinds of binders, but 3 binders in all: the first one is the regular Kafka binder based on cluster 1 (`kafka1`), then another Kafka binder based on cluster 2 (`kafka2`), and finally the `kstream` one (`kafka3`).
The first processor in the application receives data from `kafka1` and publishes to `kafka2`, where both binders are based on the regular Kafka binder but different clusters.
The second processor, which is a Kafka Streams processor, consumes data from `kafka3`, which is the same cluster as `kafka2`, but a different binder type.
@@ -1342,7 +1345,9 @@ public Function<KStream<Long, Order>,
}
```
then, this has to be configured in a multi binder scenario as the following.
Please note that this is only needed if you have a true multi-binder scenario where there are multiple processors dealing with multiple clusters within a single application.
In that case, the binders need to be explicitly provided with the bindings in order to distinguish them from the other processor's binder types and clusters.
```
spring.cloud.stream.binders.kafka1.type: kstream
@@ -1465,17 +1470,3 @@ Default: `earliest`.
Note: Using `resetOffsets` on the consumer does not have any effect on Kafka Streams binder.
Unlike the message channel based binder, Kafka Streams binder does not seek to beginning or end on demand.
=== Accessing the underlying KafkaStreams object
`StreamsBuilderFactoryBean` from spring-kafka, which is responsible for constructing the `KafkaStreams` object, can be accessed programmatically.
Each `StreamsBuilderFactoryBean` is registered as `stream-builder`, appended with the `StreamListener` method name.
If your `StreamListener` method is named `process`, for example, the stream builder bean is named `stream-builder-process`.
Since this is a factory bean, it should be accessed by prepending an ampersand (`&`) when accessing it programmatically.
Following is an example, and it assumes the `StreamListener` method is named `process`:
[source]
----
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
----