Files
spring-cloud-stream-binder-…/docs/src/main/asciidoc/kafka-streams.adoc
Spring Operator 1bb61f1cbe URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (i.e. if using a URL shortener).

# Fixed URLs

## Fixed But Review Recommended
These URLs were fixed, but the https status was not OK. However, the https status was the same as the http request or http redirected to an https URL, so they were migrated. Your review is recommended.

* [ ] http://compose.docker.io/ (UnknownHostException) with 2 occurrences migrated to:
  https://compose.docker.io/ ([https](https://compose.docker.io/) result UnknownHostException).
* [ ] http://docs.spring.io/spring-cloud-stream-binder-kafka/docs/ (301) with 1 occurrences migrated to:
  https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/ ([https](https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/) result 404).
* [ ] http://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current-SNAPSHOT/reference/html/ (301) with 1 occurrences migrated to:
  https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current-SNAPSHOT/reference/html/ ([https](https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current-SNAPSHOT/reference/html/) result 404).
* [ ] http://docs.spring.io/spring-kafka/reference/html/_reference.html (301) with 1 occurrences migrated to:
  https://docs.spring.io/spring-kafka/reference/html/_reference.html ([https](https://docs.spring.io/spring-kafka/reference/html/_reference.html) result 404).

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* [ ] http://docs.confluent.io/2.0.0/kafka/security.html with 2 occurrences migrated to:
  https://docs.confluent.io/2.0.0/kafka/security.html ([https](https://docs.confluent.io/2.0.0/kafka/security.html) result 200).
* [ ] http://github.com/ with 3 occurrences migrated to:
  https://github.com/ ([https](https://github.com/) result 200).
* [ ] http://kafka.apache.org/090/documentation.html with 4 occurrences migrated to:
  https://kafka.apache.org/090/documentation.html ([https](https://kafka.apache.org/090/documentation.html) result 200).
* [ ] http://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html with 2 occurrences migrated to:
  https://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html ([https](https://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html) result 200).
* [ ] http://plugins.jetbrains.com/plugin/6546 with 2 occurrences migrated to:
  https://plugins.jetbrains.com/plugin/6546 ([https](https://plugins.jetbrains.com/plugin/6546) result 301).
* [ ] http://raw.github.com/ with 1 occurrences migrated to:
  https://raw.github.com/ ([https](https://raw.github.com/) result 301).
* [ ] http://eclipse.org with 2 occurrences migrated to:
  https://eclipse.org ([https](https://eclipse.org) result 302).
* [ ] http://eclipse.org/m2e/ with 4 occurrences migrated to:
  https://eclipse.org/m2e/ ([https](https://eclipse.org/m2e/) result 302).
* [ ] http://www.springsource.com/developer/sts with 2 occurrences migrated to:
  https://www.springsource.com/developer/sts ([https](https://www.springsource.com/developer/sts) result 302).
2019-03-26 03:57:52 -05:00

696 lines
26 KiB
Plaintext

