////
Manual changes to this file will be lost when it is generated again.
Edit the files in the src/main/asciidoc/ directory instead.
////

:jdkversion: 1.8
:github-tag: master
:github-repo: spring-cloud/spring-cloud-stream-binder-kafka

To use Apache Kafka binder, you need to add `spring-cloud-stream-binder-kafka` as a dependency to your Spring Cloud Stream application, as shown in the following example for Maven:

[source,xml]
----
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
----

Alternatively, you can also use the Spring Cloud Stream Kafka Starter, as shown in the following example for Maven:

[source,xml]
----
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
----

The Apache Kafka Binder implementation maps each destination to an Apache Kafka topic.
The consumer group maps directly to the same Apache Kafka concept.
Partitioning also maps directly to Apache Kafka partitions.

The binder currently uses the Apache Kafka `kafka-clients` version `2.3.1`.
This client can communicate with older brokers (see the Kafka documentation), but certain features may not be available.
For example, with versions earlier than 0.11.x.x, native headers are not supported.
Also, 0.11.x.x does not support the `autoAddPartitions` property.

=== Configuration Options

This section contains the configuration options used by the Apache Kafka binder.

For common configuration options and properties pertaining to the binder, see the https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference/html/spring-cloud-stream.html#binding-properties[binding properties] in the core documentation.

==== Kafka Binder Properties

spring.cloud.stream.kafka.binder.replicationFactor::
The replication factor of auto-created topics if `autoCreateTopics` is active.
Can be overridden on each binding.
+
NOTE: If you are using Kafka broker versions prior to 2.4, then this value should be set to at least `1`.
Starting with version 3.0.8, the binder uses `-1` as the default value, which indicates that the broker `default.replication.factor` property will be used to determine the number of replicas.
Check with your Kafka broker admins to see if there is a policy in place that requires a minimum replication factor.
If that is the case, then, typically, the `default.replication.factor` will match that value, and `-1` should be used unless you need a replication factor greater than the minimum.
+
Default: `-1`.

spring.cloud.stream.kafka.binder.autoCreateTopics::
If set to `true`, the binder creates new topics automatically.
If set to `false`, the binder relies on the topics being already configured.
+
Default: `true`.

spring.cloud.stream.kafka.binder.autoAddPartitions::
If set to `true`, the binder creates new partitions if required.
If set to `false`, the binder relies on the partition size of the topic being already configured.
If the partition count of the target topic is smaller than the expected value, the binder fails to start.
+
Default: `false`.

spring.cloud.stream.kafka.binder.autoAlterTopics::
If set to `true`, the binder alters destination topic configs if required.
If set to `false`, the binder relies on existing configs of the topic.
+
Default: `false`.

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix::
Enables transactions in the binder. See `transaction.id` in the Kafka documentation and https://docs.spring.io/spring-kafka/reference/html/_reference.html#transactions[Transactions] in the `spring-kafka` documentation.
When transactions are enabled, individual `producer` properties are ignored and all producers use the `spring.cloud.stream.kafka.binder.transaction.producer.*` properties.
+
Default: `null` (no transactions).

spring.cloud.stream.kafka.binder.transaction.producer.*::
Global producer properties for producers in a transactional binder.
See <<kafka-producer-properties>>.
+
Default: See individual producer properties.

spring.cloud.stream.kafka.binder.headerMapperBeanName::
The bean name of a `KafkaHeaderMapper` used for mapping `spring-messaging` headers to and from Kafka headers.
Use this, for example, if you wish to customize the trusted packages in a `BinderHeaderMapper` bean that uses JSON deserialization for the headers.
If this custom `BinderHeaderMapper` bean is not made available to the binder by using this property, the binder looks for a header mapper bean named `kafkaBinderHeaderMapper` (of type `BinderHeaderMapper`) before falling back to a default `BinderHeaderMapper` created by the binder.
+
Default: none.
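
For example, a minimal sketch of such a bean (the trusted package `com.acme.events` is illustrative):

[source, java]
----
@Bean("kafkaBinderHeaderMapper")
public BinderHeaderMapper kafkaBinderHeaderMapper() {
    BinderHeaderMapper mapper = new BinderHeaderMapper();
    // Allow JSON header deserialization for types from this package.
    mapper.addTrustedPackages("com.acme.events");
    return mapper;
}
----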

spring.cloud.stream.kafka.binder.authorizationExceptionRetryInterval::
Enables retrying in case of authorization exceptions and defines the interval between retries.
Accepts a `Duration`, e.g. `30s`, `2m`, etc.
+
Default: `null` (retries disabled, fail fast)

spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader::
Flag to set the binder health as `down` when any partition on the topic, regardless of the consumer that is receiving data from it, is found without a leader.
+
Default: `false`.

spring.cloud.stream.kafka.binder.certificateStoreDirectory::
When the truststore or keystore certificate location is given as a classpath URL (`classpath:...`), the binder copies the resource from the classpath location inside the JAR file to a location on the filesystem.
The file is moved to the location specified as the value for this property, which must be an existing directory on the filesystem that is writable by the process running the application.
If this value is not set and the certificate file is a classpath resource, it is moved to the system's temporary directory, as returned by `System.getProperty("java.io.tmpdir")`.
This is also true if this value is present but the directory cannot be found on the filesystem or is not writable.
+
Default: none.

[[kafka-consumer-properties]]
==== Kafka Consumer Properties

NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.kafka.default.consumer.<property>=<value>`.

The following properties are available for Kafka consumers only and must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.consumer.`.

startOffset::
The starting offset for new groups.
Allowed values: `earliest` and `latest`.
+
Default: null (equivalent to `earliest`).

enableDlq::
When set to true, it enables DLQ behavior for the consumer.
By default, messages that result in errors are forwarded to a topic named `error.<destination>.<group>`.
The DLQ topic name can be configured by setting the `dlqName` property or by defining a `@Bean` of type `DlqDestinationResolver`.
This provides an alternative option to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome.
See <<kafka-dlq-processing>> for more information.
Starting with version 2.0, messages sent to the DLQ topic are enhanced with the following headers: `x-original-topic`, `x-exception-message`, and `x-exception-stacktrace` as `byte[]`.
By default, a failed record is sent to the same partition number in the DLQ topic as the original record.
See <<dlq-partition-selection>> for how to change that behavior.
**Not allowed when `destinationIsPattern` is `true`.**
+
Default: `false`.
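
For example, a sketch of a `DlqDestinationResolver` bean that derives the DLQ name from the failed record's topic (the `.dlq` suffix is an arbitrary choice for illustration):

[source, java]
----
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
    // Route each failed record to "<original-topic>.dlq".
    return (record, exception) -> record.topic() + ".dlq";
}
----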

dlqPartitions::
When `enableDlq` is true, and this property is not set, a dead letter topic with the same number of partitions as the primary topic(s) is created.
Usually, dead-letter records are sent to the same partition in the dead-letter topic as the original record.
This behavior can be changed; see <<dlq-partition-selection>>.
If this property is set to `1` and there is no `DlqPartitionFunction` bean, all dead-letter records will be written to partition `0`.
If this property is greater than `1`, you **MUST** provide a `DlqPartitionFunction` bean.
Note that the actual partition count is affected by the binder's `minPartitionCount` property.
+
Default: `none`
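
For example, a minimal sketch of a `DlqPartitionFunction` bean that sends all dead-letter records to partition `0`:

[source, java]
----
@Bean
public DlqPartitionFunction partitionFunction() {
    // Ignore the consumer group, record, and exception; always use partition 0.
    return (group, record, exception) -> 0;
}
----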

configuration::
Map with a key/value pair containing generic Kafka consumer properties.
In addition to having Kafka consumer properties, other configuration properties can be passed here.

topic.replication-factor::
The replication factor to use when provisioning topics. Overrides the binder-wide setting.
Ignored if `replicas-assignments` is present.
+
Default: none (the binder-wide default of -1 is used).

pollTimeout::
Timeout used for polling in pollable consumers.
+
Default: 5 seconds.

==== Consuming Batches

Starting with version 3.0, when `spring.cloud.stream.bindings.<name>.consumer.batch-mode` is set to `true`, all of the records received by polling the Kafka `Consumer` will be presented as a `List<?>` to the listener method.
Otherwise, the method will be called with one record at a time.
The size of the batch is controlled by the Kafka consumer properties `max.poll.records`, `fetch.min.bytes`, and `fetch.max.wait.ms`; refer to the Kafka documentation for more information.
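
For example, a sketch of a functional batch listener (the binding name `batchIn` and the `String` payload type are illustrative):

[source, java]
----
@Bean
public Consumer<List<String>> batchIn() {
    // Invoked once per poll with the entire batch of converted payloads.
    return batch -> batch.forEach(System.out::println);
}
----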

Bear in mind that batch mode is not supported with `@StreamListener` - it only works with the newer functional programming model.

IMPORTANT: Retry within the binder is not supported when using batch mode, so `maxAttempts` will be overridden to 1.
You can configure a `SeekToCurrentBatchErrorHandler` (using a `ListenerContainerCustomizer`) to achieve similar functionality to retry in the binder.
You can also use a manual `AckMode` and call `Acknowledgment.nack(index, sleep)` to commit the offsets for a partial batch and have the remaining records redelivered.

[[kafka-producer-properties]]
==== Kafka Producer Properties

NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.kafka.default.producer.<property>=<value>`.

The following properties are available for Kafka producers only and must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.producer.`.

messageKeyExpression::
A SpEL expression evaluated against the outgoing message used to populate the key of the produced Kafka message -- for example, `headers['myKey']`.
With versions before 3.0, the payload could not be used unless native encoding was being used, because, by the time this expression was evaluated, the payload was already in the form of a `byte[]`.
Now, the expression is evaluated before the payload is converted.
In the case of a regular processor (`Function<String, String>` or `Function<Message<?>, Message<?>>`), if the produced key needs to be the same as the incoming key from the topic, this property can be set as follows:
`spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']`
There is an important caveat to keep in mind for reactive functions.
In that case, it is up to the application to manually copy the headers from the incoming messages to outbound messages.
You can set the header, e.g. `myKey`, and use `headers['myKey']` as suggested above or, for convenience, simply set the `KafkaHeaders.MESSAGE_KEY` header, in which case you do not need to set this property at all.
+
Default: `none`.
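
As a sketch for the reactive case (function name and payload types are illustrative), copying the incoming key to the `KafkaHeaders.MESSAGE_KEY` header of each outbound message:

[source, java]
----
@Bean
public Function<Flux<Message<String>>, Flux<Message<String>>> process() {
    return flux -> flux.map(message -> MessageBuilder.fromMessage(message)
            // Propagate the incoming Kafka key so it is used for the outgoing record.
            .setHeader(KafkaHeaders.MESSAGE_KEY,
                    message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY))
            .build());
}
----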

headerPatterns::

topic.replication-factor::
The replication factor to use when provisioning topics. Overrides the binder-wide setting.
Ignored if `replicas-assignments` is present.
+
Default: none (the binder-wide default of -1 is used).

useTopicHeader::
Set to `true` to override the default binding destination (topic name) with the value of the `KafkaHeaders.TOPIC` message header in the outbound message.
If the header is not present, the default binding destination is used.

If a topic already exists with a smaller partition count and `autoAddPartitions` is enabled, new partitions are added.
If a topic already exists with a larger number of partitions than the maximum of (`minPartitionCount` or `partitionCount`), the existing partition count is used.

compression::
Set the `compression.type` producer property.
Supported values are `none`, `gzip`, `snappy`, and `lz4`.
If you override the `kafka-clients` jar to 2.1.0 (or later), as discussed in the https://docs.spring.io/spring-kafka/docs/2.2.x/reference/html/deps-for-21x.html[Spring for Apache Kafka documentation], and wish to use `zstd` compression, use `spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd`.
+
Default: `none`.

closeTimeout::
The timeout, in seconds, to wait for when closing the producer.
+
Default: `30`

==== Usage examples

In this section, we show the use of the preceding properties for specific scenarios.

The following simple application shows how to pause and resume:

[source, java]
----
@EnableBinding(Sink.class)
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
        System.out.println(in);
        consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
    }

    @Bean
    public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
        return event -> {
            System.out.println(event);
            if (event.getConsumer().paused().size() > 0) {
                event.getConsumer().resume(event.getConsumer().paused());
            }
        };
    }

}
----

When used in a processor application, the consumer starts the transaction; any records sent on the consumer thread participate in the same transaction.
When the listener exits normally, the listener container will send the offset to the transaction and commit it.
A common producer factory is used for all producer bindings configured using `spring.cloud.stream.kafka.binder.transaction.producer.*` properties; individual binding Kafka producer properties are ignored.

IMPORTANT: Normal binder retries (and dead lettering) are not supported with transactions, because the retries run in the original transaction, which may be rolled back, and any published records are rolled back too.
When retries are enabled (the common property `maxAttempts` is greater than zero), the retry properties are used to configure a `DefaultAfterRollbackProcessor` to enable retries at the container level.
Similarly, instead of publishing dead-letter records within the transaction, this functionality is moved to the listener container, again via the `DefaultAfterRollbackProcessor`, which runs after the main transaction has rolled back.

If you wish to use transactions in a source application, or from some arbitrary thread for producer-only transactions (for example, a `@Scheduled` method), you must get a reference to the transactional producer factory and define a `KafkaTransactionManager` bean using it.
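
A minimal sketch of obtaining the transactional producer factory from the binder and defining the transaction manager (the `null` binder name and `byte[]` generic types are assumptions):

[source, java]
----
@Bean
public KafkaTransactionManager<byte[], byte[]> transactionManager(BinderFactory binders) {
    // Retrieve the Kafka binder and reuse its transactional producer factory.
    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    return new KafkaTransactionManager<>(pf);
}
----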

If you wish to synchronize producer-only transactions with those from some other transaction manager, use a `ChainedTransactionManager`.

=== Error Channels

Starting with version 1.3, the binder unconditionally sends exceptions to an error channel for each consumer destination and can also be configured to send async producer send failures to an error channel.
See https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-error-handling[this section on error handling] for more information.

The payload of the `ErrorMessage` for a send failure is a `KafkaSendFailureException` with properties:

* `failedMessage`: The Spring Messaging `Message<?>` that failed to be sent.
* `record`: The raw `ProducerRecord` that was created from the `failedMessage`

There is no automatic handling of these producer exceptions (such as sending to a dead-letter queue).
You can consume these exceptions with your own Spring Integration flow.
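
For example, a sketch of a flow that logs send failures (the channel name assumes an output destination named `myTopic` with producer error channels enabled):

[source, java]
----
@ServiceActivator(inputChannel = "myTopic.errors")
public void handleSendFailure(ErrorMessage message) {
    KafkaSendFailureException failure = (KafkaSendFailureException) message.getPayload();
    // The raw ProducerRecord that could not be sent.
    System.out.println("Send failed for record: " + failure.getRecord());
}
----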

[[kafka-metrics]]
=== Kafka Metrics

The Kafka binder module exposes the following metrics:

`spring.cloud.stream.binder.kafka.offset`: This metric indicates how many messages have not been yet consumed from a given binder's topic by a given consumer group.
The metrics provided are based on the Micrometer library.
The binder creates the `KafkaBinderMetrics` bean if Micrometer is on the classpath and no other such bean is provided by the application.
The metric contains the consumer group information, topic and the actual lag in committed offset from the latest offset on the topic.
This metric is particularly useful for providing auto-scaling feedback to a PaaS platform.

You can prevent `KafkaBinderMetrics` from creating the necessary infrastructure (such as consumers) and reporting the metrics by providing the following component in the application:

```
@Component
class NoOpBindingMeters {

    NoOpBindingMeters(MeterRegistry registry) {
        // Suppress the binder's offset-lag meter before it is ever registered.
        registry.config().meterFilter(
                MeterFilter.denyNameStartsWith(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME));
    }

}
```

More details on how to suppress meters selectively can be found https://micrometer.io/docs/concepts#_meter_filters[here].

[[kafka-tombstones]]
=== Tombstone Records (null record values)

=== Using a KafkaBindingRebalanceListener

Starting with version 2.1, if you provide a single `KafkaBindingRebalanceListener` bean in the application context, it will be wired into all Kafka consumer bindings.

[source, java]
----
public interface KafkaBindingRebalanceListener {

    /**
     * Invoked by the container before any pending offsets are committed.
     * @param bindingName the name of the binding.
     * @param consumer the consumer.
     * @param partitions the partitions.
     */
    default void onPartitionsRevokedBeforeCommit(String bindingName, Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions) {

    }

    /**
     * Invoked by the container after any pending offsets are committed.
     * @param bindingName the name of the binding.
     * @param consumer the consumer.
     * @param partitions the partitions.
     */
    default void onPartitionsRevokedAfterCommit(String bindingName, Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions) {

    }

    /**
     * Invoked when partitions are initially assigned or after a rebalance.
     * Applications might only want to perform seek operations on an initial assignment.
     * @param bindingName the name of the binding.
     * @param consumer the consumer.
     * @param partitions the partitions.
     * @param initial true if this is the initial assignment.
     */
    default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions, boolean initial) {

    }

}
----
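
For example, a sketch of a listener bean that seeks to the beginning of each assigned partition, but only on the initial assignment:

[source, java]
----
@Bean
public KafkaBindingRebalanceListener rebalanceListener() {
    return new KafkaBindingRebalanceListener() {

        @Override
        public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
                Collection<TopicPartition> partitions, boolean initial) {
            if (initial) {
                // Re-read each partition from the beginning once per application start.
                consumer.seekToBeginning(partitions);
            }
        }

    };
}
----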

You cannot set the `resetOffsets` consumer property to `true` when you provide a rebalance listener.

[[consumer-producer-config-customizer]]
=== Customizing Consumer and Producer configuration

If you want advanced customization of the consumer and producer configuration that is used for creating `ConsumerFactory` and `ProducerFactory` in Kafka, you can implement the following customizers:

* ConsumerConfigCustomizer
* ProducerConfigCustomizer

Both of these interfaces provide a way to configure the config map used for consumer and producer properties.
For example, if you want to gain access to a bean that is defined at the application level, you can inject it in the implementation of the `configure` method.
When the binder discovers that these customizers are available as beans, it will invoke the `configure` method right before creating the consumer and producer factories.
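
For example, a sketch of a `ConsumerConfigCustomizer` (assuming the variant whose `configure` method receives the consumer properties map):

[source, java]
----
@Bean
public ConsumerConfigCustomizer consumerConfigCustomizer() {
    // Cap the number of records returned by each poll.
    return properties -> properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
}
----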

[[admin-client-config-customization]]
=== Customizing AdminClient Configuration

As with the consumer and producer config customization above, applications can also customize the configuration for admin clients by providing an `AdminClientConfigCustomizer`.
The `configure` method of `AdminClientConfigCustomizer` provides access to the admin client properties, through which you can define further customization.
The binder's Kafka topic provisioner gives the highest precedence to the properties given through this customizer.
Here is an example of providing this customizer bean:

```
@Bean
public AdminClientConfigCustomizer adminClientConfigCustomizer() {
    return props -> {
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    };
}
```

= Appendices

[appendix]
[[building]]