|
|
|
|
@@ -1,13 +1,13 @@
|
|
|
|
|
[partintro]
|
|
|
|
|
--
|
|
|
|
|
This guide describes the Apache Kafka implementation of the Spring Cloud Stream Binder.
|
|
|
|
|
It contains information about its design, usage and configuration options, as well as information on how the Stream Cloud Stream concepts map into Apache Kafka specific constructs.
|
|
|
|
|
In addition, this guide also explains the Kafka Streams binding capabilities of Spring Cloud Stream.
|
|
|
|
|
It contains information about its design, usage, and configuration options, as well as information on how the Spring Cloud Stream concepts map onto Apache Kafka specific constructs.
|
|
|
|
|
In addition, this guide explains the Kafka Streams binding capabilities of Spring Cloud Stream.
|
|
|
|
|
--
|
|
|
|
|
|
|
|
|
|
== Usage
|
|
|
|
|
|
|
|
|
|
To use Apache Kafka binder all you need is to add `spring-cloud-stream-binder-kafka` as a dependency to your Spring Cloud Stream application. Below is a Maven example:
|
|
|
|
|
To use Apache Kafka binder, you need to add `spring-cloud-stream-binder-kafka` as a dependency to your Spring Cloud Stream application, as shown in the following example for Maven:
|
|
|
|
|
|
|
|
|
|
[source,xml]
|
|
|
|
|
----
|
|
|
|
|
@@ -17,7 +17,7 @@ To use Apache Kafka binder all you need is to add `spring-cloud-stream-binder-ka
|
|
|
|
|
</dependency>
|
|
|
|
|
----
|
|
|
|
|
|
|
|
|
|
Alternatively, you can also use the Spring Cloud Stream Kafka Starter.
|
|
|
|
|
Alternatively, you can use the Spring Cloud Stream Kafka Starter, as shown in the following example for Maven:
|
|
|
|
|
|
|
|
|
|
[source,xml]
|
|
|
|
|
----
|
|
|
|
|
@@ -29,17 +29,17 @@ Alternatively, you can also use the Spring Cloud Stream Kafka Starter.
|
|
|
|
|
|
|
|
|
|
== Apache Kafka Binder Overview
|
|
|
|
|
|
|
|
|
|
A simplified diagram of how the Apache Kafka binder operates can be seen below.
|
|
|
|
|
The following image shows a simplified diagram of how the Apache Kafka binder operates:
|
|
|
|
|
|
|
|
|
|
.Kafka Binder
|
|
|
|
|
image::kafka-binder.png[width=300,scaledwidth="50%"]
|
|
|
|
|
image::images/kafka-binder.png[width=300,scaledwidth="50%"]
|
|
|
|
|
|
|
|
|
|
The Apache Kafka Binder implementation maps each destination to an Apache Kafka topic.
|
|
|
|
|
The consumer group maps directly to the same Apache Kafka concept.
|
|
|
|
|
Partitioning also maps directly to Apache Kafka partitions.
|
|
|
|
|
|
|
|
|
|
The binder currently uses the Apache Kafka `kafka-clients` 1.0.0 jar and is designed to be used with a broker at least that version.
|
|
|
|
|
This client can communicate with older brokers (refer to the Kafka documentation), but certain features may not be available.
|
|
|
|
|
The binder currently uses the Apache Kafka `kafka-clients` 1.0.0 jar and is designed to be used with a broker of at least that version.
|
|
|
|
|
This client can communicate with older brokers (see the Kafka documentation), but certain features may not be available.
|
|
|
|
|
For example, with versions earlier than 0.11.x.x, native headers are not supported.
|
|
|
|
|
Also, 0.11.x.x does not support the `autoAddPartitions` property.
|
|
|
|
|
|
|
|
|
|
@@ -47,77 +47,79 @@ Also, 0.11.x.x does not support the `autoAddPartitions` property.
|
|
|
|
|
|
|
|
|
|
This section contains the configuration options used by the Apache Kafka binder.
|
|
|
|
|
|
|
|
|
|
For common configuration options and properties pertaining to binder, refer to the <<binding-properties,core documentation>>.
|
|
|
|
|
For common configuration options and properties pertaining to binder, see the <<binding-properties,core documentation>>.
|
|
|
|
|
|
|
|
|
|
=== Kafka Binder Properties
|
|
|
|
|
|
|
|
|
|
spring.cloud.stream.kafka.binder.brokers::
|
|
|
|
|
A list of brokers to which the Kafka binder will connect.
|
|
|
|
|
A list of brokers to which the Kafka binder connects.
|
|
|
|
|
+
|
|
|
|
|
Default: `localhost`.
|
|
|
|
|
spring.cloud.stream.kafka.binder.defaultBrokerPort::
|
|
|
|
|
`brokers` allows hosts specified with or without port information (e.g., `host1,host2:port2`).
|
|
|
|
|
`brokers` allows hosts specified with or without port information (for example, `host1,host2:port2`).
|
|
|
|
|
This sets the default port when no port is configured in the broker list.
|
|
|
|
|
+
|
|
|
|
|
Default: `9092`.
|
|
|
|
|
spring.cloud.stream.kafka.binder.configuration::
|
|
|
|
|
Key/Value map of client properties (both producers and consumer) passed to all clients created by the binder.
|
|
|
|
|
Due to the fact that these properties will be used by both producers and consumers, usage should be restricted to common properties, for example, security settings.
|
|
|
|
|
Key/Value map of client properties (both producers and consumers) passed to all clients created by the binder.
|
|
|
|
|
Due to the fact that these properties are used by both producers and consumers, usage should be restricted to common properties -- for example, security settings.
|
|
|
|
|
+
|
|
|
|
|
Default: Empty map.
|
|
|
|
|
spring.cloud.stream.kafka.binder.headers::
|
|
|
|
|
The list of custom headers that will be transported by the binder.
|
|
|
|
|
Only required when communicating with older applications (<= 1.3.x) with a `kafka-clients` version < 0.11.0.0; newer versions support headers natively.
|
|
|
|
|
The list of custom headers that are transported by the binder.
|
|
|
|
|
Only required when communicating with older applications (<= 1.3.x) with a `kafka-clients` version < 0.11.0.0. Newer versions support headers natively.
|
|
|
|
|
+
|
|
|
|
|
Default: empty.
|
|
|
|
|
spring.cloud.stream.kafka.binder.healthTimeout::
|
|
|
|
|
The time to wait to get partition information in seconds; default 60.
|
|
|
|
|
Health will report as down if this timer expires.
|
|
|
|
|
The time to wait to get partition information, in seconds.
|
|
|
|
|
Health reports as down if this timer expires.
|
|
|
|
|
+
|
|
|
|
|
Default: 10.
|
|
|
|
|
spring.cloud.stream.kafka.binder.requiredAcks::
|
|
|
|
|
The number of required acks on the broker.
|
|
|
|
|
The number of required acks on the broker.
|
|
|
|
|
See the Kafka documentation for the producer `acks` property.
|
|
|
|
|
+
|
|
|
|
|
Default: `1`.
|
|
|
|
|
spring.cloud.stream.kafka.binder.minPartitionCount::
|
|
|
|
|
Effective only if `autoCreateTopics` or `autoAddPartitions` is set.
|
|
|
|
|
The global minimum number of partitions that the binder will configure on topics on which it produces/consumes data.
|
|
|
|
|
It can be superseded by the `partitionCount` setting of the producer or by the value of `instanceCount` * `concurrency` settings of the producer (if either is larger).
|
|
|
|
|
Effective only if `autoCreateTopics` or `autoAddPartitions` is set.
|
|
|
|
|
The global minimum number of partitions that the binder configures on topics on which it produces or consumes data.
|
|
|
|
|
It can be superseded by the `partitionCount` setting of the producer or by the value of `instanceCount * concurrency` settings of the producer (if either is larger).
|
|
|
|
|
+
|
|
|
|
|
Default: `1`.
|
|
|
|
|
spring.cloud.stream.kafka.binder.replicationFactor::
|
|
|
|
|
The replication factor of auto-created topics if `autoCreateTopics` is active.
|
|
|
|
|
Can be overriden on each binding.
|
|
|
|
|
The replication factor of auto-created topics if `autoCreateTopics` is active.
|
|
|
|
|
Can be overridden on each binding.
|
|
|
|
|
+
|
|
|
|
|
Default: `1`.
|
|
|
|
|
spring.cloud.stream.kafka.binder.autoCreateTopics::
|
|
|
|
|
If set to `true`, the binder will create new topics automatically.
|
|
|
|
|
If set to `false`, the binder will rely on the topics being already configured.
|
|
|
|
|
In the latter case, if the topics do not exist, the binder will fail to start.
|
|
|
|
|
Of note, this setting is independent of the `auto.topic.create.enable` setting of the broker and it does not influence it: if the server is set to auto-create topics, they may be created as part of the metadata retrieval request, with default broker settings.
|
|
|
|
|
If set to `true`, the binder creates new topics automatically.
|
|
|
|
|
If set to `false`, the binder relies on the topics being already configured.
|
|
|
|
|
In the latter case, if the topics do not exist, the binder fails to start.
|
|
|
|
|
+
|
|
|
|
|
NOTE: This setting is independent of the `auto.topic.create.enable` setting of the broker and does not influence it.
|
|
|
|
|
If the server is set to auto-create topics, they may be created as part of the metadata retrieval request, with default broker settings.
|
|
|
|
|
+
|
|
|
|
|
Default: `true`.
|
|
|
|
|
spring.cloud.stream.kafka.binder.autoAddPartitions::
|
|
|
|
|
If set to `true`, the binder will create add new partitions if required.
|
|
|
|
|
If set to `false`, the binder will rely on the partition size of the topic being already configured.
|
|
|
|
|
If the partition count of the target topic is smaller than the expected value, the binder will fail to start.
|
|
|
|
|
If set to `true`, the binder creates new partitions if required.
|
|
|
|
|
If set to `false`, the binder relies on the partition size of the topic being already configured.
|
|
|
|
|
If the partition count of the target topic is smaller than the expected value, the binder fails to start.
|
|
|
|
|
+
|
|
|
|
|
Default: `false`.
|
|
|
|
|
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix::
|
|
|
|
|
Enable transactions in the binder; see `transaction.id` in the Kafka documentation and https://docs.spring.io/spring-kafka/reference/html/_reference.html#transactions[Transactions] in the `spring-kafka` documentation.
|
|
|
|
|
When transactions are enabled, individual `producer` properties are ignored and all producers use the `spring.cloud.stream.kafka.binder.transaction.producer.*` properties.
|
|
|
|
|
Enables transactions in the binder. See `transaction.id` in the Kafka documentation and https://docs.spring.io/spring-kafka/reference/html/_reference.html#transactions[Transactions] in the `spring-kafka` documentation.
|
|
|
|
|
When transactions are enabled, individual `producer` properties are ignored and all producers use the `spring.cloud.stream.kafka.binder.transaction.producer.*` properties.
|
|
|
|
|
+
|
|
|
|
|
Default: `null` (no transactions).
|
|
|
|
|
spring.cloud.stream.kafka.binder.transaction.producer.*::
|
|
|
|
|
Global producer properties for producers in a transactional binder.
|
|
|
|
|
See `spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix` and <<kafka-producer-properties>> and the general producer properties supported by all binders.
|
|
|
|
|
Global producer properties for producers in a transactional binder.
|
|
|
|
|
See `spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix` and <<kafka-producer-properties>> and the general producer properties supported by all binders.
|
|
|
|
|
+
|
|
|
|
|
Default: See individual producer properties.
|
|
|
|
|
|
|
|
|
|
spring.cloud.stream.kafka.binder.headerMapperBeanName::
|
|
|
|
|
The bean name of a `KafkaHeaderMapper` used for mapping `spring-messaging` headers to/from Kafka headers.
|
|
|
|
|
Use this, for example, if you wish to customize the trusted packages in a `DefaultKafkaHeaderMapper`, which uses JSON deserialization for the headers.
|
|
|
|
|
The bean name of a `KafkaHeaderMapper` used for mapping `spring-messaging` headers to and from Kafka headers.
|
|
|
|
|
Use this, for example, if you wish to customize the trusted packages in a `DefaultKafkaHeaderMapper` that uses JSON deserialization for the headers.
|
|
|
|
|
+
|
|
|
|
|
Default: none.
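
The following is a minimal sketch of how such a header mapper bean might be declared in a Spring `@Configuration` class. The bean name `customHeaderMapper` and the trusted package are illustrative assumptions; the bean name must match whatever you supply in `spring.cloud.stream.kafka.binder.headerMapperBeanName`:

[source, java]
----
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaderMapper;

@Configuration
public class HeaderMapperConfiguration {

    // Referenced by spring.cloud.stream.kafka.binder.headerMapperBeanName=customHeaderMapper
    @Bean
    public KafkaHeaderMapper customHeaderMapper() {
        DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
        // Trust an additional package when deserializing JSON headers (illustrative package name)
        mapper.addTrustedPackages("com.example.headers");
        return mapper;
    }

}
----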
|
|
|
|
|
|
|
|
|
|
@@ -128,53 +130,52 @@ The following properties are available for Kafka consumers only and
|
|
|
|
|
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.consumer.`.
|
|
|
|
|
|
|
|
|
|
admin.configuration::
|
|
|
|
|
A `Map` of Kafka topic properties used when provisioning topics.
|
|
|
|
|
e.g. `spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0`
|
|
|
|
|
A `Map` of Kafka topic properties used when provisioning topics -- for example, `spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0`
|
|
|
|
|
+
|
|
|
|
|
Default: none.
|
|
|
|
|
|
|
|
|
|
admin.replicas-assignment::
|
|
|
|
|
A Map<Integer, List<Integer>> of replica assignments, with the key being the partition and value the assignments.
|
|
|
|
|
A Map<Integer, List<Integer>> of replica assignments, with the key being the partition and the value being the assignments.
|
|
|
|
|
Used when provisioning new topics.
|
|
|
|
|
See `NewTopic` javadocs in the `kafka-clients` jar.
|
|
|
|
|
See the `NewTopic` Javadocs in the `kafka-clients` jar.
|
|
|
|
|
+
|
|
|
|
|
Default: none.
|
|
|
|
|
|
|
|
|
|
admin.replication-factor::
|
|
|
|
|
The replication factor to use when provisioning topics; overrides the binder-wide setting.
|
|
|
|
|
The replication factor to use when provisioning topics. Overrides the binder-wide setting.
|
|
|
|
|
Ignored if `replicas-assignment` is present.
|
|
|
|
|
+
|
|
|
|
|
Default: none (the binder-wide default of 1 is used).
|
|
|
|
|
|
|
|
|
|
autoRebalanceEnabled::
|
|
|
|
|
When `true`, topic partitions will be automatically rebalanced between the members of a consumer group.
|
|
|
|
|
When `false`, each consumer will be assigned a fixed set of partitions based on `spring.cloud.stream.instanceCount` and `spring.cloud.stream.instanceIndex`.
|
|
|
|
|
This requires both `spring.cloud.stream.instanceCount` and `spring.cloud.stream.instanceIndex` properties to be set appropriately on each launched instance.
|
|
|
|
|
The property `spring.cloud.stream.instanceCount` must typically be greater than 1 in this case.
|
|
|
|
|
When `true`, topic partitions are automatically rebalanced between the members of a consumer group.
|
|
|
|
|
When `false`, each consumer is assigned a fixed set of partitions based on `spring.cloud.stream.instanceCount` and `spring.cloud.stream.instanceIndex`.
|
|
|
|
|
This requires both the `spring.cloud.stream.instanceCount` and `spring.cloud.stream.instanceIndex` properties to be set appropriately on each launched instance.
|
|
|
|
|
The value of the `spring.cloud.stream.instanceCount` property must typically be greater than 1 in this case.
|
|
|
|
|
+
|
|
|
|
|
Default: `true`.
|
|
|
|
|
ackEachRecord::
|
|
|
|
|
When `autoCommitOffset` is `true`, whether to commit the offset after each record is processed.
|
|
|
|
|
When `autoCommitOffset` is `true`, this setting dictates whether to commit the offset after each record is processed.
|
|
|
|
|
By default, offsets are committed after all records in the batch of records returned by `consumer.poll()` have been processed.
|
|
|
|
|
The number of records returned by a poll can be controlled with the `max.poll.recods` Kafka property, set via the consumer `configuration` property.
|
|
|
|
|
Setting this to true may cause a degradation in performance, but reduces the likelihood of redelivered records when a failure occurs.
|
|
|
|
|
Also see the binder `requiredAcks` property, which also affects the performance of committing offsets.
|
|
|
|
|
The number of records returned by a poll can be controlled with the `max.poll.records` Kafka property, which is set through the consumer `configuration` property.
|
|
|
|
|
Setting this to `true` may cause a degradation in performance, but doing so reduces the likelihood of redelivered records when a failure occurs.
|
|
|
|
|
Also, see the binder `requiredAcks` property, which also affects the performance of committing offsets.
|
|
|
|
|
+
|
|
|
|
|
Default: `false`.
|
|
|
|
|
autoCommitOffset::
|
|
|
|
|
Whether to autocommit offsets when a message has been processed.
|
|
|
|
|
If set to `false`, a header with the key `kafka_acknowledgment` of the type `org.springframework.kafka.support.Acknowledgment` header will be present in the inbound message.
|
|
|
|
|
Whether to autocommit offsets when a message has been processed.
|
|
|
|
|
If set to `false`, a header with the key `kafka_acknowledgment` of the type `org.springframework.kafka.support.Acknowledgment` is present in the inbound message.
|
|
|
|
|
Applications may use this header for acknowledging messages.
|
|
|
|
|
See the examples section for details.
|
|
|
|
|
When this property is set to `false`, Kafka binder will set the ack mode to `org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL` and the application is responsible for acknowledging records.
|
|
|
|
|
When this property is set to `false`, Kafka binder sets the ack mode to `org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL` and the application is responsible for acknowledging records.
|
|
|
|
|
Also see `ackEachRecord`.
|
|
|
|
|
+
|
|
|
|
|
Default: `true`.
|
|
|
|
|
autoCommitOnError::
|
|
|
|
|
Effective only if `autoCommitOffset` is set to `true`.
|
|
|
|
|
If set to `false` it suppresses auto-commits for messages that result in errors, and will commit only for successful messages, allows a stream to automatically replay from the last successfully processed message, in case of persistent failures.
|
|
|
|
|
If set to `true`, it will always auto-commit (if auto-commit is enabled).
|
|
|
|
|
If not set (default), it effectively has the same value as `enableDlq`, auto-committing erroneous messages if they are sent to a DLQ, and not committing them otherwise.
|
|
|
|
|
Effective only if `autoCommitOffset` is set to `true`.
|
|
|
|
|
If set to `false`, it suppresses auto-commits for messages that result in errors and commits only for successful messages. It allows a stream to automatically replay from the last successfully processed message, in case of persistent failures.
|
|
|
|
|
If set to `true`, it always auto-commits (if auto-commit is enabled).
|
|
|
|
|
If not set (the default), it effectively has the same value as `enableDlq`, auto-committing erroneous messages if they are sent to a DLQ and not committing them otherwise.
|
|
|
|
|
+
|
|
|
|
|
Default: not set.
|
|
|
|
|
resetOffsets::
|
|
|
|
|
@@ -182,48 +183,48 @@ Whether to reset offsets on the consumer to the value provided by startOffset.
|
|
|
|
|
+
|
|
|
|
|
Default: `false`.
|
|
|
|
|
startOffset::
|
|
|
|
|
The starting offset for new groups.
|
|
|
|
|
Allowed values: `earliest`, `latest`.
|
|
|
|
|
If the consumer group is set explicitly for the consumer 'binding' (via `spring.cloud.stream.bindings.<channelName>.group`), then 'startOffset' is set to `earliest`; otherwise it is set to `latest` for the `anonymous` consumer group.
|
|
|
|
|
Also see `resetOffsets`.
|
|
|
|
|
The starting offset for new groups.
|
|
|
|
|
Allowed values: `earliest` and `latest`.
|
|
|
|
|
If the consumer group is set explicitly for the consumer 'binding' (through `spring.cloud.stream.bindings.<channelName>.group`), 'startOffset' is set to `earliest`. Otherwise, it is set to `latest` for the `anonymous` consumer group.
|
|
|
|
|
Also see `resetOffsets` (earlier in this list).
|
|
|
|
|
+
|
|
|
|
|
Default: null (equivalent to `earliest`).
|
|
|
|
|
enableDlq::
|
|
|
|
|
When set to true, it will send enable DLQ behavior for the consumer.
|
|
|
|
|
By default, messages that result in errors will be forwarded to a topic named `error.<destination>.<group>`.
|
|
|
|
|
The DLQ topic name can be configurable via the property `dlqName`.
|
|
|
|
|
This provides an alternative option to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome.
|
|
|
|
|
See <<kafka-dlq-processing>> processing for more information.
|
|
|
|
|
Starting with _version 2.0_, messages sent to the DLQ topic are enhanced with the following headers: `x-original-topic`, `x-exception-message` and `x-exception-stacktrace` as `byte[]`.
|
|
|
|
|
When set to `true`, it enables DLQ behavior for the consumer.
|
|
|
|
|
By default, messages that result in errors are forwarded to a topic named `error.<destination>.<group>`.
|
|
|
|
|
The DLQ topic name can be configured by setting the `dlqName` property.
|
|
|
|
|
This provides an alternative option to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome.
|
|
|
|
|
See <<kafka-dlq-processing>> for more information. See also the example configuration that follows this list of properties.
|
|
|
|
|
Starting with version 2.0, messages sent to the DLQ topic are enhanced with the following headers: `x-original-topic`, `x-exception-message`, and `x-exception-stacktrace` as `byte[]`.
|
|
|
|
|
+
|
|
|
|
|
Default: `false`.
|
|
|
|
|
configuration::
|
|
|
|
|
Map with a key/value pair containing generic Kafka consumer properties.
|
|
|
|
|
Map with a key/value pair containing generic Kafka consumer properties.
|
|
|
|
|
+
|
|
|
|
|
Default: Empty map.
|
|
|
|
|
dlqName::
|
|
|
|
|
The name of the DLQ topic to receive the error messages.
|
|
|
|
|
The name of the DLQ topic to receive the error messages.
|
|
|
|
|
+
|
|
|
|
|
Default: null (If not specified, messages that result in errors will be forwarded to a topic named `error.<destination>.<group>`).
|
|
|
|
|
Default: null (If not specified, messages that result in errors are forwarded to a topic named `error.<destination>.<group>`).
|
|
|
|
|
dlqProducerProperties::
|
|
|
|
|
Using this, dlq specific producer properties can be set.
|
|
|
|
|
All the properties available through kafka producer properties can be set through this property.
|
|
|
|
|
Using this, DLQ-specific producer properties can be set.
|
|
|
|
|
All the properties available through Kafka producer properties can be set through this property.
|
|
|
|
|
+
|
|
|
|
|
Default: Default Kafka producer properties.
|
|
|
|
|
standardHeaders::
|
|
|
|
|
Indicates which standard headers are populated by the inbound channel adapter.
|
|
|
|
|
`none`, `id`, `timestamp` or `both`.
|
|
|
|
|
Useful if using native deserialization and the first component to receive a message needs an `id` (such as an aggregator that is configured to use a JDBC message store).
|
|
|
|
|
Indicates which standard headers are populated by the inbound channel adapter.
|
|
|
|
|
Allowed values: `none`, `id`, `timestamp`, or `both`.
|
|
|
|
|
Useful if using native deserialization and the first component to receive a message needs an `id` (such as an aggregator that is configured to use a JDBC message store).
|
|
|
|
|
+
|
|
|
|
|
Default: `none`
|
|
|
|
|
converterBeanName::
|
|
|
|
|
The name of a bean that implements `RecordMessageConverter`; used in the inbound channel adapter to replace the default `MessagingMessageConverter`.
|
|
|
|
|
The name of a bean that implements `RecordMessageConverter`. Used in the inbound channel adapter to replace the default `MessagingMessageConverter`.
|
|
|
|
|
+
|
|
|
|
|
Default: `null`
|
|
|
|
|
idleEventInterval::
|
|
|
|
|
The interval, in milliseconds between events indicating that no messages have recently been received.
|
|
|
|
|
Use an `ApplicationListener<ListenerContainerIdleEvent>` to receive these events.
|
|
|
|
|
See <<pause-resume>> for a usage example.
|
|
|
|
|
The interval, in milliseconds, between events indicating that no messages have recently been received.
|
|
|
|
|
Use an `ApplicationListener<ListenerContainerIdleEvent>` to receive these events.
|
|
|
|
|
See <<pause-resume>> for a usage example.
|
|
|
|
|
+
|
|
|
|
|
Default: `30000`
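
As a brief sketch of how several of the preceding consumer properties can be combined, the following `application.properties` fragment assumes a binding named `input`; the destination, group, and DLQ topic names are illustrative:

[source]
----
spring.cloud.stream.bindings.input.destination=orders
spring.cloud.stream.bindings.input.group=myGroup
# Disable auto-commit so the application acknowledges records manually
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
# Send messages that result in errors to a dead-letter topic with a custom name
spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true
spring.cloud.stream.kafka.bindings.input.consumer.dlqName=orders-dlq
# Publish an idle event every 60 seconds when no records arrive
spring.cloud.stream.kafka.bindings.input.consumer.idleEventInterval=60000
----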
|
|
|
|
|
|
|
|
|
|
@@ -234,73 +235,70 @@ The following properties are available for Kafka producers only and
|
|
|
|
|
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.producer.`.
|
|
|
|
|
|
|
|
|
|
admin.configuration::
|
|
|
|
|
A `Map` of Kafka topic properties used when provisioning new topics.
|
|
|
|
|
e.g. `spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0`
|
|
|
|
|
A `Map` of Kafka topic properties used when provisioning new topics -- for example, `spring.cloud.stream.kafka.bindings.output.producer.admin.configuration.message.format.version=0.9.0.0`
|
|
|
|
|
+
|
|
|
|
|
Default: none.
|
|
|
|
|
|
|
|
|
|
admin.replicas-assignment::
|
|
|
|
|
A Map<Integer, List<Integer>> of replica assignments, with the key being the partition and value the assignments.
|
|
|
|
|
A Map<Integer, List<Integer>> of replica assignments, with the key being the partition and the value being the assignments.
|
|
|
|
|
Used when provisioning new topics.
|
|
|
|
|
See the `NewTopic` Javadocs in the `kafka-clients` jar.
|
|
|
|
|
+
|
|
|
|
|
Default: none.
|
|
|
|
|
|
|
|
|
|
admin.replication-factor::
|
|
|
|
|
The replication factor to use when provisioning new topics; overrides the binder-wide setting.
|
|
|
|
|
The replication factor to use when provisioning new topics. Overrides the binder-wide setting.
|
|
|
|
|
Ignored if `replicas-assignment` is present.
|
|
|
|
|
+
|
|
|
|
|
Default: none (the binder-wide default of 1 is used).
|
|
|
|
|
|
|
|
|
|
bufferSize::
|
|
|
|
|
Upper limit, in bytes, of how much data the Kafka producer will attempt to batch before sending.
|
|
|
|
|
Upper limit, in bytes, of how much data the Kafka producer attempts to batch before sending.
|
|
|
|
|
+
|
|
|
|
|
Default: `16384`.
|
|
|
|
|
sync::
|
|
|
|
|
Whether the producer is synchronous.
|
|
|
|
|
Whether the producer is synchronous.
|
|
|
|
|
+
|
|
|
|
|
Default: `false`.
|
|
|
|
|
batchTimeout::
|
|
|
|
|
How long the producer will wait before sending in order to allow more messages to accumulate in the same batch.
|
|
|
|
|
(Normally the producer does not wait at all, and simply sends all the messages that accumulated while the previous send was in progress.) A non-zero value may increase throughput at the expense of latency.
|
|
|
|
|
How long the producer waits to allow more messages to accumulate in the same batch before sending the messages.
|
|
|
|
|
(Normally, the producer does not wait at all and simply sends all the messages that accumulated while the previous send was in progress.) A non-zero value may increase throughput at the expense of latency.
|
|
|
|
|
+
|
|
|
|
|
Default: `0`.
|
|
|
|
|
messageKeyExpression::
|
|
|
|
|
A SpEL expression evaluated against the outgoing message used to populate the key of the produced Kafka message.
|
|
|
|
|
For example `headers['myKey']`; the payload cannot be used because by the time this expression is evaluated, the payload is already in the form of a `byte[]`.
|
|
|
|
|
A SpEL expression evaluated against the outgoing message used to populate the key of the produced Kafka message -- for example, `headers['myKey']`.
|
|
|
|
|
The payload cannot be used because, by the time this expression is evaluated, the payload is already in the form of a `byte[]`.
|
|
|
|
|
+
|
|
|
|
|
Default: `none`.
|
|
|
|
|
headerPatterns::
|
|
|
|
|
A comma-delimited list of simple patterns to match spring-messaging headers to be mapped to the kafka `Headers` in the `ProducerRecord`.
|
|
|
|
|
A comma-delimited list of simple patterns to match Spring messaging headers to be mapped to the Kafka `Headers` in the `ProducerRecord`.
|
|
|
|
|
Patterns can begin or end with the wildcard character (asterisk).
|
|
|
|
|
Patterns can be negated by prefixing with `!`; matching stops after the first match (positive or negative).
|
|
|
|
|
For example `!foo,fo*` will pass `fox` but not `foo`.
|
|
|
|
|
Patterns can be negated by prefixing with `!`.
|
|
|
|
|
Matching stops after the first match (positive or negative).
|
|
|
|
|
For example, `!ask,as*` passes `ash` but not `ask`.
|
|
|
|
|
`id` and `timestamp` are never mapped.
|
|
|
|
|
+
|
|
|
|
|
Default: `*` (all headers except the `id` and `timestamp` headers)
|
|
|
|
|
configuration::
|
|
|
|
|
Map with a key/value pair containing generic Kafka producer properties.
|
|
|
|
|
Map with a key/value pair containing generic Kafka producer properties.
|
|
|
|
|
+
|
|
|
|
|
Default: Empty map.
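
The following `application.properties` fragment is a sketch of how several of these producer properties fit together. It assumes a binding named `output`; the destination, key header, header pattern, and native property values are illustrative:

[source]
----
spring.cloud.stream.bindings.output.destination=orders
# Populate the Kafka record key from a message header
spring.cloud.stream.kafka.bindings.output.producer.messageKeyExpression=headers['myKey']
# Map all headers except those starting with "internal"
spring.cloud.stream.kafka.bindings.output.producer.headerPatterns=!internal*,*
# Wait up to 50ms for additional records to accumulate in a batch
spring.cloud.stream.kafka.bindings.output.producer.batchTimeout=50
# Pass an arbitrary native Kafka producer property through the binder
spring.cloud.stream.kafka.bindings.output.producer.configuration.compression.type=gzip
----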
|
|
|
|
|
|
|
|
|
|
[NOTE]
|
|
|
|
|
====
|
|
|
|
|
The Kafka binder will use the `partitionCount` setting of the producer as a hint to create a topic with the given partition count (in conjunction with the `minPartitionCount`, the maximum of the two being the value being used).
|
|
|
|
|
Exercise caution when configuring both `minPartitionCount` for a binder and `partitionCount` for an application, as the larger value will be used.
|
|
|
|
|
If a topic already exists with a smaller partition count and `autoAddPartitions` is disabled (the default), then the binder will fail to start.
|
|
|
|
|
If a topic already exists with a smaller partition count and `autoAddPartitions` is enabled, new partitions will be added.
|
|
|
|
|
If a topic already exists with a larger number of partitions than the maximum of (`minPartitionCount` and `partitionCount`), the existing partition count will be used.
|
|
|
|
|
====
|
|
|
|
|
NOTE: The Kafka binder uses the `partitionCount` setting of the producer as a hint to create a topic with the given partition count (in conjunction with `minPartitionCount`, the larger of the two values being used).
|
|
|
|
|
Exercise caution when configuring both `minPartitionCount` for a binder and `partitionCount` for an application, as the larger value is used.
|
|
|
|
|
If a topic already exists with a smaller partition count and `autoAddPartitions` is disabled (the default), the binder fails to start.
|
|
|
|
|
If a topic already exists with a smaller partition count and `autoAddPartitions` is enabled, new partitions are added.
|
|
|
|
|
If a topic already exists with a larger number of partitions than the maximum of (`minPartitionCount` or `partitionCount`), the existing partition count is used.
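
To make the interplay concrete, the following hypothetical configuration (the binding name `output` and the values are illustrative) results in topics with at least five partitions, because the larger of `minPartitionCount` and `partitionCount` wins and `autoAddPartitions` lets the binder grow existing topics:

[source]
----
spring.cloud.stream.kafka.binder.minPartitionCount=2
spring.cloud.stream.kafka.binder.autoAddPartitions=true
# partitionCount is a core Spring Cloud Stream producer property
spring.cloud.stream.bindings.output.producer.partitionCount=5
----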
|
|
|
|
|
|
|
|
|
|
=== Usage Examples
|
|
|
|
|
|
|
|
|
|
In this section, we illustrate the use of the above properties for specific scenarios.
|
|
|
|
|
In this section, we show the use of the preceding properties for specific scenarios.
|
|
|
|
|
|
|
|
|
|
==== Example: Setting `autoCommitOffset` false and relying on manual acking.
|
|
|
|
|
==== Example: Setting `autoCommitOffset` to `false` and Relying on Manual Acking
|
|
|
|
|
|
|
|
|
|
This example illustrates how one may manually acknowledge offsets in a consumer application.
|
|
|
|
|
|
|
|
|
|
This example requires that `spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset` is set to false.
|
|
|
|
|
This example requires that `spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset` be set to `false`.
|
|
|
|
|
Use the corresponding input channel name for your example.
|
|
|
|
|
|
|
|
|
|
[source, java]
|
|
|
|
|
@@ -324,13 +322,13 @@ public class ManuallyAcknowdledgingConsumer {
|
|
|
|
|
}
|
|
|
|
|
----
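
As a minimal sketch, assuming the binding is named `input` (as in the preceding example), the setting that this example depends on can be supplied in `application.properties`:

[source]
----
# Disable automatic offset commits so the kafka_acknowledgment header is populated
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
----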
|
|
|
|
|
|
|
|
|
|
==== Example: security configuration
|
|
|
|
|
==== Example: Security Configuration
|
|
|
|
|
|
|
|
|
|
Apache Kafka 0.9 supports secure connections between client and brokers.
|
|
|
|
|
To take advantage of this feature, follow the guidelines in the http://kafka.apache.org/090/documentation.html#security_configclients[Apache Kafka Documentation] as well as the Kafka 0.9 http://docs.confluent.io/2.0.0/kafka/security.html[security guidelines from the Confluent documentation].
|
|
|
|
|
Use the `spring.cloud.stream.kafka.binder.configuration` option to set security properties for all clients created by the binder.
|
|
|
|
|
|
|
|
|
|
For example, for setting `security.protocol` to `SASL_SSL`, set:
|
|
|
|
|
For example, to set `security.protocol` to `SASL_SSL`, set the following property:
|
|
|
|
|
|
|
|
|
|
[source]
|
|
|
|
|
----
|
|
|
|
|
@@ -341,14 +339,14 @@ All the other security properties can be set in a similar manner.
|
|
|
|
|
|
|
|
|
|
When using Kerberos, follow the instructions in the http://kafka.apache.org/090/documentation.html#security_sasl_clientconfig[reference documentation] for creating and referencing the JAAS configuration.
|
|
|
|
|
|
|
|
|
|
Spring Cloud Stream supports passing JAAS configuration information to the application using a JAAS configuration file and using Spring Boot properties.
|
|
|
|
|
Spring Cloud Stream supports passing JAAS configuration information to the application by using a JAAS configuration file and using Spring Boot properties.
|
|
|
|
|
|
|
|
|
|
===== Using JAAS configuration files
|
|
|
|
|
===== Using JAAS Configuration Files
|
|
|
|
|
|
|
|
|
|
The JAAS, and (optionally) krb5 file locations can be set for Spring Cloud Stream applications by using system properties.
|
|
|
|
|
Here is an example of launching a Spring Cloud Stream application with SASL and Kerberos using a JAAS configuration file:
|
|
|
|
|
The JAAS and (optionally) krb5 file locations can be set for Spring Cloud Stream applications by using system properties.
|
|
|
|
|
The following example shows how to launch a Spring Cloud Stream application with SASL and Kerberos by using a JAAS configuration file:
|
|
|
|
|
|
|
|
|
|
[source]
|
|
|
|
|
[source,bash]
|
|
|
|
|
----
|
|
|
|
|
java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
|
|
|
|
|
--spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
|
|
|
|
|
@@ -356,28 +354,28 @@ Here is an example of launching a Spring Cloud Stream application with SASL and
|
|
|
|
|
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
|
|
|
|
|
----
|
|
|
|
|
|
|
|
|
|
===== Using Spring Boot properties
|
|
|
|
|
===== Using Spring Boot Properties
|
|
|
|
|
|
|
|
|
|
As an alternative to having a JAAS configuration file, Spring Cloud Stream provides a mechanism for setting up the JAAS configuration for Spring Cloud Stream applications using Spring Boot properties.
|
|
|
|
|
As an alternative to having a JAAS configuration file, Spring Cloud Stream provides a mechanism for setting up the JAAS configuration for Spring Cloud Stream applications by using Spring Boot properties.
|
|
|
|
|
|
|
|
|
|
The following properties can be used for configuring the login context of the Kafka client.
|
|
|
|
|
The following properties can be used to configure the login context of the Kafka client:
|
|
|
|
|
|
|
|
|
|
spring.cloud.stream.kafka.binder.jaas.loginModule::
|
|
|
|
|
The login module name. Not necessary to be set in normal cases.
|
|
|
|
|
The login module name. Not necessary to be set in normal cases.
|
|
|
|
|
+
|
|
|
|
|
Default: `com.sun.security.auth.module.Krb5LoginModule`.
|
|
|
|
|
spring.cloud.stream.kafka.binder.jaas.controlFlag::
|
|
|
|
|
The control flag of the login module.
|
|
|
|
|
The control flag of the login module.
|
|
|
|
|
+
|
|
|
|
|
Default: `required`.
|
|
|
|
|
spring.cloud.stream.kafka.binder.jaas.options::
|
|
|
|
|
Map with a key/value pair containing the login module options.
|
|
|
|
|
Map with a key/value pair containing the login module options.
|
|
|
|
|
+
|
|
|
|
|
Default: Empty map.
|
|
|
|
|
|
|
|
|
|
Here is an example of launching a Spring Cloud Stream application with SASL and Kerberos using Spring Boot configuration properties:
|
|
|
|
|
The following example shows how to launch a Spring Cloud Stream application with SASL and Kerberos by using Spring Boot configuration properties:
|
|
|
|
|
|
|
|
|
|
[source]
|
|
|
|
|
[source,bash]
|
|
|
|
|
----
|
|
|
|
|
java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
|
|
|
|
|
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
|
|
|
|
|
@@ -389,7 +387,7 @@ Here is an example of launching a Spring Cloud Stream application with SASL and
|
|
|
|
|
--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM
|
|
|
|
|
----
|
|
|
|
|
|
|
|
|
|
This represents the equivalent of the following JAAS file:
|
|
|
|
|
The preceding example represents the equivalent of the following JAAS file:
|
|
|
|
|
|
|
|
|
|
[source]
|
|
|
|
|
----
|
|
|
|
|
@@ -402,31 +400,26 @@ KafkaClient {
|
|
|
|
|
};
|
|
|
|
|
----
|
|
|
|
|
|
|
|
|
|
If the topics required already exist on the broker, or will be created by an administrator, autocreation can be turned off and only client JAAS properties need to be sent.
|
|
|
|
|
If the required topics already exist on the broker or will be created by an administrator, autocreation can be turned off and only the client JAAS properties need to be set.
|
|
|
|
|
|
|
|
|
|
[NOTE]
|
|
|
|
|
====
|
|
|
|
|
Do not mix JAAS configuration files and Spring Boot properties in the same application.
|
|
|
|
|
If the `-Djava.security.auth.login.config` system property is already present, Spring Cloud Stream will ignore the Spring Boot properties.
|
|
|
|
|
NOTE: Do not mix JAAS configuration files and Spring Boot properties in the same application.
|
|
|
|
|
If the `-Djava.security.auth.login.config` system property is already present, Spring Cloud Stream ignores the Spring Boot properties.
|
|
|
|
|
|
|
|
|
|
====
|
|
|
|
|
|
|
|
|
|
[NOTE]
|
|
|
|
|
====
|
|
|
|
|
Exercise caution when using the `autoCreateTopics` and `autoAddPartitions` if using Kerberos.
|
|
|
|
|
Usually applications may use principals that do not have administrative rights in Kafka and Zookeeper, and relying on Spring Cloud Stream to create/modify topics may fail.
|
|
|
|
|
In secure environments, we strongly recommend creating topics and managing ACLs administratively using Kafka tooling.
|
|
|
|
|
====
|
|
|
|
|
NOTE: Be careful when using the `autoCreateTopics` and `autoAddPartitions` properties with Kerberos.
|
|
|
|
|
Usually, applications may use principals that do not have administrative rights in Kafka and Zookeeper.
|
|
|
|
|
Consequently, relying on Spring Cloud Stream to create or modify topics may fail.
|
|
|
|
|
In secure environments, we strongly recommend creating topics and managing ACLs administratively by using Kafka tooling.
|
|
|
|
|
|
|
|
|
|
[[pause-resume]]
|
|
|
|
|
==== Example: Pausing and Resuming the Consumer
|
|
|
|
|
|
|
|
|
|
If you wish to suspend consumption, but not cause a partition rebalance, you can pause and resume the consumer.
|
|
|
|
|
If you wish to suspend consumption but not cause a partition rebalance, you can pause and resume the consumer.
|
|
|
|
|
This is facilitated by adding the `Consumer` as a parameter to your `@StreamListener`.
|
|
|
|
|
To resume, you need an `ApplicationListener` for `ListenerContainerIdleEvent` s; the frequency at which events are published is controlled by the `idleEventInterval` property.
|
|
|
|
|
To resume, you need an `ApplicationListener` for `ListenerContainerIdleEvent` instances.
|
|
|
|
|
The frequency at which events are published is controlled by the `idleEventInterval` property.
|
|
|
|
|
Since the consumer is not thread-safe, you must call these methods on the calling thread.
|
|
|
|
|
|
|
|
|
|
The following simple application shows how to pause and resume.
|
|
|
|
|
The following simple application shows how to pause and resume:
|
|
|
|
|
|
|
|
|
|
[source, java]
|
|
|
|
|
----
|
|
|
|
|
@@ -460,21 +453,22 @@ public class Application {
|
|
|
|
|
[[kafka-error-channels]]
|
|
|
|
|
== Error Channels
|
|
|
|
|
|
|
|
|
|
Starting with _version 1.3_, the binder unconditionally sends exceptions to an error channel for each consumer destination, and can be configured to send async producer send failures to an error channel too.
|
|
|
|
|
Starting with version 1.3, the binder unconditionally sends exceptions to an error channel for each consumer destination and can also be configured to send async producer send failures to an error channel.
|
|
|
|
|
See <<binder-error-channels>> for more information.
|
|
|
|
|
|
|
|
|
|
The payload of the `ErrorMessage` for a send failure is a `KafkaSendFailureException` with properties:
|
|
|
|
|
|
|
|
|
|
* `failedMessage` - the spring-messaging `Message<?>` that failed to be sent.
|
|
|
|
|
* `record` - the raw `ProducerRecord` that was created from the `failedMessage`
|
|
|
|
|
* `failedMessage`: The Spring Messaging `Message<?>` that failed to be sent.
|
|
|
|
|
* `record`: The raw `ProducerRecord` that was created from the `failedMessage`.
|
|
|
|
|
|
|
|
|
|
There is no automatic handling of producer exceptions (such as sending to a <<kafka-dlq-processing, Dead-Letter queue>>); you can consume these exceptions with your own Spring Integration flow.
|
|
|
|
|
There is no automatic handling of producer exceptions (such as sending to a <<kafka-dlq-processing, Dead-Letter queue>>).
|
|
|
|
|
You can consume these exceptions with your own Spring Integration flow.
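
The following is a minimal sketch of such a flow, using a Spring Integration service activator. The channel name `myDestination.errors` is an assumption for illustration; the actual name of a binding's error channel, and how to enable producer error channels, are described in <<binder-error-channels>>:

[source, java]
----
import org.springframework.cloud.stream.binder.kafka.KafkaSendFailureException;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Component;

@Component
public class ProducerErrorHandler {

    // The channel name below is an assumption for illustration; see the core
    // documentation on binder error channels for the exact naming rules.
    @ServiceActivator(inputChannel = "myDestination.errors")
    public void handle(ErrorMessage errorMessage) {
        KafkaSendFailureException failure =
                (KafkaSendFailureException) errorMessage.getPayload();
        // Both the original spring-messaging message and the raw ProducerRecord
        // are available for logging, retrying, or routing elsewhere.
        System.err.println("Failed message: " + failure.getFailedMessage());
        System.err.println("Failed record: " + failure.getRecord());
    }

}
----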
|
|
|
|
|
|
|
|
|
|
[[kafka-metrics]]
|
|
|
|
|
== Kafka Metrics
|
|
|
|
|
|
|
|
|
|
The Kafka binder module exposes the following metrics:
|
|
|
|
|
|
|
|
|
|
`spring.cloud.stream.binder.kafka.someGroup.someTopic.lag` - this metric indicates how many messages have not been yet consumed from given binder's topic by given consumer group.
|
|
|
|
|
For example if the value of the metric `spring.cloud.stream.binder.kafka.myGroup.myTopic.lag` is `1000`, then consumer group `myGroup` has `1000` messages to waiting to be consumed from topic `myTopic`.
|
|
|
|
|
This metric is particularly useful to provide auto-scaling feedback to PaaS platform of your choice.
|
|
|
|
|
`spring.cloud.stream.binder.kafka.someGroup.someTopic.lag`: This metric indicates how many messages have not yet been consumed from a given binder's topic by a given consumer group.
|
|
|
|
|
For example, if the value of the metric `spring.cloud.stream.binder.kafka.myGroup.myTopic.lag` is `1000`, the consumer group named `myGroup` has `1000` messages waiting to be consumed from the topic called `myTopic`.
|
|
|
|
|
This metric is particularly useful for providing auto-scaling feedback to a PaaS platform.
|
|
|
|
|
|