diff --git a/docs/src/main/asciidoc/kafka-streams.adoc b/docs/src/main/asciidoc/kafka-streams.adoc
index d46083eb..ae272997 100644
--- a/docs/src/main/asciidoc/kafka-streams.adoc
+++ b/docs/src/main/asciidoc/kafka-streams.adoc
@@ -642,7 +642,7 @@ and

After that, you must set all the binding level properties on these new binding names.

-Please keep in mind that with the functional programming model described above, sticking with the default binding names make sense in most situations.
+Please keep in mind that with the functional programming model described above, adhering to the default binding names makes sense in most situations.
The only reason you may still want to do this overriding is when you have a larger number of configuration properties and you want to map the bindings to something more domain friendly.

==== Setting up bootstrap server configuration
@@ -665,7 +665,7 @@
Let's look at some details.

==== Inbound deserialization

-Keys are always deserialized by native Serdes.
+Keys are always deserialized using native Serdes.
For values, by default, deserialization on the inbound is natively performed by Kafka.
Please note that this is a major change in the default behavior from previous versions of Kafka Streams binder, where the deserialization was done by the framework.
@@ -891,6 +891,11 @@ that if there are multiple functions or `StreamListener` methods in the same app

* The exception handling for deserialization works consistently with native deserialization and framework provided message conversion.

+==== Handling Production Exceptions in the Binder
+
+Unlike the support for deserialization exception handlers described above, the binder does not provide such first class mechanisms for handling production exceptions.
+However, you can still configure production exception handlers using the `StreamsBuilderFactoryBean` customizer, about which you can find more details in a subsequent section below.
+
=== State Store

State stores are created automatically by Kafka Streams when the high level DSL is used and appropriate calls are made that trigger a state store.
@@ -1080,7 +1085,7 @@
Kafka Streams also gives access to a low level Processor API.
The processor API, although very powerful and able to control things at a much lower level, is imperative in nature.
Kafka Streams binder for Spring Cloud Stream allows you to use either the high level DSL or a mix of both the DSL and the processor API.
Mixing both of these variants gives you a lot of options to control various use cases in an application.
-Applications can use the `trasform` or `process` method API calls to get access to the processor API.
+Applications can use the `transform` or `process` method API calls to get access to the processor API.

Here is a look at how one may combine both the DSL and the processor API in a Spring Cloud Stream application using the `process` API.
@@ -1284,6 +1289,24 @@ public StreamsBuilderFactoryBeanCustomizer customizer() {

Again, if you have multiple processors, you want to attach the global state store to the right `StreamsBuilder` by filtering out the other `StreamsBuilderFactoryBean` objects using the application id, as outlined above.
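+
+For instance, here is a minimal sketch of such filtering; the application id `my-app-id` is a placeholder assumed for this example, and the actual store registration is elided:
+
+```
+@Bean
+public StreamsBuilderFactoryBeanCustomizer customizer() {
+    return fb -> {
+        // Act only on the factory bean that backs the intended processor.
+        // "my-app-id" is a placeholder for that processor's application id.
+        String applicationId = (String) fb.getStreamsConfiguration()
+                .get(StreamsConfig.APPLICATION_ID_CONFIG);
+        if ("my-app-id".equals(applicationId)) {
+            // register the global state store against this StreamsBuilderFactoryBean only
+        }
+    };
+}
+```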
+==== Using customizer to register a production exception handler
+
+In the error handling section, we indicated that the binder does not provide a first class way to deal with production exceptions.
+Though that is the case, you can still use the `StreamsBuilderFactoryBean` customizer to register production exception handlers. See below.
+
+```
+@Bean
+public StreamsBuilderFactoryBeanCustomizer customizer() {
+    return fb -> {
+        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
+                CustomProductionExceptionHandler.class);
+    };
+}
+```
+
+Once again, if you have multiple processors, you may want to set the handler appropriately against the correct `StreamsBuilderFactoryBean`.
+You may also add such production exception handlers using the configuration property (see below for more on that); the customizer is an option if you choose to go with a programmatic approach.
+
=== Timestamp extractor

Kafka Streams allows you to control the processing of the consumer records based on various notions of timestamp.
@@ -1349,7 +1372,7 @@
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
```

Things become a bit more complex if you have the same application as above, but it deals with two different Kafka clusters: for example, the regular `process` is acting upon both Kafka cluster 1 and cluster 2 (receiving data from cluster-1 and sending to cluster-2) and the Kafka Streams processor is acting upon Kafka cluster 2.
-Then you have to use the https://cloud.spring.io/spring-cloud-stream/reference/html/spring-cloud-stream.html#multiple-binders[multibinder] facilities provided by Spring Cloud Stream.
+Then you have to use the https://cloud.spring.io/spring-cloud-stream/reference/html/spring-cloud-stream.html#multiple-binders[multi binder] facilities provided by Spring Cloud Stream.

Here is how your configuration may change in that scenario.
@@ -1417,7 +1440,6 @@
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1
#kstream
```
-
=== State Cleanup

By default, the `KafkaStreams.cleanup()` method is called when the binding is stopped.
@@ -1430,7 +1452,7 @@
This section contains the configuration options used by the Kafka Streams binder.
For common configuration options and properties pertaining to binder, refer to the <>.

-==== Kafka Streams Properties
+==== Kafka Streams Binder Properties

The following properties are available at the binder level and must be prefixed with `spring.cloud.stream.kafka.streams.binder.`
@@ -1446,8 +1468,23 @@
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
----

-For more information about all the properties that may go into streams configuration, see StreamsConfig JavaDocs in
-Apache Kafka Streams docs.
+For more information about all the properties that may go into streams configuration, see the `StreamsConfig` JavaDocs in the Apache Kafka Streams docs.
+All configuration that you can set through `StreamsConfig` can be set using this property.
+When using this property, it applies to the entire application, since this is a binder level property.
+If you have more than one processor in the application, all of them acquire these properties.
+In the case of properties like `application.id`, this becomes problematic, and therefore you have to carefully examine how the properties from `StreamsConfig` are mapped using this binder level `configuration` property.
+
+functions.<function name>.applicationId::
+Applicable only for functional style processors.
+This can be used for setting the application ID per function in the application.
+In the case of multiple functions, this is a handy way to set the application ID.
+
+functions.<function name>.configuration::
+Applicable only for functional style processors.
+Map with a key/value pair containing properties pertaining to the Apache Kafka Streams API.
+This is similar to the binder level `configuration` property described above, but this level of `configuration` property is restricted to the named function only.
+When you have multiple processors and you want to restrict access to the configuration based on particular functions, you might want to use this.
+All `StreamsConfig` properties can be used here (see the sketch after this list).

brokers::
Broker URL
+
Default: `localhost`
@@ -1457,67 +1494,122 @@
zkNodes::
Zookeeper URL
+
Default: `localhost`

-serdeError::
+
+deserializationExceptionHandler::
Deserialization error handler type.
+This handler is applied at the binder level and thus against all input bindings in the application.
+There is a way to control it in a more fine-grained way at the consumer binding level.
Possible values are - `logAndContinue`, `logAndFail` or `sendToDlq`
+
Default: `logAndFail`

applicationId::
Convenient way to set the application.id for the Kafka Streams application globally at the binder level.
-If the application contains multiple functions or `StreamListener` methods, then the application id should be set at the binding level per input binding.
+If the application contains multiple functions or `StreamListener` methods, then the application id should be set differently. See above, where setting the application id is discussed in detail.
+
-Default: `none`
+Default: the application will generate a static application ID. See the application ID section for more details.
+
+stateStoreRetry.maxAttempts::
+Max attempts for trying to connect to a state store.
++
+Default: 1
+
+stateStoreRetry.backoffPeriod::
+Backoff period when trying to connect to a state store on a retry.
++
+Default: 1000 ms
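+
+As an illustration of the `functions.<function name>.applicationId` and `functions.<function name>.configuration` properties described in the list above, here is a sketch; the function names `process` and `enrich` are assumptions for this example:
+
+```
+# one application id per function
+spring.cloud.stream.kafka.streams.binder.functions.process.applicationId=process-app-id
+spring.cloud.stream.kafka.streams.binder.functions.enrich.applicationId=enrich-app-id
+# this Kafka Streams property is applied to the enrich processor only
+spring.cloud.stream.kafka.streams.binder.functions.enrich.configuration.num.stream.threads=2
+```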
+
+==== Kafka Streams Producer Properties
+
The following properties are _only_ available for Kafka Streams producers and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.`
-For convenience, if there multiple output bindings and they all require a common value, that can be configured by using the prefix `spring.cloud.stream.kafka.streams.default.producer.`.
+For convenience, if there are multiple output bindings and they all require a common value, that can be configured by using the prefix `spring.cloud.stream.kafka.streams.default.producer.`.

keySerde::
key serde to use
+
Default: See the above discussion on message de/serialization

valueSerde::
value serde to use
+
Default: See the above discussion on message de/serialization

useNativeEncoding::
flag to enable/disable native encoding
+
Default: `true`.

+streamPartitionerBeanName::
+Custom outbound partitioner bean name to be used at the producer.
+Applications can provide a custom `StreamPartitioner` as a Spring bean, and the name of this bean can be provided to the producer to use instead of the default one (see the sketch following this list).
++
+Default: See the discussion above on outbound partition support.
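+
+To illustrate `streamPartitionerBeanName`, here is a minimal sketch; the bean definition, the `String` key/value types, and the binding name `process-out-0` are assumptions for this example:
+
+```
+@Bean
+public StreamPartitioner<String, String> streamPartitioner() {
+    // route records keyed "even" to partition 0 and everything else to partition 1
+    return (topic, key, value, numPartitions) -> "even".equals(key) ? 0 : 1;
+}
+```
+
+The bean can then be wired in with `spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName=streamPartitioner`.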
+
+==== Kafka Streams Consumer Properties
+
The following properties are available for Kafka Streams consumers and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding name>.consumer.`
For convenience, if there are multiple input bindings and they all require a common value, that can be configured by using the prefix `spring.cloud.stream.kafka.streams.default.consumer.`.

applicationId::
-Setting application.id per input binding.
+Setting application.id per input binding. This is only preferred for `StreamListener` based processors; for function based processors, see the other approaches outlined above.
+
-Default: `none`
+Default: See above.

keySerde::
key serde to use
+
Default: See the above discussion on message de/serialization

valueSerde::
value serde to use
+
Default: See the above discussion on message de/serialization

materializedAs::
state store to materialize when using incoming KTable types
+
Default: `none`.

useNativeDecoding::
flag to enable/disable native decoding
+
Default: `true`.

dlqName::
DLQ topic name.
+
-Default: `none`.
+Default: See above on the discussion of error handling and DLQ.

startOffset::
Offset to start from if there is no committed offset to consume from.
-This is mostly used when the consumer is consuming from a topic for the first time. Kafka Streams uses `earliest` as the default strategy and
-the binder uses the same default. This can be overridden to `latest` using this property.
+This is mostly used when the consumer is consuming from a topic for the first time.
+Kafka Streams uses `earliest` as the default strategy and the binder uses the same default.
+This can be overridden to `latest` using this property.
+
Default: `earliest`.

Note: Using `resetOffsets` on the consumer does not have any effect on the Kafka Streams binder.
Unlike the message channel based binder, the Kafka Streams binder does not seek to beginning or end on demand.

+deserializationExceptionHandler::
+Deserialization error handler type.
+This handler is applied per consumer binding, as opposed to the binder level property described before.
+Possible values are - `logAndContinue`, `logAndFail` or `sendToDlq`
++
+Default: `logAndFail`
+
+timestampExtractorBeanName::
+Specific timestamp extractor bean name to be used at the consumer.
+Applications can provide a `TimestampExtractor` as a Spring bean, and the name of this bean can be provided to the consumer to use instead of the default one.
++
+Default: See the discussion above on timestamp extractors.
+
+==== Special note on concurrency
+
+In Kafka Streams, you can control the number of threads a processor can create using the `num.stream.threads` property.
+You can do this using the various `configuration` options described above at the binder, functions, producer, or consumer level.
+You can also use the `concurrency` property that core Spring Cloud Stream provides for this purpose.
+When using it, you need to set it on the consumer.
+When you have more than one input binding, either in a function or in a `StreamListener`, set it on the first input binding.
+For example, when you set `spring.cloud.stream.bindings.process-in-0.consumer.concurrency`, it will be translated as `num.stream.threads` by the binder.
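+
+For instance, assuming a function named `process` (the function name is an assumption for this sketch), the following instructs the binder to run the processor with three threads:
+
+```
+# translated by the binder to num.stream.threads=3
+spring.cloud.stream.bindings.process-in-0.consumer.concurrency=3
+```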