Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
090acc408f | ||
|
|
922f735306 | ||
|
|
735958e7ec | ||
|
|
2851464bd0 | ||
|
|
da049fc980 | ||
|
|
170166ac57 | ||
|
|
5b880a8104 | ||
|
|
42d3b92c7b |
@@ -7,7 +7,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.2.2-SNAPSHOT</version>
|
||||
<version>3.2.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<packaging>jar</packaging>
|
||||
<name>spring-cloud-stream-binder-kafka-docs</name>
|
||||
|
||||
@@ -826,7 +826,7 @@ It will use that for inbound deserialization.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public Serde<Foo() customSerde{
|
||||
public Serde<Foo> customSerde() {
|
||||
...
|
||||
}
|
||||
|
||||
|
||||
@@ -660,41 +660,10 @@ See this https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/m
|
||||
===== Example: Pausing and Resuming the Consumer
|
||||
|
||||
If you wish to suspend consumption but not cause a partition rebalance, you can pause and resume the consumer.
|
||||
This is facilitated by adding the `Consumer` as a parameter to your `@StreamListener`.
|
||||
To resume, you need an `ApplicationListener` for `ListenerContainerIdleEvent` instances.
|
||||
This is facilitated by managing the binding lifecycle as shown in **Binding visualization and control** in the Spring Cloud Stream documentation, using `State.PAUSED` and `State.RESUMED`.
|
||||
|
||||
To resume, you can use an `ApplicationListener` (or `@EventListener` method) to receive `ListenerContainerIdleEvent` instances.
|
||||
The frequency at which events are published is controlled by the `idleEventInterval` property.
|
||||
Since the consumer is not thread-safe, you must call these methods on the calling thread.
|
||||
|
||||
The following simple application shows how to pause and resume:
|
||||
|
||||
[source, java]
|
||||
----
|
||||
@SpringBootApplication
|
||||
@EnableBinding(Sink.class)
|
||||
public class Application {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(Application.class, args);
|
||||
}
|
||||
|
||||
@StreamListener(Sink.INPUT)
|
||||
public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
|
||||
System.out.println(in);
|
||||
consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
|
||||
return event -> {
|
||||
System.out.println(event);
|
||||
if (event.getConsumer().paused().size() > 0) {
|
||||
event.getConsumer().resume(event.getConsumer().paused());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
----
|
||||
|
||||
[[kafka-transactional-binder]]
|
||||
=== Transactional Binder
|
||||
@@ -995,4 +964,3 @@ public KafkaBinderHealth kafkaBinderHealthIndicator() {
|
||||
};
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
10
pom.xml
10
pom.xml
@@ -2,12 +2,12 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.2.2-SNAPSHOT</version>
|
||||
<version>3.2.3-SNAPSHOT</version>
|
||||
<packaging>pom</packaging>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-build</artifactId>
|
||||
<version>3.1.0</version>
|
||||
<version>3.1.1</version>
|
||||
<relativePath />
|
||||
</parent>
|
||||
<scm>
|
||||
@@ -21,10 +21,10 @@
|
||||
</scm>
|
||||
<properties>
|
||||
<java.version>1.8</java.version>
|
||||
<spring-kafka.version>2.8.0</spring-kafka.version>
|
||||
<spring-integration-kafka.version>5.5.5</spring-integration-kafka.version>
|
||||
<spring-kafka.version>2.8.4-SNAPSHOT</spring-kafka.version>
|
||||
<spring-integration-kafka.version>5.5.8</spring-integration-kafka.version>
|
||||
<kafka.version>3.0.0</kafka.version>
|
||||
<spring-cloud-stream.version>3.2.2-SNAPSHOT</spring-cloud-stream.version>
|
||||
<spring-cloud-stream.version>3.2.3-SNAPSHOT</spring-cloud-stream.version>
|
||||
<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
|
||||
<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
|
||||
<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.2.2-SNAPSHOT</version>
|
||||
<version>3.2.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
<description>Spring Cloud Starter Stream Kafka</description>
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.2.2-SNAPSHOT</version>
|
||||
<version>3.2.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder Core</description>
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.2.2-SNAPSHOT</version>
|
||||
<version>3.2.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
|
||||
@@ -96,8 +96,10 @@ public class StreamsBuilderFactoryManager implements SmartLifecycle {
|
||||
// By default, we shut down the client if there is an uncaught exception in the application.
|
||||
// Users can override this by customizing SBFB. See this issue for more details:
|
||||
// https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1110
|
||||
streamsBuilderFactoryBean.setStreamsUncaughtExceptionHandler(exception ->
|
||||
StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
|
||||
if (streamsBuilderFactoryBean.getStreamsUncaughtExceptionHandler() == null) {
|
||||
streamsBuilderFactoryBean.setStreamsUncaughtExceptionHandler(exception ->
|
||||
StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
|
||||
}
|
||||
// Starting the stream.
|
||||
final Map<StreamsBuilderFactoryBean, List<ConsumerProperties>> bindingServicePropertiesPerSbfb =
|
||||
this.kafkaStreamsBindingInformationCatalogue.getConsumerPropertiesPerSbfb();
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.2.2-SNAPSHOT</version>
|
||||
<version>3.2.3-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.integration2;
|
||||
package org.springframework.cloud.stream.binder.kafka.bootstrap;
|
||||
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
Reference in New Issue
Block a user