Update SNAPSHOT to 3.2.0-M2
This commit is contained in:
78
README.adoc
78
README.adoc
@@ -239,7 +239,7 @@ Note that this property is only applicable for pollable consumers.
|
||||
Default: not set.
|
||||
resetOffsets::
|
||||
Whether to reset offsets on the consumer to the value provided by startOffset.
|
||||
Must be false if a `KafkaRebalanceListener` is provided; see <<rebalance-listener>>.
|
||||
Must be false if a `KafkaBindingRebalanceListener` is provided; see <<rebalance-listener>>.
|
||||
See <<reset-offsets>> for more information about this property.
|
||||
+
|
||||
Default: `false`.
|
||||
@@ -337,6 +337,17 @@ Usually needed if you want to synchronize another transaction with the Kafka tra
|
||||
To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.
|
||||
+
|
||||
Default: none.
|
||||
txCommitRecovered::
|
||||
When using a transactional binder, the offset of a recovered record (e.g. when retries are exhausted and the record is sent to a dead letter topic) will be committed via a new transaction, by default.
|
||||
Setting this property to `false` suppresses committing the offset of recovered record.
|
||||
+
|
||||
Default: true.
|
||||
commonErrorHandlerBeanName::
|
||||
`CommonErrorHandler` bean name to use per consumer binding.
|
||||
When present, this user provided `CommonErrorHandler` takes precedence over any other error handlers defined by the binder.
|
||||
This is a handy way to express error handlers, if the application does not want to use a `ListenerContainerCustomizer` and then check the destination/group combination to set an error handler.
|
||||
+
|
||||
Default: none.
|
||||
|
||||
[[reset-offsets]]
|
||||
==== Resetting Offsets
|
||||
@@ -363,7 +374,7 @@ Set `resetOffsets` to `true` and `startOffset` to `latest`; the binding will per
|
||||
|
||||
IMPORTANT: If a rebalance occurs after the initial assignment, the seeks will only be performed on any newly assigned partitions that were not assigned during the initial assignment.
|
||||
|
||||
For more control over topic offsets, see <<rebalance-listener>>; when a listener is provided, `resetOffsets: true` is ignored.
|
||||
For more control over topic offsets, see <<rebalance-listener>>; when a listener is provided, `resetOffsets` should not be set to `true`, otherwise, that will cause an error.
|
||||
|
||||
==== Consuming Batches
|
||||
|
||||
@@ -458,18 +469,18 @@ Default: none (the binder-wide default of -1 is used).
|
||||
useTopicHeader::
|
||||
Set to `true` to override the default binding destination (topic name) with the value of the `KafkaHeaders.TOPIC` message header in the outbound message.
|
||||
If the header is not present, the default binding destination is used.
|
||||
Default: `false`.
|
||||
+
|
||||
Default: `false`.
|
||||
recordMetadataChannel::
|
||||
The bean name of a `MessageChannel` to which successful send results should be sent; the bean must exist in the application context.
|
||||
The message sent to the channel is the sent message (after conversion, if any) with an additional header `KafkaHeaders.RECORD_METADATA`.
|
||||
The header contains a `RecordMetadata` object provided by the Kafka client; it includes the partition and offset where the record was written in the topic.
|
||||
|
||||
`ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)`
|
||||
|
||||
Failed sends go the producer error channel (if configured); see <<kafka-error-channels>>.
|
||||
Default: null
|
||||
+
|
||||
`ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)`
|
||||
+
|
||||
Failed sends go the producer error channel (if configured); see <<kafka-error-channels>>.
|
||||
+
|
||||
Default: null.
|
||||
|
||||
NOTE: The Kafka binder uses the `partitionCount` setting of the producer as a hint to create a topic with the given partition count (in conjunction with the `minPartitionCount`, the maximum of the two being the value being used).
|
||||
Exercise caution when configuring both `minPartitionCount` for a binder and `partitionCount` for an application, as the larger value is used.
|
||||
@@ -506,11 +517,11 @@ Default: `false`
|
||||
|
||||
In this section, we show the use of the preceding properties for specific scenarios.
|
||||
|
||||
===== Example: Setting `autoCommitOffset` to `false` and Relying on Manual Acking
|
||||
===== Example: Setting `ackMode` to `MANUAL` and Relying on Manual Acknowledgement
|
||||
|
||||
This example illustrates how one may manually acknowledge offsets in a consumer application.
|
||||
|
||||
This example requires that `spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset` be set to `false`.
|
||||
This example requires that `spring.cloud.stream.kafka.bindings.input.consumer.ackMode` be set to `MANUAL`.
|
||||
Use the corresponding input channel name for your example.
|
||||
|
||||
[source]
|
||||
@@ -622,6 +633,47 @@ Usually, applications may use principals that do not have administrative rights
|
||||
Consequently, relying on Spring Cloud Stream to create/modify topics may fail.
|
||||
In secure environments, we strongly recommend creating topics and managing ACLs administratively by using Kafka tooling.
|
||||
|
||||
====== Multi-binder configuration and JAAS
|
||||
|
||||
When connecting to multiple clusters in which each one requires separate JAAS configuration, then set the JAAS configuration using the property `sasl.jaas.config`.
|
||||
When this property is present in the applicaiton, it takes precedence over the other strategies mentioned above.
|
||||
See this https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients[KIP-85] for more details.
|
||||
|
||||
For example, if you have two clusters in your application with separate JAAS configuration, then the following is a template that you can use:
|
||||
|
||||
```
|
||||
spring.cloud.stream:
|
||||
binders:
|
||||
kafka1:
|
||||
type: kafka
|
||||
environment:
|
||||
spring:
|
||||
cloud:
|
||||
stream:
|
||||
kafka:
|
||||
binder:
|
||||
brokers: localhost:9092
|
||||
configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"
|
||||
kafka2:
|
||||
type: kafka
|
||||
environment:
|
||||
spring:
|
||||
cloud:
|
||||
stream:
|
||||
kafka:
|
||||
binder:
|
||||
brokers: localhost:9093
|
||||
configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"user1-secret\";"
|
||||
kafka.binder:
|
||||
configuration:
|
||||
security.protocol: SASL_PLAINTEXT
|
||||
sasl.mechanism: PLAIN
|
||||
```
|
||||
|
||||
Note that both the Kafka clusters, and the `sasl.jaas.config` values for each of them are different in the above configuration.
|
||||
|
||||
See this https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/multi-binder-samples/kafka-multi-binder-jaas[sample application] for more details on how to setup and run such an application.
|
||||
|
||||
[[pause-resume]]
|
||||
===== Example: Pausing and Resuming the Consumer
|
||||
|
||||
@@ -774,10 +826,10 @@ public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key,
|
||||
====
|
||||
|
||||
[[rebalance-listener]]
|
||||
=== Using a KafkaRebalanceListener
|
||||
=== Using a KafkaBindingRebalanceListener
|
||||
|
||||
Applications may wish to seek topics/partitions to arbitrary offsets when the partitions are initially assigned, or perform other operations on the consumer.
|
||||
Starting with version 2.1, if you provide a single `KafkaRebalanceListener` bean in the application context, it will be wired into all Kafka consumer bindings.
|
||||
Starting with version 2.1, if you provide a single `KafkaBindingRebalanceListener` bean in the application context, it will be wired into all Kafka consumer bindings.
|
||||
|
||||
====
|
||||
[source, java]
|
||||
@@ -830,7 +882,7 @@ You cannot set the `resetOffsets` consumer property to `true` when you provide a
|
||||
If you want advanced customization of consumer and producer configuration that is used for creating `ConsumerFactory` and `ProducerFactory` in Kafka,
|
||||
you can implement the following customizers.
|
||||
|
||||
* ConsusumerConfigCustomizer
|
||||
* ConsumerConfigCustomizer
|
||||
* ProducerConfigCustomizer
|
||||
|
||||
Both of these interfaces provide a way to configure the config map used for consumer and producer properties.
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.2.0-SNAPSHOT</version>
|
||||
<version>3.2.0-M2</version>
|
||||
</parent>
|
||||
<packaging>jar</packaging>
|
||||
<name>spring-cloud-stream-binder-kafka-docs</name>
|
||||
|
||||
@@ -9,7 +9,6 @@
|
||||
|spring.cloud.stream.dynamic-destinations | `[]` | A list of destinations that can be bound dynamically. If set, only listed destinations can be bound.
|
||||
|spring.cloud.stream.function.batch-mode | `false` |
|
||||
|spring.cloud.stream.function.bindings | |
|
||||
|spring.cloud.stream.function.definition | | Definition of functions to bind. If several functions need to be composed into one, use pipes (e.g., 'fooFunc\|barFunc')
|
||||
|spring.cloud.stream.instance-count | `1` | The number of deployed instances of an application. Default: 1. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-count" where 'foo' is the name of the binding.
|
||||
|spring.cloud.stream.instance-index | `0` | The instance id of the application: a number from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index" where 'foo' is the name of the binding.
|
||||
|spring.cloud.stream.instance-index-list | | A list of instance id's from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index-list" where 'foo' is the name of the binding. This setting will override the one set in 'spring.cloud.stream.instance-index'
|
||||
|
||||
8
pom.xml
8
pom.xml
@@ -2,12 +2,12 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.2.0-SNAPSHOT</version>
|
||||
<version>3.2.0-M2</version>
|
||||
<packaging>pom</packaging>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-build</artifactId>
|
||||
<version>3.1.0-SNAPSHOT</version>
|
||||
<version>3.1.0-M2</version>
|
||||
<relativePath />
|
||||
</parent>
|
||||
<scm>
|
||||
@@ -24,8 +24,8 @@
|
||||
<spring-kafka.version>2.8.0-M3</spring-kafka.version>
|
||||
<spring-integration-kafka.version>5.5.2</spring-integration-kafka.version>
|
||||
<kafka.version>2.8.0</kafka.version>
|
||||
<spring-cloud-schema-registry.version>1.2.0-SNAPSHOT</spring-cloud-schema-registry.version>
|
||||
<spring-cloud-stream.version>3.2.0-SNAPSHOT</spring-cloud-stream.version>
|
||||
<spring-cloud-schema-registry.version>1.2.0-M2</spring-cloud-schema-registry.version>
|
||||
<spring-cloud-stream.version>3.2.0-M2</spring-cloud-stream.version>
|
||||
<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
|
||||
<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
|
||||
<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.2.0-SNAPSHOT</version>
|
||||
<version>3.2.0-M2</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
<description>Spring Cloud Starter Stream Kafka</description>
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.2.0-SNAPSHOT</version>
|
||||
<version>3.2.0-M2</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder Core</description>
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.2.0-SNAPSHOT</version>
|
||||
<version>3.2.0-M2</version>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.2.0-SNAPSHOT</version>
|
||||
<version>3.2.0-M2</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
||||
Reference in New Issue
Block a user