From e7487b9ada3562ce276c4e32dfcaed9cdab8ed8f Mon Sep 17 00:00:00 2001 From: buildmaster Date: Tue, 7 Apr 2020 17:18:40 +0000 Subject: [PATCH] Update SNAPSHOT to 3.1.0.M1 --- README.adoc | 55 ++++++++++++++----- docs/pom.xml | 2 +- pom.xml | 8 +-- spring-cloud-starter-stream-kafka/pom.xml | 2 +- spring-cloud-stream-binder-kafka-core/pom.xml | 2 +- .../pom.xml | 2 +- spring-cloud-stream-binder-kafka/pom.xml | 2 +- 7 files changed, 50 insertions(+), 23 deletions(-) diff --git a/README.adoc b/README.adoc index 0792be38..2a85e40d 100644 --- a/README.adoc +++ b/README.adoc @@ -39,7 +39,7 @@ To use Apache Kafka binder, you need to add `spring-cloud-stream-binder-kafka` a ---- -Alternatively, you can also use the Spring Cloud Stream Kafka Starter, as shown inn the following example for Maven: +Alternatively, you can also use the Spring Cloud Stream Kafka Starter, as shown in the following example for Maven: [source,xml] ---- @@ -60,7 +60,7 @@ The Apache Kafka Binder implementation maps each destination to an Apache Kafka The consumer group maps directly to the same Apache Kafka concept. Partitioning also maps directly to Apache Kafka partitions as well. -The binder currently uses the Apache Kafka `kafka-clients` 1.0.0 jar and is designed to be used with a broker of at least that version. +The binder currently uses the Apache Kafka `kafka-clients` version `2.3.1`. This client can communicate with older brokers (see the Kafka documentation), but certain features may not be available. For example, with versions earlier than 0.11.x.x, native headers are not supported. Also, 0.11.x.x does not support the `autoAddPartitions` property. @@ -155,21 +155,15 @@ Default: See individual producer properties. spring.cloud.stream.kafka.binder.headerMapperBeanName:: The bean name of a `KafkaHeaderMapper` used for mapping `spring-messaging` headers to and from Kafka headers. 
-Use this, for example, if you wish to customize the trusted packages in a `DefaultKafkaHeaderMapper` that uses JSON deserialization for the headers. +Use this, for example, if you wish to customize the trusted packages in a `BinderHeaderMapper` bean that uses JSON deserialization for the headers. +If this custom `BinderHeaderMapper` bean is not made available to the binder using this property, then the binder will look for a header mapper bean with the name `kafkaBinderHeaderMapper` that is of type `BinderHeaderMapper` before falling back to a default `BinderHeaderMapper` created by the binder. + Default: none. -spring.cloud.stream.kafka.binder.authorizationExceptionRetryInterval:: -Enables retrying in case of authorization exceptions. -Defines interval between each retry. -Accepts `Duration`, e.g. `30s`, `2m`, etc. -+ -Default: `null` (retries disabled, fail fast) - [[kafka-consumer-properties]] ==== Kafka Consumer Properties -NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`. +NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.kafka.default.consumer.<property>=<value>`. The following properties are available for Kafka consumers only and
**Not allowed when `destinationIsPattern` is `true`.** + Default: `false`. +dlqPartitions:: +When `enableDlq` is true, and this property is not set, a dead-letter topic with the same number of partitions as the primary topic(s) is created. +Usually, dead-letter records are sent to the same partition in the dead-letter topic as the original record. +This behavior can be changed; see <>. +If this property is set to `1` and there is no `DlqPartitionFunction` bean, all dead-letter records will be written to partition `0`. +If this property is greater than `1`, you **MUST** provide a `DlqPartitionFunction` bean. +Note that the actual partition count is affected by the binder's `minPartitionCount` property. ++ +Default: `none` configuration:: Map with a key/value pair containing generic Kafka consumer properties. In addition to having Kafka consumer properties, other configuration properties can be passed here. @@ -296,6 +301,12 @@ pollTimeout:: Timeout used for polling in pollable consumers. + Default: 5 seconds. +transactionManager:: +Bean name of a `KafkaAwareTransactionManager` used to override the binder's transaction manager for this binding. +Usually needed if you want to synchronize another transaction with the Kafka transaction, using the `ChainedKafkaTransactionManager`. +To achieve exactly-once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager. ++ +Default: none. ==== Consuming Batches @@ -311,7 +322,7 @@ Refer to the https://docs.spring.io/spring-kafka/docs/2.3.0.BUILD-SNAPSHOT/refer [[kafka-producer-properties]] ==== Kafka Producer Properties -NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`. +NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.kafka.default.producer.<property>=<value>`.
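The `DlqPartitionFunction` contract referenced in the `dlqPartitions` description can be illustrated with a small sketch. This is a simplified stand-in written to run without Spring or Kafka on the classpath: the `PartitionChooser` interface, the `chooser` factory method, and passing the original partition number directly are assumptions made for illustration only; a real application would instead declare a bean of the binder's `DlqPartitionFunction` type.

```java
// Hedged sketch of a DLQ partition-selection policy. In the real binder,
// a DlqPartitionFunction bean receives the consumer group, the failed
// ConsumerRecord, and the exception, and returns the target DLQ partition.
// Here a simplified stand-in interface keeps the example self-contained.
public class DlqPartitionSketch {

    @FunctionalInterface
    public interface PartitionChooser {
        int choose(String group, int originalPartition, Throwable cause);
    }

    // Example policy: records that failed with IllegalStateException all go
    // to partition 0; everything else is spread across the DLQ's partitions
    // based on the original partition number.
    public static PartitionChooser chooser(int dlqPartitions) {
        return (group, originalPartition, cause) ->
                cause instanceof IllegalStateException
                        ? 0
                        : originalPartition % dlqPartitions;
    }

    public static void main(String[] args) {
        PartitionChooser c = chooser(4);
        System.out.println(c.choose("g1", 6, new RuntimeException("boom"))); // 6 % 4 = 2
        System.out.println(c.choose("g1", 6, new IllegalStateException()));  // 0
    }
}
```

The point of the sketch is the policy shape: given the group, the failed record, and the exception, return a partition number that is valid for the DLQ topic's partition count (recall that with `dlqPartitions` greater than `1`, supplying such a function is mandatory).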
The following properties are available for Kafka producers only and @@ -407,6 +418,12 @@ Supported values are `none`, `gzip`, `snappy` and `lz4`. If you override the `kafka-clients` jar to 2.1.0 (or later), as discussed in the https://docs.spring.io/spring-kafka/docs/2.2.x/reference/html/deps-for-21x.html[Spring for Apache Kafka documentation], and wish to use `zstd` compression, use `spring.cloud.stream.kafka.bindings.<channelName>.producer.configuration.compression.type=zstd`. + Default: `none`. +transactionManager:: +Bean name of a `KafkaAwareTransactionManager` used to override the binder's transaction manager for this binding. +Usually needed if you want to synchronize another transaction with the Kafka transaction, using the `ChainedKafkaTransactionManager`. +To achieve exactly-once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager. ++ +Default: none. ==== Usage examples @@ -576,16 +593,24 @@ When used in a processor application, the consumer starts the transaction; any r When the listener exits normally, the listener container will send the offset to the transaction and commit it. A common producer factory is used for all producer bindings configured using `spring.cloud.stream.kafka.binder.transaction.producer.*` properties; individual binding Kafka producer properties are ignored. +IMPORTANT: Normal binder retries (and dead lettering) are not supported with transactions, because the retries run in the original transaction, which may be rolled back, along with any records it published. +When retries are enabled (the common property `maxAttempts` is greater than zero), the retry properties are used to configure a `DefaultAfterRollbackProcessor` to enable retries at the container level.
+Similarly, instead of publishing dead-letter records within the transaction, this functionality is moved to the listener container, again via the `DefaultAfterRollbackProcessor`, which runs after the main transaction has rolled back. + If you wish to use transactions in a source application, or from some arbitrary thread for producer-only transactions (for example, from a `@Scheduled` method), you must get a reference to the transactional producer factory and define a `KafkaTransactionManager` bean using it. ==== [source, java] ---- @Bean -public PlatformTransactionManager transactionManager(BinderFactory binders) { +public PlatformTransactionManager transactionManager(BinderFactory binders, + @Value("${unique.tx.id.per.instance}") String txId) { + ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null, MessageChannel.class)).getTransactionalProducerFactory(); - return new KafkaTransactionManager<>(pf); + KafkaTransactionManager<byte[], byte[]> tm = new KafkaTransactionManager<>(pf); + tm.setTransactionIdPrefix(txId); + return tm; } ---- ==== @@ -612,6 +637,8 @@ public static class Sender { If you wish to synchronize producer-only transactions with those from some other transaction manager, use a `ChainedTransactionManager`. +IMPORTANT: If you deploy multiple instances of your application, each instance needs a unique `transactionIdPrefix`.
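The per-binding `transactionManager` property listed under the consumer and producer properties can be combined with a transaction manager bean such as the one above. As a hedged sketch (the binding names `input` and `output` and the bean name `myTransactionManager` are assumptions, not names taken from this document), pointing both bindings at the same `KafkaAwareTransactionManager` for exactly-once processing might look like:

----
spring.cloud.stream.kafka.bindings.input.consumer.transactionManager=myTransactionManager
spring.cloud.stream.kafka.bindings.output.producer.transactionManager=myTransactionManager
----

Using one bean name on both sides satisfies the requirement stated above that the consumer and producer bindings all share the same transaction manager.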
+ [[kafka-error-channels]] === Error Channels diff --git a/docs/pom.xml b/docs/pom.xml index 8e2bf379..7b425b89 100644 --- a/docs/pom.xml +++ b/docs/pom.xml @@ -7,7 +7,7 @@ org.springframework.cloud spring-cloud-stream-binder-kafka-parent - 3.1.0.BUILD-SNAPSHOT + 3.1.0.M1 pom spring-cloud-stream-binder-kafka-docs diff --git a/pom.xml b/pom.xml index 7200176f..2e88b9fe 100644 --- a/pom.xml +++ b/pom.xml @@ -2,12 +2,12 @@ 4.0.0 spring-cloud-stream-binder-kafka-parent - 3.1.0.BUILD-SNAPSHOT + 3.1.0.M1 pom org.springframework.cloud spring-cloud-build - 3.0.0.BUILD-SNAPSHOT + 3.0.0.M1 @@ -15,8 +15,8 @@ 2.4.4.RELEASE 3.2.1.RELEASE 2.4.0 - 1.1.0.BUILD-SNAPSHOT - 3.1.0.BUILD-SNAPSHOT + 1.1.0.M1 + 3.1.0.M1 true true true diff --git a/spring-cloud-starter-stream-kafka/pom.xml b/spring-cloud-starter-stream-kafka/pom.xml index 370fde3d..c1c8c46d 100644 --- a/spring-cloud-starter-stream-kafka/pom.xml +++ b/spring-cloud-starter-stream-kafka/pom.xml @@ -4,7 +4,7 @@ org.springframework.cloud spring-cloud-stream-binder-kafka-parent - 3.1.0.BUILD-SNAPSHOT + 3.1.0.M1 spring-cloud-starter-stream-kafka Spring Cloud Starter Stream Kafka diff --git a/spring-cloud-stream-binder-kafka-core/pom.xml b/spring-cloud-stream-binder-kafka-core/pom.xml index 5de3807d..e353307f 100644 --- a/spring-cloud-stream-binder-kafka-core/pom.xml +++ b/spring-cloud-stream-binder-kafka-core/pom.xml @@ -5,7 +5,7 @@ org.springframework.cloud spring-cloud-stream-binder-kafka-parent - 3.1.0.BUILD-SNAPSHOT + 3.1.0.M1 spring-cloud-stream-binder-kafka-core Spring Cloud Stream Kafka Binder Core diff --git a/spring-cloud-stream-binder-kafka-streams/pom.xml b/spring-cloud-stream-binder-kafka-streams/pom.xml index ef78dd66..9db4b6ab 100644 --- a/spring-cloud-stream-binder-kafka-streams/pom.xml +++ b/spring-cloud-stream-binder-kafka-streams/pom.xml @@ -10,7 +10,7 @@ org.springframework.cloud spring-cloud-stream-binder-kafka-parent - 3.1.0.BUILD-SNAPSHOT + 3.1.0.M1 diff --git a/spring-cloud-stream-binder-kafka/pom.xml 
b/spring-cloud-stream-binder-kafka/pom.xml index ae8cbc9b..4289b281 100644 --- a/spring-cloud-stream-binder-kafka/pom.xml +++ b/spring-cloud-stream-binder-kafka/pom.xml @@ -10,7 +10,7 @@ org.springframework.cloud spring-cloud-stream-binder-kafka-parent - 3.1.0.BUILD-SNAPSHOT + 3.1.0.M1