Currently, KafkaBinderHealthIndicator is not customizable and included by default
when Spring Boot actuator is on the classpath. Fix this by allowing the application
to provide a custom implementation. A new marker interface called KafkaBinderHealth
can be used by the applicaiton to provide a custom HealthIndicator implementation, in
which case, the binder's default implementation will be excluded.
Tests and docs changes.
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1180
InteractiveQueryService methods for finding the host info for Kafka Streams
currently throw exceptions if the underlying KafkaStreams are not ready yet.
Introduce a retry mechanism so that the users can control the behaviour of
these methods by providing the following properties.
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts (default 1)
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backoffPeriod (default 1000 ms).
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1185
Allow health checks on KafkaStreams processors that are currently stopped through
actuator bindings endpoint. Add this only as an opt-in feature through a new binder
level property - includeStoppedProcessorsForHealthCheck which is false by default
to preserve the current health indicator behavior.
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165Resolves#1175
This PR safe guards state store instances in case there are multiple KafkaStreams
instances present that have distinct application IDs but share State Store Names.
Change is backwards compatible: In case no KafkaStreams association of the thread
can be found, all local state stores are queried as before.
In case an associated KafkaStreams Instance is found, but required StateStore is
not found in this instance, a warning is issued but backwards compatibility is
preserved by looking up all state stores.
Store within KafkaStreams instance of thread is preferred over "foreign" store with same name.
Warning is issued if requested store is not found within KafkaStreams instance of thread.
The main benefit here is to get rid of randomly selecting stores across all KafkaStreams instances
in case a store is contained within multiple streams instances with same name.
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1161
title: Please create new issues in https://github.com/spring-cloud/spring-cloud-stream/issues
labels: ''
assignees: ''
---
Please create all new issues in https://github.com/spring-cloud/spring-cloud-stream/issues. The Kafka binder repository has been relocated to the core Spring Cloud Stream repo.
@@ -40,7 +40,7 @@ The Apache Kafka Binder implementation maps each destination to an Apache Kafka
The consumer group maps directly to the same Apache Kafka concept.
Partitioning also maps directly to Apache Kafka partitions as well.
The binder currently uses the Apache Kafka `kafka-clients` version `2.3.1`.
The binder currently uses the Apache Kafka `kafka-clients` version `3.1.0`.
This client can communicate with older brokers (see the Kafka documentation), but certain features may not be available.
For example, with versions earlier than 0.11.x.x, native headers are not supported.
Also, 0.11.x.x does not support the `autoAddPartitions` property.
@@ -658,41 +658,10 @@ See this https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/m
===== Example: Pausing and Resuming the Consumer
If you wish to suspend consumption but not cause a partition rebalance, you can pause and resume the consumer.
This is facilitated by adding the `Consumer` as a parameter to your `@StreamListener`.
To resume, you need an `ApplicationListener` for `ListenerContainerIdleEvent` instances.
This is facilitated by managing the binding lifecycle as shown in **Binding visualization and control** in the Spring Cloud Stream documentation, using `State.PAUSED` and `State.RESUMED`.
To resume, you can use an `ApplicationListener` (or `@EventListener` method) to receive `ListenerContainerIdleEvent` instances.
The frequency at which events are published is controlled by the `idleEventInterval` property.
Since the consumer is not thread-safe, you must call these methods on the calling thread.
The following simple application shows how to pause and resume:
[source, java]
----
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Sink.INPUT)
public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
Blocking a user prevents them from interacting with repositories, such as opening or commenting on pull requests or issues. Learn more about blocking a user.