Export Kafka Streams metrics available through KafkaStreams#metrics into a Micrometer MeterRegistry.
Add documentation for how to access metrics.
Modify test to verify metrics.
Resolves#543
Use DeadLetterPublishingRecoverer from Spring Kafka instead of custom DLQ components in the binder.
Remove code that is no longer needed for DLQ purposes.
In Kafka Streams, always set group to application id if the user doesn't set an explicit group.
Upgrade Spring Kafka to 2.3.0 and SIK to 3.2.0
Resolves#761
If there are multiple functions in a Kafka Streams application, and if they want to have
a separate set of configuration for each, then it should be able to set that at the function
level. For e.g. spring.cloud.stream.kafka.streams.binder.functions....
Resolves#757
In Kafka Streams binder, with native decoding being the default in 3.0 and going forward,
the topology depth of the Kafka Streams processors have much reduced compared to the topology
generated when using non-native decoding. There was still an extra unncessary processor in the
topology even when the deserialization done natively. Addressing that issue.
With this change, the topology generated is equivalent to the native Kafka Streams applications.
Resolves#682
Resolves#744
* Address deprecations in the pollable consumer
* Introduce a property for pollable time out in KafkaConsuerProperties
* Fix tests
* Add docs
* Addressin PR review comments
When StreamListener based processors used in Kafka Streams
applications, it is not possible to register state store beans
using StateStoreBuilder. This is allowed in the functional model.
This method of providing custom state stores is desired as this
gives the user more flexibility in configuring Serde's and other
properties on the state store. Adding this feature to StreamListner
based Kafka Streams processors.
Adding test to verify.
Adding docs.
Resolves#676
On KStream producer binding, topic properties are not applied
by the provisioner. This is because the extended properties object
that is passed to producer binding is erroneously overwritten by a
default instance. Addressing this issue.
Resolves#684
When KTable or GlobalKTable binders are used in a multi bindder
environment, it has difficulty finding certain beans/properties.
Addressing this issue.
Adding tests.
Resolves#681
Fixing an issue that causes a race condition when multiple functions
are present in a Kafka Streams application by isolating the responsible
proxy factory per function and not shared.
When multiple Kafka Streams functions are present in an application,
it should be possible to set the application id per function.
When an application provides a bean of type Serde, then the binder
should try to introspect that bean to see if it can be matched for
any inbound or outbound serialization.
Adding tests to verify the changes.
Adding docs.
Resolves#734, #735, #736
During application startup, there are cases when state stores
might take longer time to initialize. The call to find the
state store in the binder (InteractiveQueryService) will fail in
those situations. Providing a basic retry mechanism using which
applicaitons can opt-in for this retry.
Adding properties for retrying state store at the binder level.
Adding docs.
Polishing.
Resolves#706
There are issues when a bean declared as a function in the Kafka Streams
application tries to autowire a bean through method parameter injection.
Addressing these concerns.
Resolves#726
Use BeanFactoryUtils.beanNamesForTypeIncludingAncestors instead of
getBean from BeanFactory which forces the bean creation inside the
function detector condition. There was a race condition in which
applications were unable to autowire beans and use them in functions
while the detector condition was creating the beans. This change will
delay the creation of the function bean until it is needed.
When using the Kafka Streams binder, if the application chooses to
provide bootstrap server configuration through Kafka binder broker
property, then allow that. This way either type of broker config
works in Kafka Streams binder.
Resolves#401Resolves#720
Generate a random application id for Kafka Streams binder if the user
doesn't set one for the application. This is useful for development
purposes, as it avoids the creation of an explicit application id.
For production workloads, it is highly recommended to explicitly provide
and application id.
The gnerated application id follows a patter where it uses the function bean
name followed by a random UUID string which is followed by the literal appplicaitonId.
In the case of StreamListener, instead of function bean name, it uses the containing class +
StreamListener method name.
If the binder generates the application id, that information will be logged on the console
at startup.
Resolves#718Resolves#719
When joining two input KStreams, the binder throws an exceptin.
Fixing this issue by passing the wrapped target KStream from the
proxy object to the adapted StreamListener method.
Resolves#701
* Handle Deserialization errors when there is a key
Handle DLQ sending on deserialization errors when there is a key in the record.
Resolves#635
* Adding keySerde information in the functional bindings