Compare commits

...

8 Commits

Author SHA1 Message Date
Soby Chacko
090acc408f Kafka Streams binder uncaught exception changes
The default StreamsUncaughtExceptionHandler is added only if the
application does not provide one.
2022-03-03 14:38:38 -05:00
buildmaster
922f735306 Bumping versions to 3.2.3-SNAPSHOT after release 2022-02-16 18:31:22 +00:00
buildmaster
735958e7ec Going back to snapshots 2022-02-16 18:31:21 +00:00
buildmaster
2851464bd0 Update SNAPSHOT to 3.2.2 2022-02-16 18:29:43 +00:00
Soby Chacko
da049fc980 Update Spring Kafka/SIK versions 2022-02-16 10:08:33 -05:00
Gary Russell
170166ac57 GH-1195: Fix Pause/Resume Documentation
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1195

Remove obsolete documentation.

**cherry-pick to 3.2.x**
2022-02-07 14:33:45 -05:00
Rex Ijiekhuamen
5b880a8104 Fixed invalid java code snippet 2022-01-24 10:23:17 -05:00
Soby Chacko
42d3b92c7b Test package changes 2022-01-18 15:52:18 -05:00
10 changed files with 19 additions and 49 deletions

View File

@@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.2.2-SNAPSHOT</version>
<version>3.2.3-SNAPSHOT</version>
</parent>
<packaging>jar</packaging>
<name>spring-cloud-stream-binder-kafka-docs</name>

View File

@@ -826,7 +826,7 @@ It will use that for inbound deserialization.
```
@Bean
public Serde<Foo() customSerde{
public Serde<Foo> customSerde() {
...
}

View File

@@ -660,41 +660,10 @@ See this https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/m
===== Example: Pausing and Resuming the Consumer
If you wish to suspend consumption but not cause a partition rebalance, you can pause and resume the consumer.
This is facilitated by adding the `Consumer` as a parameter to your `@StreamListener`.
To resume, you need an `ApplicationListener` for `ListenerContainerIdleEvent` instances.
This is facilitated by managing the binding lifecycle as shown in **Binding visualization and control** in the Spring Cloud Stream documentation, using `State.PAUSED` and `State.RESUMED`.
To resume, you can use an `ApplicationListener` (or `@EventListener` method) to receive `ListenerContainerIdleEvent` instances.
The frequency at which events are published is controlled by the `idleEventInterval` property.
Since the consumer is not thread-safe, you must call these methods on the calling thread.
The following simple application shows how to pause and resume:
[source, java]
----
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Sink.INPUT)
public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
}
@Bean
public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
return event -> {
System.out.println(event);
if (event.getConsumer().paused().size() > 0) {
event.getConsumer().resume(event.getConsumer().paused());
}
};
}
}
----
[[kafka-transactional-binder]]
=== Transactional Binder
@@ -995,4 +964,3 @@ public KafkaBinderHealth kafkaBinderHealthIndicator() {
};
}
```

10
pom.xml
View File

@@ -2,12 +2,12 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.2.2-SNAPSHOT</version>
<version>3.2.3-SNAPSHOT</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>3.1.0</version>
<version>3.1.1</version>
<relativePath />
</parent>
<scm>
@@ -21,10 +21,10 @@
</scm>
<properties>
<java.version>1.8</java.version>
<spring-kafka.version>2.8.0</spring-kafka.version>
<spring-integration-kafka.version>5.5.5</spring-integration-kafka.version>
<spring-kafka.version>2.8.4-SNAPSHOT</spring-kafka.version>
<spring-integration-kafka.version>5.5.8</spring-integration-kafka.version>
<kafka.version>3.0.0</kafka.version>
<spring-cloud-stream.version>3.2.2-SNAPSHOT</spring-cloud-stream.version>
<spring-cloud-stream.version>3.2.3-SNAPSHOT</spring-cloud-stream.version>
<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.2.2-SNAPSHOT</version>
<version>3.2.3-SNAPSHOT</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.2.2-SNAPSHOT</version>
<version>3.2.3-SNAPSHOT</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.2.2-SNAPSHOT</version>
<version>3.2.3-SNAPSHOT</version>
</parent>
<properties>

View File

@@ -96,8 +96,10 @@ public class StreamsBuilderFactoryManager implements SmartLifecycle {
// By default, we shut down the client if there is an uncaught exception in the application.
// Users can override this by customizing SBFB. See this issue for more details:
// https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1110
streamsBuilderFactoryBean.setStreamsUncaughtExceptionHandler(exception ->
StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
if (streamsBuilderFactoryBean.getStreamsUncaughtExceptionHandler() == null) {
streamsBuilderFactoryBean.setStreamsUncaughtExceptionHandler(exception ->
StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
}
// Starting the stream.
final Map<StreamsBuilderFactoryBean, List<ConsumerProperties>> bindingServicePropertiesPerSbfb =
this.kafkaStreamsBindingInformationCatalogue.getConsumerPropertiesPerSbfb();

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.2.2-SNAPSHOT</version>
<version>3.2.3-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.integration2;
package org.springframework.cloud.stream.binder.kafka.bootstrap;
import org.junit.ClassRule;
import org.junit.Test;