InteractiveQueryService methods for finding the host info for Kafka Streams
currently throw exceptions if the underlying KafkaStreams are not ready yet.
Introduce a retry mechanism so that the users can control the behaviour of
these methods by providing the following properties.
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts (default 1)
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backoffPeriod (default 1000 ms).
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1185
InteractiveQueryService methods for finding the host info for Kafka Streams
currently throw exceptions if the underlying KafkaStreams are not ready yet.
Introduce a retry mechanism so that the users can control the behaviour of
these methods by providing the following properties.
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts (default 1)
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backoffPeriod (default 1000 ms).
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1185
Allow health checks on KafkaStreams processors that are currently stopped through
actuator bindings endpoint. Add this only as an opt-in feature through a new binder
level property - includeStoppedProcessorsForHealthCheck which is false by default
to preserve the current health indicator behavior.
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165Resolves#1175
This PR safe guards state store instances in case there are multiple KafkaStreams
instances present that have distinct application IDs but share State Store Names.
Change is backwards compatible: In case no KafkaStreams association of the thread
can be found, all local state stores are queried as before.
In case an associated KafkaStreams Instance is found, but required StateStore is
not found in this instance, a warning is issued but backwards compatibility is
preserved by looking up all state stores.
Store within KafkaStreams instance of thread is preferred over "foreign" store with same name.
Warning is issued if requested store is not found within KafkaStreams instance of thread.
The main benefit here is to get rid of randomly selecting stores across all KafkaStreams instances
in case a store is contained within multiple streams instances with same name.
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1161
Allow health checks on KafkaStreams processors that are currently stopped through
actuator bindings endpoint. Add this only as an opt-in feature through a new binder
level property - includeStoppedProcessorsForHealthCheck which is false by default
to preserve the current health indicator behavior.
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165Resolves#1175
This PR safe guards state store instances in case there are multiple KafkaStreams
instances present that have distinct application IDs but share State Store Names.
Change is backwards compatible: In case no KafkaStreams association of the thread
can be found, all local state stores are queried as before.
In case an associated KafkaStreams Instance is found, but required StateStore is
not found in this instance, a warning is issued but backwards compatibility is
preserved by looking up all state stores.
Store within KafkaStreams instance of thread is preferred over "foreign" store with same name.
Warning is issued if requested store is not found within KafkaStreams instance of thread.
The main benefit here is to get rid of randomly selecting stores across all KafkaStreams instances
in case a store is contained within multiple streams instances with same name.
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1161
* Disconnect spring-cloud-scheam-registry-client from Kafka Streams binder
that is used for testing
* Deprecate MessageConverterDelegateSerde
* Remove MessageConverterDelegateSerdeTest
When both regular Kafka and Kafka Streams functions are present,
the code that was added recently for function composition in
Kafka Streams binder was accidentally creating a binadable proxy
factory bean for non Kafka Streams functions. Resolving this issue.
Resovles https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1156
Coalesce the stream join tests in Kafka Streams binder around the functional model.
Remove duplicating the same join tests using the StreamListener model.
* Consumer/Producer prefix in Kafka Streams binder
Kafka Streams allows the applications to separate the consumer and producer
properties using consumer/producer prefixes. Add these prefixes automatically
if they are missing from the application.
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1065
* Addressing PR review comments
Start Kafka Streams bindgings only when they are not running.
Similarly, stop them only if they are running.
Without these guards in the bindings for KStream, KTable and GlobalKTable,
it may cause NPE's due to the backing concurrent collections in KafkaStreamsRegistry
not finding the proper KafkaStreams object, especially when the StreamsBuilderFactory
bean is already stopped through the binder provided manager.