Update SNAPSHOT to 3.1.0-RC1
50 README.adoc
@@ -70,7 +70,7 @@ Also, 0.11.x.x does not support the `autoAddPartitions` property.
 
 This section contains the configuration options used by the Apache Kafka binder.
 
-For common configuration options and properties pertaining to binder, see the <<binding-properties,core documentation>>.
+For common configuration options and properties pertaining to the binder, see the https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference/html/spring-cloud-stream.html#binding-properties[binding properties] in core documentation.
 
 ==== Kafka Binder Properties
 
@@ -147,11 +147,6 @@ If set to `false`, the binder relies on the partition size of the topic being al
 If the partition count of the target topic is smaller than the expected value, the binder fails to start.
 +
 Default: `false`.
-spring.cloud.stream.kafka.binder.autoAlterTopics::
-If set to `true`, the binder alters destination topic configs if required.
-If set to `false`, the binder relies on existing configs of the topic.
-+
-Default: `false`.
 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix::
 Enables transactions in the binder. See `transaction.id` in the Kafka documentation and https://docs.spring.io/spring-kafka/reference/html/_reference.html#transactions[Transactions] in the `spring-kafka` documentation.
 When transactions are enabled, individual `producer` properties are ignored and all producers use the `spring.cloud.stream.kafka.binder.transaction.producer.*` properties.
@@ -175,6 +170,14 @@ Flag to set the binder health as `down`, when any partitions on the topic, regar
 +
 Default: `false`.
 
+spring.cloud.stream.kafka.binder.certificateStoreDirectory::
+When the truststore or keystore certificate location is given as a classpath URL (`classpath:...`), the binder copies the resource from the classpath location inside the JAR file to a location on the filesystem.
+The file is moved to the location specified by this property, which must be an existing directory on the filesystem that is writable by the process running the application.
+If this value is not set and the certificate file is a classpath resource, it is moved to the system's temp directory, as returned by `System.getProperty("java.io.tmpdir")`.
+The same happens if this value is set but the directory does not exist on the filesystem or is not writable.
++
+Default: none.
 
 [[kafka-consumer-properties]]
 ==== Kafka Consumer Properties
 
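The `certificateStoreDirectory` behavior added above can be sketched as a configuration fragment. The file names, paths, and the use of the binder `configuration.*` map to pass the SSL property are illustrative assumptions, not documented defaults:

```
# Truststore shipped inside the application JAR as a classpath resource.
spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=classpath:kafka.truststore.jks
# Writable directory the binder copies the certificate into; if omitted,
# java.io.tmpdir is used instead.
spring.cloud.stream.kafka.binder.certificateStoreDirectory=/var/tmp/kafka-certs
```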
@@ -246,7 +249,7 @@ Default: null (equivalent to `earliest`).
 enableDlq::
 When set to true, it enables DLQ behavior for the consumer.
 By default, messages that result in errors are forwarded to a topic named `error.<destination>.<group>`.
-The DLQ topic name can be configurable by setting the `dlqName` property.
+The DLQ topic name can be configured by setting the `dlqName` property or by defining a `@Bean` of type `DlqDestinationResolver`.
 This provides an alternative option to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome.
 See <<kafka-dlq-processing>> for more information.
 Starting with version 2.0, messages sent to the DLQ topic are enhanced with the following headers: `x-original-topic`, `x-exception-message`, and `x-exception-stacktrace` as `byte[]`.
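For context on the `DlqDestinationResolver` referenced in the added line: it is a functional interface over the failed consumer record and the causing exception that returns the DLQ topic name. The sketch below models only the name-derivation logic, using a plain `String` topic instead of a `ConsumerRecord<?, ?>` so it compiles without Kafka on the classpath; the `.dlq` suffix is an illustrative convention, not a binder default:

```java
import java.util.function.BiFunction;

// Stand-alone model of a DLQ destination resolver: given the source topic of
// a failed record and the exception, compute the DLQ topic name.
// (The real binder interface receives a ConsumerRecord<?, ?>.)
class DlqNameSketch implements BiFunction<String, Exception, String> {

	@Override
	public String apply(String sourceTopic, Exception cause) {
		// Route every failed record to a per-source-topic DLQ.
		return sourceTopic + ".dlq";
	}
}
```

A resolver like this lets the DLQ name vary per record (for example, per source topic), which a single static `dlqName` property cannot express.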
@@ -386,6 +389,11 @@ messageKeyExpression::
 A SpEL expression evaluated against the outgoing message used to populate the key of the produced Kafka message -- for example, `headers['myKey']`.
 With versions before 3.0, the payload could not be used unless native encoding was being used because, by the time this expression was evaluated, the payload was already in the form of a `byte[]`.
 Now, the expression is evaluated before the payload is converted.
+In the case of a regular processor (`Function<String, String>` or `Function<Message<?>, Message<?>>`), if the produced key needs to be the same as the incoming key from the topic, this property can be set as follows:
+`spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']`
+There is an important caveat to keep in mind for reactive functions.
+In that case, it is up to the application to manually copy the headers from the incoming messages to the outbound messages.
+You can set a header (for example, `myKey`) and use `headers['myKey']` as suggested above or, for convenience, simply set the `KafkaHeaders.MESSAGE_KEY` header, in which case you do not need to set this property at all.
 +
 Default: `none`.
 headerPatterns::
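What the expression `headers['kafka_receivedMessageKey']` in the added lines does can be modeled as a plain header-map lookup. This stand-alone sketch uses a `Map` in place of Spring's `Message` headers so it compiles without Spring on the classpath; the real evaluation is performed by the binder with SpEL:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-alone model of the key lookup performed by
// messageKeyExpression: headers['kafka_receivedMessageKey'].
class MessageKeySketch {

	static Object resolveKey(Map<String, Object> outgoingHeaders) {
		// Same lookup the SpEL expression performs against the message headers;
		// returns null when the header is absent (no key is set on the record).
		return outgoingHeaders.get("kafka_receivedMessageKey");
	}
}
```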
@@ -456,6 +464,13 @@ Timeout in number of seconds to wait for when closing the producer.
 +
 Default: `30`
 
+allowNonTransactional::
+Normally, all output bindings associated with a transactional binder publish in a new transaction, if one is not already in process.
+This property allows you to override that behavior.
+If set to true, records published to this output binding are not run in a transaction, unless one is already in process.
++
+Default: `false`
+
 ==== Usage examples
 
 In this section, we show the use of the preceding properties for specific scenarios.
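The transaction-related properties touched above can be sketched in `application.properties` form. The prefix value and the binding name `output` are illustrative assumptions, not defaults:

```
# Enables transactions in the binder; all producers then use the
# spring.cloud.stream.kafka.binder.transaction.producer.* properties.
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix=tx-
# Opt one output binding out of the transaction (per the new
# allowNonTransactional property).
spring.cloud.stream.kafka.bindings.output.producer.allowNonTransactional=true
```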
@@ -674,7 +689,7 @@ IMPORTANT: If you deploy multiple instances of your application, each instance n
 === Error Channels
 
 Starting with version 1.3, the binder unconditionally sends exceptions to an error channel for each consumer destination and can also be configured to send async producer send failures to an error channel.
-See <<spring-cloud-stream-overview-error-handling>> for more information.
+See https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-error-handling[this section on error handling] for more information.
 
 The payload of the `ErrorMessage` for a send failure is a `KafkaSendFailureException` with properties:
 
@@ -690,9 +705,25 @@ You can consume these exceptions with your own Spring Integration flow.
 The Kafka binder module exposes the following metrics:
 
 `spring.cloud.stream.binder.kafka.offset`: This metric indicates how many messages have not yet been consumed from a given binder's topic by a given consumer group.
-The metrics provided are based on the Mircometer metrics library. The metric contains the consumer group information, topic and the actual lag in committed offset from the latest offset on the topic.
+The metrics provided are based on the Micrometer library.
+The binder creates the `KafkaBinderMetrics` bean if Micrometer is on the classpath and no other such bean is provided by the application.
+The metric contains the consumer group information, topic, and the actual lag in committed offset from the latest offset on the topic.
 This metric is particularly useful for providing auto-scaling feedback to a PaaS platform.
 
+You can prevent `KafkaBinderMetrics` from creating the necessary infrastructure (such as consumers) and reporting the metrics by providing the following component in the application:
+
+```
+@Component
+class NoOpBindingMeters {
+
+	NoOpBindingMeters(MeterRegistry registry) {
+		registry.config().meterFilter(
+				MeterFilter.denyNameStartsWith(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME));
+	}
+}
+```
+
+More details on how to suppress meters selectively can be found https://micrometer.io/docs/concepts#_meter_filters[here].
 
 [[kafka-tombstones]]
 === Tombstone Records (null record values)
 
@@ -775,6 +806,7 @@ Both of these interfaces provide a way to configure the config map used for cons
 For example, if you want to gain access to a bean that is defined at the application level, you can inject that in the implementation of the `configure` method.
 When the binder discovers that these customizers are available as beans, it invokes the `configure` method right before creating the consumer and producer factories.
 
+Both of these interfaces also provide access to the binding and destination names, so that they can be used while customizing producer and consumer properties.
 
 = Appendices
 [appendix]
 
@@ -7,7 +7,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>3.1.0-SNAPSHOT</version>
+		<version>3.1.0-RC1</version>
 	</parent>
 	<packaging>jar</packaging>
 	<name>spring-cloud-stream-binder-kafka-docs</name>
 
@@ -2,33 +2,35 @@
 |Name | Default | Description
 
 |spring.cloud.stream.binders | | Additional per-binder properties (see {@link BinderProperties}) if more than one binder of the same type is used (i.e., connect to multiple instances of RabbitMq). Here you can specify multiple binder configurations, each with different environment settings. For example; spring.cloud.stream.binders.rabbit1.environment. . . , spring.cloud.stream.binders.rabbit2.environment. . .
-|spring.cloud.stream.binding-retry-interval | 30 | Retry interval (in seconds) used to schedule binding attempts. Default: 30 sec.
+|spring.cloud.stream.binding-retry-interval | `30` | Retry interval (in seconds) used to schedule binding attempts. Default: 30 sec.
 |spring.cloud.stream.bindings | | Additional binding properties (see {@link BinderProperties}) per binding name (e.g., 'input`). For example; This sets the content-type for the 'input' binding of a Sink application: 'spring.cloud.stream.bindings.input.contentType=text/plain'
 |spring.cloud.stream.default-binder | | The name of the binder to use by all bindings in the event multiple binders available (e.g., 'rabbit').
-|spring.cloud.stream.dynamic-destination-cache-size | 10 | The maximum size of Least Recently Used (LRU) cache of dynamic destinations. Once this size is reached, new destinations will trigger the removal of old destinations. Default: 10
-|spring.cloud.stream.dynamic-destinations | [] | A list of destinations that can be bound dynamically. If set, only listed destinations can be bound.
-|spring.cloud.stream.function.batch-mode | false |
+|spring.cloud.stream.dynamic-destination-cache-size | `10` | The maximum size of Least Recently Used (LRU) cache of dynamic destinations. Once this size is reached, new destinations will trigger the removal of old destinations. Default: 10
+|spring.cloud.stream.dynamic-destinations | `[]` | A list of destinations that can be bound dynamically. If set, only listed destinations can be bound.
+|spring.cloud.stream.function.batch-mode | `false` |
 |spring.cloud.stream.function.bindings | |
 |spring.cloud.stream.function.definition | | Definition of functions to bind. If several functions need to be composed into one, use pipes (e.g., 'fooFunc\|barFunc')
-|spring.cloud.stream.instance-count | 1 | The number of deployed instances of an application. Default: 1. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-count" where 'foo' is the name of the binding.
-|spring.cloud.stream.instance-index | 0 | The instance id of the application: a number from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index" where 'foo' is the name of the binding.
+|spring.cloud.stream.instance-count | `1` | The number of deployed instances of an application. Default: 1. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-count" where 'foo' is the name of the binding.
+|spring.cloud.stream.instance-index | `0` | The instance id of the application: a number from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index" where 'foo' is the name of the binding.
 |spring.cloud.stream.instance-index-list | | A list of instance id's from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index-list" where 'foo' is the name of the binding. This setting will override the one set in 'spring.cloud.stream.instance-index'
 |spring.cloud.stream.integration.message-handler-not-propagated-headers | | Message header names that will NOT be copied from the inbound message.
 |spring.cloud.stream.kafka.binder.authorization-exception-retry-interval | | Time between retries after AuthorizationException is caught in the ListenerContainer; default is null which disables retries. For more info see: {@link org.springframework.kafka.listener.ConsumerProperties#setAuthorizationExceptionRetryInterval(java.time.Duration)}
-|spring.cloud.stream.kafka.binder.auto-add-partitions | false |
-|spring.cloud.stream.kafka.binder.auto-create-topics | true |
-|spring.cloud.stream.kafka.binder.brokers | [localhost] |
+|spring.cloud.stream.kafka.binder.auto-add-partitions | `false` |
+|spring.cloud.stream.kafka.binder.auto-alter-topics | `false` |
+|spring.cloud.stream.kafka.binder.auto-create-topics | `true` |
+|spring.cloud.stream.kafka.binder.brokers | `[localhost]` |
 |spring.cloud.stream.kafka.binder.certificate-store-directory | | When a certificate store location is given as classpath URL (classpath:), then the binder moves the resource from the classpath location inside the JAR to a location on the filesystem. If this value is set, then this location is used, otherwise, the certificate file is copied to the directory returned by java.io.tmpdir.
 |spring.cloud.stream.kafka.binder.configuration | | Arbitrary kafka properties that apply to both producers and consumers.
-|spring.cloud.stream.kafka.binder.consider-down-when-any-partition-has-no-leader | false |
+|spring.cloud.stream.kafka.binder.consider-down-when-any-partition-has-no-leader | `false` |
 |spring.cloud.stream.kafka.binder.consumer-properties | | Arbitrary kafka consumer properties.
 |spring.cloud.stream.kafka.binder.header-mapper-bean-name | | The bean name of a custom header mapper to use instead of a {@link org.springframework.kafka.support.DefaultKafkaHeaderMapper}.
-|spring.cloud.stream.kafka.binder.headers | [] |
-|spring.cloud.stream.kafka.binder.health-timeout | 60 | Time to wait to get partition information in seconds; default 60.
+|spring.cloud.stream.kafka.binder.headers | `[]` |
+|spring.cloud.stream.kafka.binder.health-timeout | `60` | Time to wait to get partition information in seconds; default 60.
 |spring.cloud.stream.kafka.binder.jaas | |
-|spring.cloud.stream.kafka.binder.min-partition-count | 1 |
+|spring.cloud.stream.kafka.binder.min-partition-count | `1` |
 |spring.cloud.stream.kafka.binder.producer-properties | | Arbitrary kafka producer properties.
-|spring.cloud.stream.kafka.binder.replication-factor | -1 |
-|spring.cloud.stream.kafka.binder.required-acks | 1 |
+|spring.cloud.stream.kafka.binder.replication-factor | `-1` |
+|spring.cloud.stream.kafka.binder.required-acks | `1` |
 |spring.cloud.stream.kafka.binder.transaction.producer.batch-timeout | |
 |spring.cloud.stream.kafka.binder.transaction.producer.buffer-size | |
 |spring.cloud.stream.kafka.binder.transaction.producer.compression-type | |
@@ -52,14 +54,15 @@
 |spring.cloud.stream.metrics.key | | The name of the metric being emitted. Should be an unique value per application. Defaults to: ${spring.application.name:${vcap.application.name:${spring.config.name:application}}}.
 |spring.cloud.stream.metrics.meter-filter | | Pattern to control the 'meters' one wants to capture. By default all 'meters' will be captured. For example, 'spring.integration.*' will only capture metric information for meters whose name starts with 'spring.integration'.
 |spring.cloud.stream.metrics.properties | | Application properties that should be added to the metrics payload For example: `spring.application**`.
-|spring.cloud.stream.metrics.schedule-interval | 60s | Interval expressed as Duration for scheduling metrics snapshots publishing. Defaults to 60 seconds
-|spring.cloud.stream.override-cloud-connectors | false | This property is only applicable when the cloud profile is active and Spring Cloud Connectors are provided with the application. If the property is false (the default), the binder detects a suitable bound service (for example, a RabbitMQ service bound in Cloud Foundry for the RabbitMQ binder) and uses it for creating connections (usually through Spring Cloud Connectors). When set to true, this property instructs binders to completely ignore the bound services and rely on Spring Boot properties (for example, relying on the spring.rabbitmq.* properties provided in the environment for the RabbitMQ binder). The typical usage of this property is to be nested in a customized environment when connecting to multiple systems.
-|spring.cloud.stream.pollable-source | none | A semi-colon delimited list of binding names of pollable sources. Binding names follow the same naming convention as functions. For example, name '...pollable-source=foobar' will be accessible as 'foobar-iin-0'' binding
+|spring.cloud.stream.metrics.schedule-interval | `60s` | Interval expressed as Duration for scheduling metrics snapshots publishing. Defaults to 60 seconds
+|spring.cloud.stream.override-cloud-connectors | `false` | This property is only applicable when the cloud profile is active and Spring Cloud Connectors are provided with the application. If the property is false (the default), the binder detects a suitable bound service (for example, a RabbitMQ service bound in Cloud Foundry for the RabbitMQ binder) and uses it for creating connections (usually through Spring Cloud Connectors). When set to true, this property instructs binders to completely ignore the bound services and rely on Spring Boot properties (for example, relying on the spring.rabbitmq.* properties provided in the environment for the RabbitMQ binder). The typical usage of this property is to be nested in a customized environment when connecting to multiple systems.
+|spring.cloud.stream.pollable-source | `none` | A semi-colon delimited list of binding names of pollable sources. Binding names follow the same naming convention as functions. For example, name '...pollable-source=foobar' will be accessible as 'foobar-in-0' binding
 |spring.cloud.stream.poller.cron | | Cron expression value for the Cron Trigger.
-|spring.cloud.stream.poller.fixed-delay | 1000 | Fixed delay for default poller.
-|spring.cloud.stream.poller.initial-delay | 0 | Initial delay for periodic triggers.
-|spring.cloud.stream.poller.max-messages-per-poll | 1 | Maximum messages per poll for the default poller.
-|spring.cloud.stream.sendto.destination | none | The name of the header used to determine the name of the output destination
+|spring.cloud.stream.poller.fixed-delay | `1000` | Fixed delay for default poller.
+|spring.cloud.stream.poller.initial-delay | `0` | Initial delay for periodic triggers.
+|spring.cloud.stream.poller.max-messages-per-poll | `1` | Maximum messages per poll for the default poller.
+|spring.cloud.stream.poller.time-unit | | The TimeUnit to apply to delay values.
+|spring.cloud.stream.sendto.destination | `none` | The name of the header used to determine the name of the output destination
 |spring.cloud.stream.source | | A colon delimited string representing the names of the sources based on which source bindings will be created. This is primarily to support cases where source binding may be required without providing a corresponding Supplier. (e.g., for cases where the actual source of data is outside of scope of spring-cloud-stream - HTTP -> Stream)
 
 |===
8 pom.xml
@@ -2,12 +2,12 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-	<version>3.1.0-SNAPSHOT</version>
+	<version>3.1.0-RC1</version>
 	<packaging>pom</packaging>
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-build</artifactId>
-		<version>3.0.0-SNAPSHOT</version>
+		<version>3.0.0-RC1</version>
 		<relativePath />
 	</parent>
 	<properties>
@@ -15,8 +15,8 @@
 		<spring-kafka.version>2.6.3</spring-kafka.version>
 		<spring-integration-kafka.version>5.4.1</spring-integration-kafka.version>
 		<kafka.version>2.6.0</kafka.version>
-		<spring-cloud-schema-registry.version>1.1.0-SNAPSHOT</spring-cloud-schema-registry.version>
-		<spring-cloud-stream.version>3.1.0-SNAPSHOT</spring-cloud-stream.version>
+		<spring-cloud-schema-registry.version>1.1.0-RC1</spring-cloud-schema-registry.version>
+		<spring-cloud-stream.version>3.1.0-RC1</spring-cloud-stream.version>
 		<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
 		<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
 		<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>
 
@@ -4,7 +4,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>3.1.0-SNAPSHOT</version>
+		<version>3.1.0-RC1</version>
 	</parent>
 	<artifactId>spring-cloud-starter-stream-kafka</artifactId>
 	<description>Spring Cloud Starter Stream Kafka</description>
 
@@ -5,7 +5,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>3.1.0-SNAPSHOT</version>
+		<version>3.1.0-RC1</version>
 	</parent>
 	<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
 	<description>Spring Cloud Stream Kafka Binder Core</description>
 
@@ -10,7 +10,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>3.1.0-SNAPSHOT</version>
+		<version>3.1.0-RC1</version>
 	</parent>
 
 	<properties>
 
@@ -10,7 +10,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>3.1.0-SNAPSHOT</version>
+		<version>3.1.0-RC1</version>
 	</parent>
 
 	<dependencies>