== Kafka Streams Binder
=== Usage
For using the Kafka Streams binder, you just need to add it to your Spring Cloud Stream application, using the following
Maven coordinates:
[source,xml]
----
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
----
=== Overview
Spring Cloud Stream's Apache Kafka support also includes a binder implementation designed explicitly for Apache Kafka
Streams binding. With this native integration, a Spring Cloud Stream "processor" application can directly use the
https://kafka.apache.org/documentation/streams/developer-guide[Apache Kafka Streams] APIs in the core business logic.
Kafka Streams binder implementation builds on the foundation provided by the https://docs.spring.io/spring-kafka/reference/html/_reference.html#kafka-streams[Kafka Streams in Spring Kafka]
project.
Kafka Streams binder provides binding capabilities for the three major types in Kafka Streams - KStream, KTable and GlobalKTable.
As part of this native integration, the high-level https://docs.confluent.io/current/streams/developer-guide/dsl-api.html[Streams DSL]
provided by the Kafka Streams API is available for use in the business logic.
An early version of the https://docs.confluent.io/current/streams/developer-guide/processor-api.html[Processor API]
support is available as well.
As noted early-on, Kafka Streams support in Spring Cloud Stream is strictly only available for use in the Processor model.
A model in which the messages read from an inbound topic, business processing can be applied, and the transformed messages
can be written to an outbound topic. It can also be used in Processor applications with a no-outbound destination.
==== Streams DSL
This application consumes data from a Kafka topic (e.g., `words`), computes word count for each unique word in a 5 seconds
time window, and the computed results are sent to a downstream topic (e.g., `counts`) for further processing.
[source]
----
@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class WordCountProcessorApplication {
@StreamListener("input")
@SendTo("output")
public KStream<?, WordCount> process(KStream<?, String> input) {
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("WordCounts-multi"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
----
Once built as a uber-jar (e.g., `wordcount-processor.jar`), you can run the above example like the following.
[source]
----
java -jar wordcount-processor.jar --spring.cloud.stream.bindings.input.destination=words --spring.cloud.stream.bindings.output.destination=counts
----
This application will consume messages from the Kafka topic `words` and the computed results are published to an output
topic `counts`.
Spring Cloud Stream will ensure that the messages from both the incoming and outgoing topics are automatically bound as
KStream objects. As a developer, you can exclusively focus on the business aspects of the code, i.e. writing the logic
required in the processor. Setting up the Streams DSL specific configuration required by the Kafka Streams infrastructure
is automatically handled by the framework.
=== Configuration Options
This section contains the configuration options used by the Kafka Streams binder.
For common configuration options and properties pertaining to binder, refer to the <<binding-properties,core documentation>>.
==== Kafka Streams Properties
The following properties are available at the binder level and must be prefixed with `spring.cloud.stream.kafka.streams.binder.`
configuration::
Map with a key/value pair containing properties pertaining to Apache Kafka Streams API.
This property must be prefixed with `spring.cloud.stream.kafka.streams.binder.`.
Following are some examples of using this property.
[source]
----
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
----
For more information about all the properties that may go into streams configuration, see StreamsConfig JavaDocs in
Apache Kafka Streams docs.
brokers::
Broker URL
+
Default: `localhost`
zkNodes::
Zookeeper URL
+
Default: `localhost`
serdeError::
Deserialization error handler type.
Possible values are - `logAndContinue`, `logAndFail` or `sendToDlq`
+
Default: `logAndFail`
applicationId::
Convenient way to set the application.id for the Kafka Streams application globally at the binder level.
If the application contains multiple `StreamListener` methods, then application.id should be set at the binding level per input binding.
+
Default: `none`
The following properties are _only_ available for Kafka Streams producers and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.`
For convenience, if there multiple output bindings and they all require a common value, that can be configured by using the prefix `spring.cloud.stream.kafka.streams.default.producer.`.
keySerde::
key serde to use
+
Default: `none`.
valueSerde::
value serde to use
+
Default: `none`.
useNativeEncoding::
flag to enable native encoding
+
Default: `false`.
The following properties are available for Kafka Streams consumers and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.`
For convenience, if there are multiple input bindings and they all require a common value, that can be configured by using the prefix `spring.cloud.stream.kafka.streams.default.consumer.`.
applicationId::
Setting application.id per input binding.
+
Default: `none`
keySerde::
key serde to use
+
Default: `none`.
valueSerde::
value serde to use
+
Default: `none`.
materializedAs::
state store to materialize when using incoming KTable types
+
Default: `none`.
useNativeDecoding::
flag to enable native decoding
+
Default: `false`.
dlqName::
DLQ topic name.
+
Default: `none`.
startOffset::
Offset to start from if there is no committed offset to consume from.
This is mostly used when the consumer is consuming from a topic for the first time. Kafka Streams uses `earliest` as the default strategy and
the binder uses the same default. This can be overridden to `latest` using this property.
+
Default: `earliest`.
Note: Using `resetOffsets` on the consumer does not have any effect on Kafka Streams binder.
Unlike the message channel based binder, Kafka Streams binder does not seek to beginning or end on demand.
==== TimeWindow properties:
Windowing is an important concept in stream processing applications. Following properties are available to configure
time-window computations.
spring.cloud.stream.kafka.streams.timeWindow.length::
When this property is given, you can autowire a `TimeWindows` bean into the application.
The value is expressed in milliseconds.
+
Default: `none`.
spring.cloud.stream.kafka.streams.timeWindow.advanceBy::
Value is given in milliseconds.
+
Default: `none`.
=== Multiple Input Bindings
For use cases that requires multiple incoming KStream objects or a combination of KStream and KTable objects, the Kafka
Streams binder provides multiple bindings support.
Let's see it in action.
==== Multiple Input Bindings as a Sink
[source]
----
@EnableBinding(KStreamKTableBinding.class)
.....
.....
@StreamListener
public void process(@Input("inputStream") KStream<String, PlayEvent> playEvents,
@Input("inputTable") KTable<Long, Song> songTable) {
....
....
}
interface KStreamKTableBinding {
@Input("inputStream")
KStream<?, ?> inputStream();
@Input("inputTable")
KTable<?, ?> inputTable();
}
----
In the above example, the application is written as a sink, i.e. there are no output bindings and the application has to
decide concerning downstream processing. When you write applications in this style, you might want to send the information
downstream or store them in a state store (See below for Queryable State Stores).
In the case of incoming KTable, if you want to materialize the computations to a state store, you have to express it
through the following property.
[source]
----
spring.cloud.stream.kafka.streams.bindings.inputTable.consumer.materializedAs: all-songs
----
The above example shows the use of KTable as an input binding.
The binder also supports input bindings for GlobalKTable.
GlobalKTable binding is useful when you have to ensure that all instances of your application has access to the data updates from the topic.
KTable and GlobalKTable bindings are only available on the input.
Binder supports both input and output bindings for KStream.
=== Multiple Input Bindings as a Processor
[source]
----
@EnableBinding(KStreamKTableBinding.class)
....
....
@StreamListener
@SendTo("output")
public KStream<String, Long> process(@Input("input") KStream<String, Long> userClicksStream,
@Input("inputTable") KTable<String, String> userRegionsTable) {
....
....
}
interface KStreamKTableBinding extends KafkaStreamsProcessor {
@Input("inputX")
KTable<?, ?> inputTable();
}
----
=== Multiple Output Bindings (aka Branching)
Kafka Streams allow outbound data to be split into multiple topics based on some predicates. The Kafka Streams binder provides
support for this feature without compromising the programming model exposed through `StreamListener` in the end user application.
You can write the application in the usual way as demonstrated above in the word count example. However, when using the
branching feature, you are required to do a few things. First, you need to make sure that your return type is `KStream[]`
instead of a regular `KStream`. Second, you need to use the `SendTo` annotation containing the output bindings in the order
(see example below). For each of these output bindings, you need to configure destination, content-type etc., complying with
the standard Spring Cloud Stream expectations.
Here is an example:
[source]
----
@EnableBinding(KStreamProcessorWithBranches.class)
@EnableAutoConfiguration
public static class WordCountProcessorApplication {
@Autowired
private TimeWindows timeWindows;
@StreamListener("input")
@SendTo({"output1","output2","output3})
public KStream<?, WordCount>[] process(KStream<Object, String> input) {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(timeWindows)
.count(Materialized.as("WordCounts-1"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
.branch(isEnglish, isFrench, isSpanish);
}
interface KStreamProcessorWithBranches {
@Input("input")
KStream<?, ?> input();
@Output("output1")
KStream<?, ?> output1();
@Output("output2")
KStream<?, ?> output2();
@Output("output3")
KStream<?, ?> output3();
}
}
----
Properties:
[source]
----
spring.cloud.stream.bindings.output1.contentType: application/json
spring.cloud.stream.bindings.output2.contentType: application/json
spring.cloud.stream.bindings.output3.contentType: application/json
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams.binder.configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.bindings.output1:
destination: foo
producer:
headerMode: raw
spring.cloud.stream.bindings.output2:
destination: bar
producer:
headerMode: raw
spring.cloud.stream.bindings.output3:
destination: fox
producer:
headerMode: raw
spring.cloud.stream.bindings.input:
destination: words
consumer:
headerMode: raw
----
=== Record Value Conversion
Kafka Streams binder can marshal producer/consumer values based on a content type and the converters provided out of the box in Spring Cloud Stream.
It is typical for Kafka Streams applications to provide `Serde` classes.
Therefore, it may be more natural to rely on the SerDe facilities provided by the Apache Kafka Streams library itself for data conversion on inbound and outbound
rather than rely on the content-type conversions offered by the binder.
On the other hand, you might be already familiar with the content-type conversion patterns provided by Spring Cloud Stream and
would like to continue using that for inbound and outbound conversions.
Both the options are supported in the Kafka Streams binder implementation. See below for more details.
===== Outbound serialization
If native encoding is disabled (which is the default), then the framework will convert the message using the contentType
set by the user (otherwise, the default `application/json` will be applied). It will ignore any SerDe set on the outbound
in this case for outbound serialization.
Here is the property to set the contentType on the outbound.
[source]
----
spring.cloud.stream.bindings.output.contentType: application/json
----
Here is the property to enable native encoding.
[source]
----
spring.cloud.stream.bindings.output.nativeEncoding: true
----
If native encoding is enabled on the output binding (user has to enable it as above explicitly), then the framework will
skip any form of automatic message conversion on the outbound. In that case, it will switch to the Serde set by the user.
The `valueSerde` property set on the actual output binding will be used. Here is an example.
[source]
----
spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
----
If this property is not set, then it will use the "default" SerDe: `spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde`.
It is worth to mention that Kafka Streams binder does not serialize the keys on outbound - it simply relies on Kafka itself.
Therefore, you either have to specify the `keySerde` property on the binding or it will default to the application-wide common
`keySerde`.
Binding level key serde:
[source]
----
spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde
----
Common Key serde:
[source]
----
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
----
If branching is used, then you need to use multiple output bindings. For example,
[source]
----
interface KStreamProcessorWithBranches {
@Input("input")
KStream<?, ?> input();
@Output("output1")
KStream<?, ?> output1();
@Output("output2")
KStream<?, ?> output2();
@Output("output3")
KStream<?, ?> output3();
}
----
If `nativeEncoding` is set, then you can set different SerDe's on individual output bindings as below.
[source]
----
spring.cloud.stream.kafka.streams.bindings.output1.producer.valueSerde=IntegerSerde
spring.cloud.stream.kafka.streams.bindings.output2.producer.valueSerde=StringSerde
spring.cloud.stream.kafka.streams.bindings.output3.producer.valueSerde=JsonSerde
----
Then if you have `SendTo` like this, @SendTo({"output1", "output2", "output3"}), the `KStream[]` from the branches are
applied with proper SerDe objects as defined above. If you are not enabling `nativeEncoding`, you can then set different
contentType values on the output bindings as below. In that case, the framework will use the appropriate message converter
to convert the messages before sending to Kafka.
[source]
----
spring.cloud.stream.bindings.output1.contentType: application/json
spring.cloud.stream.bindings.output2.contentType: application/java-serialzied-object
spring.cloud.stream.bindings.output3.contentType: application/octet-stream
----
===== Inbound Deserialization
Similar rules apply to data deserialization on the inbound.
If native decoding is disabled (which is the default), then the framework will convert the message using the contentType
set by the user (otherwise, the default `application/json` will be applied). It will ignore any SerDe set on the inbound
in this case for inbound deserialization.
Here is the property to set the contentType on the inbound.
[source]
----
spring.cloud.stream.bindings.input.contentType: application/json
----
Here is the property to enable native decoding.
[source]
----
spring.cloud.stream.bindings.input.nativeDecoding: true
----
If native decoding is enabled on the input binding (user has to enable it as above explicitly), then the framework will
skip doing any message conversion on the inbound. In that case, it will switch to the SerDe set by the user. The `valueSerde`
property set on the actual output binding will be used. Here is an example.
[source]
----
spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
----
If this property is not set, it will use the default SerDe: `spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde`.
It is worth to mention that Kafka Streams binder does not deserialize the keys on inbound - it simply relies on Kafka itself.
Therefore, you either have to specify the `keySerde` property on the binding or it will default to the application-wide common
`keySerde`.
Binding level key serde:
[source]
----
spring.cloud.stream.kafka.streams.bindings.input.consumer.keySerde
----
Common Key serde:
[source]
----
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
----
As in the case of KStream branching on the outbound, the benefit of setting value SerDe per binding is that if you have
multiple input bindings (multiple KStreams object) and they all require separate value SerDe's, then you can configure
them individually. If you use the common configuration approach, then this feature won't be applicable.
=== Error Handling
Apache Kafka Streams provide the capability for natively handling exceptions from deserialization errors.
For details on this support, please see https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers[this]
Out of the box, Apache Kafka Streams provide two kinds of deserialization exception handlers - `logAndContinue` and `logAndFail`.
As the name indicates, the former will log the error and continue processing the next records and the latter will log the
error and fail. `LogAndFail` is the default deserialization exception handler.
=== Handling Deserialization Exceptions
Kafka Streams binder supports a selection of exception handlers through the following properties.
[source]
----
spring.cloud.stream.kafka.streams.binder.serdeError: logAndContinue
----
In addition to the above two deserialization exception handlers, the binder also provides a third one for sending the erroneous
records (poison pills) to a DLQ topic. Here is how you enable this DLQ exception handler.
[source]
----
spring.cloud.stream.kafka.streams.binder.serdeError: sendToDlq
----
When the above property is set, all the deserialization error records are automatically sent to the DLQ topic.
[source]
----
spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqName: foo-dlq
----
If this is set, then the error records are sent to the topic `foo-dlq`. If this is not set, then it will create a DLQ
topic with the name `error.<input-topic-name>.<group-name>`.
A couple of things to keep in mind when using the exception handling feature in Kafka Streams binder.
* The property `spring.cloud.stream.kafka.streams.binder.serdeError` is applicable for the entire application. This implies
that if there are multiple `StreamListener` methods in the same application, this property is applied to all of them.
* The exception handling for deserialization works consistently with native deserialization and framework provided message
conversion.
==== Handling Non-Deserialization Exceptions
For general error handling in Kafka Streams binder, it is up to the end user applications to handle application level errors.
As a side effect of providing a DLQ for deserialization exception handlers, Kafka Streams binder provides a way to get
access to the DLQ sending bean directly from your application.
Once you get access to that bean, you can programmatically send any exception records from your application to the DLQ.
It continues to remain hard to robust error handling using the high-level DSL; Kafka Streams doesn't natively support error
handling yet.
However, when you use the low-level Processor API in your application, there are options to control this behavior. See
below.
[source]
----
@Autowired
private SendToDlqAndContinue dlqHandler;
@StreamListener("input")
@SendTo("output")
public KStream<?, WordCount> process(KStream<Object, String> input) {
input.process(() -> new Processor() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Object o, Object o2) {
try {
.....
.....
}
catch(Exception e) {
//explicitly provide the kafka topic corresponding to the input binding as the first argument.
//DLQ handler will correctly map to the dlq topic from the actual incoming destination.
dlqHandler.sendToDlq("topic-name", (byte[]) o1, (byte[]) o2, context.partition());
}
}
.....
.....
});
}
----
=== State Store
State store is created automatically by Kafka Streams when the DSL is used.
When processor API is used, you need to register a state store manually. In order to do so, you can use `KafkaStreamsStateStore` annotation.
You can specify the name and type of the store, flags to control log and disabling cache, etc.
Once the store is created by the binder during the bootstrapping phase, you can access this state store through the processor API.
Below are some primitives for doing this.
Creating a state store:
[source]
----
@KafkaStreamsStateStore(name="mystate", type= KafkaStreamsStateStoreProperties.StoreType.WINDOW, lengthMs=300000)
public void process(KStream<Object, Product> input) {
...
}
----
Accessing the state store:
[source]
----
Processor<Object, Product>() {
WindowStore<Object, String> state;
@Override
public void init(ProcessorContext processorContext) {
state = (WindowStore)processorContext.getStateStore("mystate");
}
...
}
----
=== Interactive Queries
As part of the public Kafka Streams binder API, we expose a class called `InteractiveQueryService`.
You can access this as a Spring bean in your application. An easy way to get access to this bean from your application is to "autowire" the bean.
[source]
----
@Autowired
private InteractiveQueryService interactiveQueryService;
----
Once you gain access to this bean, then you can query for the particular state-store that you are interested. See below.
[source]
----
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
----
If there are multiple instances of the kafka streams application running, then before you can query them interactively, you need to identify which application instance hosts the key.
`InteractiveQueryService` API provides methods for identifying the host information.
In order for this to work, you must configure the property `application.server` as below:
[source]
----
spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
----
Here are some code snippets:
[source]
----
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}
----
=== Accessing the underlying KafkaStreams object
`StreamBuilderFactoryBean` from spring-kafka that is responsible for constructing the `KafkaStreams` object can be accessed programmatically.
Each `StreamBuilderFactoryBean` is registered as `stream-builder` and appended with the `StreamListener` method name.
If your `StreamListener` method is named as `process` for example, the stream builder bean is named as `stream-builder-process`.
Since this is a factory bean, it should be accessed by prepending an ampersand (`&`) when accessing it programmatically.
Following is an example and it assumes the `StreamListener` method is named as `process`
[source]
----
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
----
=== State Cleanup
By default, the `Kafkastreams.cleanup()` method is called when the binding is stopped.
See https://docs.spring.io/spring-kafka/reference/html/_reference.html#_configuration[the Spring Kafka documentation].
To modify this behavior simply add a single `CleanupConfig` `@Bean` (configured to clean up on start, stop, or neither) to the application context; the bean will be detected and wired into the factory bean.