GH-1195: Fix Pause/Resume Documentation
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1195 Remove obsolete documentation. **cherry-pick to 3.2.x**
This commit is contained in:
@@ -658,41 +658,10 @@ See this https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/m
|
|||||||
===== Example: Pausing and Resuming the Consumer
|
===== Example: Pausing and Resuming the Consumer
|
||||||
|
|
||||||
If you wish to suspend consumption but not cause a partition rebalance, you can pause and resume the consumer.
|
If you wish to suspend consumption but not cause a partition rebalance, you can pause and resume the consumer.
|
||||||
This is facilitated by adding the `Consumer` as a parameter to your `@StreamListener`.
|
This is facilitated by managing the binding lifecycle as shown in **Binding visualization and control** in the Spring Cloud Stream documentation, using `State.PAUSED` and `State.RESUMED`.
|
||||||
To resume, you need an `ApplicationListener` for `ListenerContainerIdleEvent` instances.
|
|
||||||
|
To resume, you can use an `ApplicationListener` (or `@EventListener` method) to receive `ListenerContainerIdleEvent` instances.
|
||||||
The frequency at which events are published is controlled by the `idleEventInterval` property.
|
The frequency at which events are published is controlled by the `idleEventInterval` property.
|
||||||
Since the consumer is not thread-safe, you must call these methods on the calling thread.
|
|
||||||
|
|
||||||
The following simple application shows how to pause and resume:
|
|
||||||
|
|
||||||
[source, java]
|
|
||||||
----
|
|
||||||
@SpringBootApplication
|
|
||||||
@EnableBinding(Sink.class)
|
|
||||||
public class Application {
|
|
||||||
|
|
||||||
public static void main(String[] args) {
|
|
||||||
SpringApplication.run(Application.class, args);
|
|
||||||
}
|
|
||||||
|
|
||||||
@StreamListener(Sink.INPUT)
|
|
||||||
public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
|
|
||||||
System.out.println(in);
|
|
||||||
consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Bean
|
|
||||||
public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
|
|
||||||
return event -> {
|
|
||||||
System.out.println(event);
|
|
||||||
if (event.getConsumer().paused().size() > 0) {
|
|
||||||
event.getConsumer().resume(event.getConsumer().paused());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
----
|
|
||||||
|
|
||||||
[[kafka-transactional-binder]]
|
[[kafka-transactional-binder]]
|
||||||
=== Transactional Binder
|
=== Transactional Binder
|
||||||
@@ -993,4 +962,3 @@ public KafkaBinderHealth kafkaBinderHealthIndicator() {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user