From 097fb89d9e2fe1201d94e2a9ffe71c05a3548df3 Mon Sep 17 00:00:00 2001 From: buildmaster Date: Tue, 17 Nov 2020 16:03:30 +0000 Subject: [PATCH] Update SNAPSHOT to 3.1.0-M4 --- README.adoc | 8 +- docs/pom.xml | 2 +- docs/src/main/asciidoc/_configprops.adoc | 120 +++++++++--------- pom.xml | 8 +- spring-cloud-starter-stream-kafka/pom.xml | 2 +- spring-cloud-stream-binder-kafka-core/pom.xml | 2 +- .../pom.xml | 2 +- spring-cloud-stream-binder-kafka/pom.xml | 2 +- 8 files changed, 75 insertions(+), 71 deletions(-) diff --git a/README.adoc b/README.adoc index 267794f3..195a9918 100644 --- a/README.adoc +++ b/README.adoc @@ -241,7 +241,7 @@ Default: null (equivalent to `earliest`). enableDlq:: When set to true, it enables DLQ behavior for the consumer. By default, messages that result in errors are forwarded to a topic named `error..`. -The DLQ topic name can be configurable by setting the `dlqName` property. +The DLQ topic name can be configurable by setting the `dlqName` property or by defining a `@Bean` of type `DlqDestinationResolver`. This provides an alternative option to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome. See <> processing for more information. Starting with version 2.0, messages sent to the DLQ topic are enhanced with the following headers: `x-original-topic`, `x-exception-message`, and `x-exception-stacktrace` as `byte[]`. @@ -381,6 +381,11 @@ messageKeyExpression:: A SpEL expression evaluated against the outgoing message used to populate the key of the produced Kafka message -- for example, `headers['myKey']`. With versions before 3.0, the payload could not be used unless native encoding was being used because, by the time this expression was evaluated, the payload was already in the form of a `byte[]`. Now, the expression is evaluated before the payload is converted. +In the case of a regular processor (`Function` or `Function, Message`), if the produced key needs to be same as the incoming key from the topic, this property can be set as below. +`spring.cloud.stream.kafka.bindings..producer.messageKeyExpression: headers['kafka_receivedMessageKey']` +There is an important caveat to keep in mind for reactive functions. +In that case, it is up to the application to manually copy the headers from the incoming messages to outbound messages. +You can set the header, e.g. `myKey` and use `headers['myKey']` as suggested above or, for convenience, simply set the `KafkaHeaders.MESSAGE_KEY` header, and you do not need to set this property at all. + Default: `none`. headerPatterns:: @@ -770,7 +775,6 @@ Both of these interfaces provide a way to configure the config map used for cons For example, if you want to gain access to a bean that is defined at the application level, you can inject that in the implementation of the `configure` method. When the binder discovers that these customizers are available as beans, it will invoke the `configure` method right before creating the consumer and producer factories. - = Appendices [appendix] [[building]] diff --git a/docs/pom.xml b/docs/pom.xml index a1faace2..adca4dcf 100644 --- a/docs/pom.xml +++ b/docs/pom.xml @@ -7,7 +7,7 @@ org.springframework.cloud spring-cloud-stream-binder-kafka-parent - 3.1.0-SNAPSHOT + 3.1.0-M4 jar spring-cloud-stream-binder-kafka-docs diff --git a/docs/src/main/asciidoc/_configprops.adoc b/docs/src/main/asciidoc/_configprops.adoc index 539fe315..8053c12a 100644 --- a/docs/src/main/asciidoc/_configprops.adoc +++ b/docs/src/main/asciidoc/_configprops.adoc @@ -1,65 +1,65 @@ |=== |Name | Default | Description -|spring.cloud.stream.binders | | Additional per-binder properties (see {@link BinderProperties}) if more then one binder of the same type is used (i.e., connect to multiple instances of RabbitMq). Here you can specify multiple binder configurations, each with different environment settings. For example; spring.cloud.stream.binders.rabbit1.environment. . . , spring.cloud.stream.binders.rabbit2.environment. . . -|spring.cloud.stream.binding-retry-interval | 30 | Retry interval (in seconds) used to schedule binding attempts. Default: 30 sec. -|spring.cloud.stream.bindings | | Additional binding properties (see {@link BinderProperties}) per binding name (e.g., 'input`). For example; This sets the content-type for the 'input' binding of a Sink application: 'spring.cloud.stream.bindings.input.contentType=text/plain' -|spring.cloud.stream.default-binder | | The name of the binder to use by all bindings in the event multiple binders available (e.g., 'rabbit'). -|spring.cloud.stream.dynamic-destination-cache-size | 10 | The maximum size of Least Recently Used (LRU) cache of dynamic destinations. Once this size is reached, new destinations will trigger the removal of old destinations. Default: 10 -|spring.cloud.stream.dynamic-destinations | [] | A list of destinations that can be bound dynamically. If set, only listed destinations can be bound. -|spring.cloud.stream.function.batch-mode | false | -|spring.cloud.stream.function.bindings | | -|spring.cloud.stream.function.definition | | Definition of functions to bind. If several functions need to be composed into one, use pipes (e.g., 'fooFunc\|barFunc') -|spring.cloud.stream.instance-count | 1 | The number of deployed instances of an application. Default: 1. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-count" where 'foo' is the name of the binding. -|spring.cloud.stream.instance-index | 0 | The instance id of the application: a number from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index" where 'foo' is the name of the binding. -|spring.cloud.stream.instance-index-list | | A list of instance id's from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index-list" where 'foo' is the name of the binding. This setting will override the one set in 'spring.cloud.stream.instance-index' -|spring.cloud.stream.integration.message-handler-not-propagated-headers | | Message header names that will NOT be copied from the inbound message. -|spring.cloud.stream.kafka.binder.authorization-exception-retry-interval | | Time between retries after AuthorizationException is caught in the ListenerContainer; defalt is null which disables retries. For more info see: {@link org.springframework.kafka.listener.ConsumerProperties#setAuthorizationExceptionRetryInterval(java.time.Duration)} -|spring.cloud.stream.kafka.binder.auto-add-partitions | false | -|spring.cloud.stream.kafka.binder.auto-create-topics | true | -|spring.cloud.stream.kafka.binder.brokers | [localhost] | -|spring.cloud.stream.kafka.binder.configuration | | Arbitrary kafka properties that apply to both producers and consumers. -|spring.cloud.stream.kafka.binder.consider-down-when-any-partition-has-no-leader | false | -|spring.cloud.stream.kafka.binder.consumer-properties | | Arbitrary kafka consumer properties. -|spring.cloud.stream.kafka.binder.header-mapper-bean-name | | The bean name of a custom header mapper to use instead of a {@link org.springframework.kafka.support.DefaultKafkaHeaderMapper}. -|spring.cloud.stream.kafka.binder.headers | [] | -|spring.cloud.stream.kafka.binder.health-timeout | 60 | Time to wait to get partition information in seconds; default 60. -|spring.cloud.stream.kafka.binder.jaas | | -|spring.cloud.stream.kafka.binder.min-partition-count | 1 | -|spring.cloud.stream.kafka.binder.producer-properties | | Arbitrary kafka producer properties. -|spring.cloud.stream.kafka.binder.replication-factor | -1 | -|spring.cloud.stream.kafka.binder.required-acks | 1 | -|spring.cloud.stream.kafka.binder.transaction.producer.batch-timeout | | -|spring.cloud.stream.kafka.binder.transaction.producer.buffer-size | | -|spring.cloud.stream.kafka.binder.transaction.producer.compression-type | | -|spring.cloud.stream.kafka.binder.transaction.producer.configuration | | -|spring.cloud.stream.kafka.binder.transaction.producer.error-channel-enabled | | -|spring.cloud.stream.kafka.binder.transaction.producer.header-mode | | -|spring.cloud.stream.kafka.binder.transaction.producer.header-patterns | | -|spring.cloud.stream.kafka.binder.transaction.producer.message-key-expression | | -|spring.cloud.stream.kafka.binder.transaction.producer.partition-count | | -|spring.cloud.stream.kafka.binder.transaction.producer.partition-key-expression | | -|spring.cloud.stream.kafka.binder.transaction.producer.partition-key-extractor-name | | -|spring.cloud.stream.kafka.binder.transaction.producer.partition-selector-expression | | -|spring.cloud.stream.kafka.binder.transaction.producer.partition-selector-name | | -|spring.cloud.stream.kafka.binder.transaction.producer.required-groups | | -|spring.cloud.stream.kafka.binder.transaction.producer.sync | | -|spring.cloud.stream.kafka.binder.transaction.producer.topic | | -|spring.cloud.stream.kafka.binder.transaction.producer.use-native-encoding | | -|spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix | | -|spring.cloud.stream.kafka.bindings | | -|spring.cloud.stream.metrics.export-properties | | List of properties that are going to be appended to each message. This gets populate by onApplicationEvent, once the context refreshes to avoid overhead of doing per message basis. -|spring.cloud.stream.metrics.key | | The name of the metric being emitted. Should be an unique value per application. Defaults to: ${spring.application.name:${vcap.application.name:${spring.config.name:application}}}. -|spring.cloud.stream.metrics.meter-filter | | Pattern to control the 'meters' one wants to capture. By default all 'meters' will be captured. For example, 'spring.integration.*' will only capture metric information for meters whose name starts with 'spring.integration'. -|spring.cloud.stream.metrics.properties | | Application properties that should be added to the metrics payload For example: `spring.application**`. -|spring.cloud.stream.metrics.schedule-interval | 60s | Interval expressed as Duration for scheduling metrics snapshots publishing. Defaults to 60 seconds -|spring.cloud.stream.override-cloud-connectors | false | This property is only applicable when the cloud profile is active and Spring Cloud Connectors are provided with the application. If the property is false (the default), the binder detects a suitable bound service (for example, a RabbitMQ service bound in Cloud Foundry for the RabbitMQ binder) and uses it for creating connections (usually through Spring Cloud Connectors). When set to true, this property instructs binders to completely ignore the bound services and rely on Spring Boot properties (for example, relying on the spring.rabbitmq.* properties provided in the environment for the RabbitMQ binder). The typical usage of this property is to be nested in a customized environment when connecting to multiple systems. -|spring.cloud.stream.pollable-source | none | A semi-colon delimited list of binding names of pollable sources. Binding names follow the same naming convention as functions. For example, name '...pollable-source=foobar' will be accessible as 'foobar-iin-0'' binding -|spring.cloud.stream.poller.cron | | Cron expression value for the Cron Trigger. -|spring.cloud.stream.poller.fixed-delay | 1000 | Fixed delay for default poller. -|spring.cloud.stream.poller.initial-delay | 0 | Initial delay for periodic triggers. -|spring.cloud.stream.poller.max-messages-per-poll | 1 | Maximum messages per poll for the default poller. -|spring.cloud.stream.sendto.destination | none | The name of the header used to determine the name of the output destination -|spring.cloud.stream.source | | A colon delimited string representing the names of the sources based on which source bindings will be created. This is primarily to support cases where source binding may be required without providing a corresponding Supplier. (e.g., for cases where the actual source of data is outside of scope of spring-cloud-stream - HTTP -> Stream) +|spring.cloud.stream.binders | `` | Additional per-binder properties (see {@link BinderProperties}) if more then one binder of the same type is used (i.e., connect to multiple instances of RabbitMq). Here you can specify multiple binder configurations, each with different environment settings. For example; spring.cloud.stream.binders.rabbit1.environment. . . , spring.cloud.stream.binders.rabbit2.environment. . . +|spring.cloud.stream.binding-retry-interval | `30` | Retry interval (in seconds) used to schedule binding attempts. Default: 30 sec. +|spring.cloud.stream.bindings | `` | Additional binding properties (see {@link BinderProperties}) per binding name (e.g., 'input`). For example; This sets the content-type for the 'input' binding of a Sink application: 'spring.cloud.stream.bindings.input.contentType=text/plain' +|spring.cloud.stream.default-binder | `` | The name of the binder to use by all bindings in the event multiple binders available (e.g., 'rabbit'). +|spring.cloud.stream.dynamic-destination-cache-size | `10` | The maximum size of Least Recently Used (LRU) cache of dynamic destinations. Once this size is reached, new destinations will trigger the removal of old destinations. Default: 10 +|spring.cloud.stream.dynamic-destinations | `[]` | A list of destinations that can be bound dynamically. If set, only listed destinations can be bound. +|spring.cloud.stream.function.batch-mode | `false` | +|spring.cloud.stream.function.bindings | `` | +|spring.cloud.stream.function.definition | `` | Definition of functions to bind. If several functions need to be composed into one, use pipes (e.g., 'fooFunc\|barFunc') +|spring.cloud.stream.instance-count | `1` | The number of deployed instances of an application. Default: 1. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-count" where 'foo' is the name of the binding. +|spring.cloud.stream.instance-index | `0` | The instance id of the application: a number from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index" where 'foo' is the name of the binding. +|spring.cloud.stream.instance-index-list | `` | A list of instance id's from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index-list" where 'foo' is the name of the binding. This setting will override the one set in 'spring.cloud.stream.instance-index' +|spring.cloud.stream.integration.message-handler-not-propagated-headers | `` | Message header names that will NOT be copied from the inbound message. +|spring.cloud.stream.kafka.binder.authorization-exception-retry-interval | `` | Time between retries after AuthorizationException is caught in the ListenerContainer; defalt is null which disables retries. For more info see: {@link org.springframework.kafka.listener.ConsumerProperties#setAuthorizationExceptionRetryInterval(java.time.Duration)} +|spring.cloud.stream.kafka.binder.auto-add-partitions | `false` | +|spring.cloud.stream.kafka.binder.auto-create-topics | `true` | +|spring.cloud.stream.kafka.binder.brokers | `[localhost]` | +|spring.cloud.stream.kafka.binder.configuration | `` | Arbitrary kafka properties that apply to both producers and consumers. +|spring.cloud.stream.kafka.binder.consider-down-when-any-partition-has-no-leader | `false` | +|spring.cloud.stream.kafka.binder.consumer-properties | `` | Arbitrary kafka consumer properties. +|spring.cloud.stream.kafka.binder.header-mapper-bean-name | `` | The bean name of a custom header mapper to use instead of a {@link org.springframework.kafka.support.DefaultKafkaHeaderMapper}. +|spring.cloud.stream.kafka.binder.headers | `[]` | +|spring.cloud.stream.kafka.binder.health-timeout | `60` | Time to wait to get partition information in seconds; default 60. +|spring.cloud.stream.kafka.binder.jaas | `` | +|spring.cloud.stream.kafka.binder.min-partition-count | `1` | +|spring.cloud.stream.kafka.binder.producer-properties | `` | Arbitrary kafka producer properties. +|spring.cloud.stream.kafka.binder.replication-factor | `-1` | +|spring.cloud.stream.kafka.binder.required-acks | `1` | +|spring.cloud.stream.kafka.binder.transaction.producer.batch-timeout | `` | +|spring.cloud.stream.kafka.binder.transaction.producer.buffer-size | `` | +|spring.cloud.stream.kafka.binder.transaction.producer.compression-type | `` | +|spring.cloud.stream.kafka.binder.transaction.producer.configuration | `` | +|spring.cloud.stream.kafka.binder.transaction.producer.error-channel-enabled | `` | +|spring.cloud.stream.kafka.binder.transaction.producer.header-mode | `` | +|spring.cloud.stream.kafka.binder.transaction.producer.header-patterns | `` | +|spring.cloud.stream.kafka.binder.transaction.producer.message-key-expression | `` | +|spring.cloud.stream.kafka.binder.transaction.producer.partition-count | `` | +|spring.cloud.stream.kafka.binder.transaction.producer.partition-key-expression | `` | +|spring.cloud.stream.kafka.binder.transaction.producer.partition-key-extractor-name | `` | +|spring.cloud.stream.kafka.binder.transaction.producer.partition-selector-expression | `` | +|spring.cloud.stream.kafka.binder.transaction.producer.partition-selector-name | `` | +|spring.cloud.stream.kafka.binder.transaction.producer.required-groups | `` | +|spring.cloud.stream.kafka.binder.transaction.producer.sync | `` | +|spring.cloud.stream.kafka.binder.transaction.producer.topic | `` | +|spring.cloud.stream.kafka.binder.transaction.producer.use-native-encoding | `` | +|spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix | `` | +|spring.cloud.stream.kafka.bindings | `` | +|spring.cloud.stream.metrics.export-properties | `` | List of properties that are going to be appended to each message. This gets populate by onApplicationEvent, once the context refreshes to avoid overhead of doing per message basis. +|spring.cloud.stream.metrics.key | `` | The name of the metric being emitted. Should be an unique value per application. Defaults to: ${spring.application.name:${vcap.application.name:${spring.config.name:application}}}. +|spring.cloud.stream.metrics.meter-filter | `` | Pattern to control the 'meters' one wants to capture. By default all 'meters' will be captured. For example, 'spring.integration.*' will only capture metric information for meters whose name starts with 'spring.integration'. +|spring.cloud.stream.metrics.properties | `` | Application properties that should be added to the metrics payload For example: `spring.application**`. +|spring.cloud.stream.metrics.schedule-interval | `60s` | Interval expressed as Duration for scheduling metrics snapshots publishing. Defaults to 60 seconds +|spring.cloud.stream.override-cloud-connectors | `false` | This property is only applicable when the cloud profile is active and Spring Cloud Connectors are provided with the application. If the property is false (the default), the binder detects a suitable bound service (for example, a RabbitMQ service bound in Cloud Foundry for the RabbitMQ binder) and uses it for creating connections (usually through Spring Cloud Connectors). When set to true, this property instructs binders to completely ignore the bound services and rely on Spring Boot properties (for example, relying on the spring.rabbitmq.* properties provided in the environment for the RabbitMQ binder). The typical usage of this property is to be nested in a customized environment when connecting to multiple systems. +|spring.cloud.stream.pollable-source | `none` | A semi-colon delimited list of binding names of pollable sources. Binding names follow the same naming convention as functions. For example, name '...pollable-source=foobar' will be accessible as 'foobar-iin-0'' binding +|spring.cloud.stream.poller.cron | `` | Cron expression value for the Cron Trigger. +|spring.cloud.stream.poller.fixed-delay | `1000` | Fixed delay for default poller. +|spring.cloud.stream.poller.initial-delay | `0` | Initial delay for periodic triggers. +|spring.cloud.stream.poller.max-messages-per-poll | `1` | Maximum messages per poll for the default poller. +|spring.cloud.stream.sendto.destination | `none` | The name of the header used to determine the name of the output destination +|spring.cloud.stream.source | `` | A colon delimited string representing the names of the sources based on which source bindings will be created. This is primarily to support cases where source binding may be required without providing a corresponding Supplier. (e.g., for cases where the actual source of data is outside of scope of spring-cloud-stream - HTTP -> Stream) |=== \ No newline at end of file diff --git a/pom.xml b/pom.xml index 24be90bd..1675309e 100644 --- a/pom.xml +++ b/pom.xml @@ -2,12 +2,12 @@ 4.0.0 spring-cloud-stream-binder-kafka-parent - 3.1.0-SNAPSHOT + 3.1.0-M4 pom org.springframework.cloud spring-cloud-build - 3.0.0-SNAPSHOT + 3.0.0-M5 @@ -15,8 +15,8 @@ 2.6.3 5.4.1 2.6.0 - 1.1.0-SNAPSHOT - 3.1.0-SNAPSHOT + 1.1.0-M4 + 3.1.0-M4 true true true diff --git a/spring-cloud-starter-stream-kafka/pom.xml b/spring-cloud-starter-stream-kafka/pom.xml index 4efe3a7c..a3940060 100644 --- a/spring-cloud-starter-stream-kafka/pom.xml +++ b/spring-cloud-starter-stream-kafka/pom.xml @@ -4,7 +4,7 @@ org.springframework.cloud spring-cloud-stream-binder-kafka-parent - 3.1.0-SNAPSHOT + 3.1.0-M4 spring-cloud-starter-stream-kafka Spring Cloud Starter Stream Kafka diff --git a/spring-cloud-stream-binder-kafka-core/pom.xml b/spring-cloud-stream-binder-kafka-core/pom.xml index 9bef9962..86553668 100644 --- a/spring-cloud-stream-binder-kafka-core/pom.xml +++ b/spring-cloud-stream-binder-kafka-core/pom.xml @@ -5,7 +5,7 @@ org.springframework.cloud spring-cloud-stream-binder-kafka-parent - 3.1.0-SNAPSHOT + 3.1.0-M4 spring-cloud-stream-binder-kafka-core Spring Cloud Stream Kafka Binder Core diff --git a/spring-cloud-stream-binder-kafka-streams/pom.xml b/spring-cloud-stream-binder-kafka-streams/pom.xml index 1b4c321c..e9297121 100644 --- a/spring-cloud-stream-binder-kafka-streams/pom.xml +++ b/spring-cloud-stream-binder-kafka-streams/pom.xml @@ -10,7 +10,7 @@ org.springframework.cloud spring-cloud-stream-binder-kafka-parent - 3.1.0-SNAPSHOT + 3.1.0-M4 diff --git a/spring-cloud-stream-binder-kafka/pom.xml b/spring-cloud-stream-binder-kafka/pom.xml index 67292441..1efda78f 100644 --- a/spring-cloud-stream-binder-kafka/pom.xml +++ b/spring-cloud-stream-binder-kafka/pom.xml @@ -10,7 +10,7 @@ org.springframework.cloud spring-cloud-stream-binder-kafka-parent - 3.1.0-SNAPSHOT + 3.1.0-M4