From af0485e24146ed5f390896c264b9db80b3b5dab1 Mon Sep 17 00:00:00 2001 From: buildmaster Date: Fri, 1 Oct 2021 14:13:49 +0000 Subject: [PATCH] Going back to snapshots --- README.adoc | 74 +++---------------- docs/pom.xml | 2 +- docs/src/main/asciidoc/_configprops.adoc | 1 + pom.xml | 8 +- spring-cloud-starter-stream-kafka/pom.xml | 2 +- spring-cloud-stream-binder-kafka-core/pom.xml | 2 +- .../pom.xml | 2 +- spring-cloud-stream-binder-kafka/pom.xml | 2 +- 8 files changed, 21 insertions(+), 72 deletions(-) diff --git a/README.adoc b/README.adoc index ae379590..4f9b410d 100644 --- a/README.adoc +++ b/README.adoc @@ -239,7 +239,7 @@ Note that this property is only applicable for pollable consumers. Default: not set. resetOffsets:: Whether to reset offsets on the consumer to the value provided by startOffset. -Must be false if a `KafkaBindingRebalanceListener` is provided; see <>. +Must be false if a `KafkaRebalanceListener` is provided; see <>. See <> for more information about this property. + Default: `false`. @@ -337,17 +337,6 @@ Usually needed if you want to synchronize another transaction with the Kafka tra To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager. + Default: none. -txCommitRecovered:: -When using a transactional binder, the offset of a recovered record (e.g. when retries are exhausted and the record is sent to a dead letter topic) will be committed via a new transaction, by default. -Setting this property to `false` suppresses committing the offset of recovered record. -+ -Default: true. -commonErrorHandlerBeanName:: -`CommonErrorHandler` bean name to use per consumer binding. -When present, this user provided `CommonErrorHandler` takes precedence over any other error handlers defined by the binder. -This is a handy way to express error handlers, if the application does not want to use a `ListenerContainerCustomizer` and then check the destination/group combination to set an error handler. -+ -Default: none. [[reset-offsets]] ==== Resetting Offsets @@ -374,7 +363,7 @@ Set `resetOffsets` to `true` and `startOffset` to `latest`; the binding will per IMPORTANT: If a rebalance occurs after the initial assignment, the seeks will only be performed on any newly assigned partitions that were not assigned during the initial assignment. -For more control over topic offsets, see <>; when a listener is provided, `resetOffsets` should not be set to `true`, otherwise, that will cause an error. +For more control over topic offsets, see <>; when a listener is provided, `resetOffsets: true` is ignored. ==== Consuming Batches @@ -469,18 +458,18 @@ Default: none (the binder-wide default of -1 is used). useTopicHeader:: Set to `true` to override the default binding destination (topic name) with the value of the `KafkaHeaders.TOPIC` message header in the outbound message. If the header is not present, the default binding destination is used. -+ Default: `false`. ++ recordMetadataChannel:: The bean name of a `MessageChannel` to which successful send results should be sent; the bean must exist in the application context. The message sent to the channel is the sent message (after conversion, if any) with an additional header `KafkaHeaders.RECORD_METADATA`. The header contains a `RecordMetadata` object provided by the Kafka client; it includes the partition and offset where the record was written in the topic. -+ + `ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)` -+ + Failed sends go the producer error channel (if configured); see <>. +Default: null + -Default: null. NOTE: The Kafka binder uses the `partitionCount` setting of the producer as a hint to create a topic with the given partition count (in conjunction with the `minPartitionCount`, the maximum of the two being the value being used). Exercise caution when configuring both `minPartitionCount` for a binder and `partitionCount` for an application, as the larger value is used. @@ -517,11 +506,11 @@ Default: `false` In this section, we show the use of the preceding properties for specific scenarios. -===== Example: Setting `ackMode` to `MANUAL` and Relying on Manual Acknowledgement +===== Example: Setting `autoCommitOffset` to `false` and Relying on Manual Acking This example illustrates how one may manually acknowledge offsets in a consumer application. -This example requires that `spring.cloud.stream.kafka.bindings.input.consumer.ackMode` be set to `MANUAL`. +This example requires that `spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset` be set to `false`. Use the corresponding input channel name for your example. [source] @@ -633,47 +622,6 @@ Usually, applications may use principals that do not have administrative rights Consequently, relying on Spring Cloud Stream to create/modify topics may fail. In secure environments, we strongly recommend creating topics and managing ACLs administratively by using Kafka tooling. -====== Multi-binder configuration and JAAS - -When connecting to multiple clusters in which each one requires separate JAAS configuration, then set the JAAS configuration using the property `sasl.jaas.config`. -When this property is present in the applicaiton, it takes precedence over the other strategies mentioned above. -See this https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients[KIP-85] for more details. - -For example, if you have two clusters in your application with separate JAAS configuration, then the following is a template that you can use: - -``` -spring.cloud.stream: - binders: - kafka1: - type: kafka - environment: - spring: - cloud: - stream: - kafka: - binder: - brokers: localhost:9092 - configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";" - kafka2: - type: kafka - environment: - spring: - cloud: - stream: - kafka: - binder: - brokers: localhost:9093 - configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"user1-secret\";" - kafka.binder: - configuration: - security.protocol: SASL_PLAINTEXT - sasl.mechanism: PLAIN -``` - -Note that both the Kafka clusters, and the `sasl.jaas.config` values for each of them are different in the above configuration. - -See this https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/multi-binder-samples/kafka-multi-binder-jaas[sample application] for more details on how to setup and run such an application. - [[pause-resume]] ===== Example: Pausing and Resuming the Consumer @@ -826,10 +774,10 @@ public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key, ==== [[rebalance-listener]] -=== Using a KafkaBindingRebalanceListener +=== Using a KafkaRebalanceListener Applications may wish to seek topics/partitions to arbitrary offsets when the partitions are initially assigned, or perform other operations on the consumer. -Starting with version 2.1, if you provide a single `KafkaBindingRebalanceListener` bean in the application context, it will be wired into all Kafka consumer bindings. +Starting with version 2.1, if you provide a single `KafkaRebalanceListener` bean in the application context, it will be wired into all Kafka consumer bindings. ==== [source, java] @@ -882,7 +830,7 @@ You cannot set the `resetOffsets` consumer property to `true` when you provide a If you want advanced customization of consumer and producer configuration that is used for creating `ConsumerFactory` and `ProducerFactory` in Kafka, you can implement the following customizers. -* ConsumerConfigCustomizer +* ConsusumerConfigCustomizer * ProducerConfigCustomizer Both of these interfaces provide a way to configure the config map used for consumer and producer properties. diff --git a/docs/pom.xml b/docs/pom.xml index a526dbed..1778c636 100644 --- a/docs/pom.xml +++ b/docs/pom.xml @@ -7,7 +7,7 @@ org.springframework.cloud spring-cloud-stream-binder-kafka-parent - 3.2.0-M2 + 3.2.0-SNAPSHOT jar spring-cloud-stream-binder-kafka-docs diff --git a/docs/src/main/asciidoc/_configprops.adoc b/docs/src/main/asciidoc/_configprops.adoc index 9cf8dad6..7ff5a9f0 100644 --- a/docs/src/main/asciidoc/_configprops.adoc +++ b/docs/src/main/asciidoc/_configprops.adoc @@ -9,6 +9,7 @@ |spring.cloud.stream.dynamic-destinations | `[]` | A list of destinations that can be bound dynamically. If set, only listed destinations can be bound. |spring.cloud.stream.function.batch-mode | `false` | |spring.cloud.stream.function.bindings | | +|spring.cloud.stream.function.definition | | Definition of functions to bind. If several functions need to be composed into one, use pipes (e.g., 'fooFunc\|barFunc') |spring.cloud.stream.instance-count | `1` | The number of deployed instances of an application. Default: 1. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-count" where 'foo' is the name of the binding. |spring.cloud.stream.instance-index | `0` | The instance id of the application: a number from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index" where 'foo' is the name of the binding. |spring.cloud.stream.instance-index-list | | A list of instance id's from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index-list" where 'foo' is the name of the binding. This setting will override the one set in 'spring.cloud.stream.instance-index' diff --git a/pom.xml b/pom.xml index 4130cc5e..1ce40e40 100644 --- a/pom.xml +++ b/pom.xml @@ -2,12 +2,12 @@ 4.0.0 spring-cloud-stream-binder-kafka-parent - 3.2.0-M2 + 3.2.0-SNAPSHOT pom org.springframework.cloud spring-cloud-build - 3.1.0-M2 + 3.1.0-SNAPSHOT @@ -24,8 +24,8 @@ 2.8.0-M3 5.5.2 2.8.0 - 1.2.0-M2 - 3.2.0-M2 + 1.2.0-SNAPSHOT + 3.2.0-SNAPSHOT true true true diff --git a/spring-cloud-starter-stream-kafka/pom.xml b/spring-cloud-starter-stream-kafka/pom.xml index 4db28782..42f3042b 100644 --- a/spring-cloud-starter-stream-kafka/pom.xml +++ b/spring-cloud-starter-stream-kafka/pom.xml @@ -4,7 +4,7 @@ org.springframework.cloud spring-cloud-stream-binder-kafka-parent - 3.2.0-M2 + 3.2.0-SNAPSHOT spring-cloud-starter-stream-kafka Spring Cloud Starter Stream Kafka diff --git a/spring-cloud-stream-binder-kafka-core/pom.xml b/spring-cloud-stream-binder-kafka-core/pom.xml index d458961b..d550a149 100644 --- a/spring-cloud-stream-binder-kafka-core/pom.xml +++ b/spring-cloud-stream-binder-kafka-core/pom.xml @@ -5,7 +5,7 @@ org.springframework.cloud spring-cloud-stream-binder-kafka-parent - 3.2.0-M2 + 3.2.0-SNAPSHOT spring-cloud-stream-binder-kafka-core Spring Cloud Stream Kafka Binder Core diff --git a/spring-cloud-stream-binder-kafka-streams/pom.xml b/spring-cloud-stream-binder-kafka-streams/pom.xml index 91dd655f..a7503302 100644 --- a/spring-cloud-stream-binder-kafka-streams/pom.xml +++ b/spring-cloud-stream-binder-kafka-streams/pom.xml @@ -10,7 +10,7 @@ org.springframework.cloud spring-cloud-stream-binder-kafka-parent - 3.2.0-M2 + 3.2.0-SNAPSHOT diff --git a/spring-cloud-stream-binder-kafka/pom.xml b/spring-cloud-stream-binder-kafka/pom.xml index b3fa494d..df027c2a 100644 --- a/spring-cloud-stream-binder-kafka/pom.xml +++ b/spring-cloud-stream-binder-kafka/pom.xml @@ -10,7 +10,7 @@ org.springframework.cloud spring-cloud-stream-binder-kafka-parent - 3.2.0-M2 + 3.2.0-SNAPSHOT