Doc Polishing for resetOffsets (#1045)
* Doc Polishing for resetOffsets * Fix typos.
This commit is contained in:
116
README.adoc
116
README.adoc
@@ -70,7 +70,7 @@ Also, 0.11.x.x does not support the `autoAddPartitions` property.
|
||||
|
||||
This section contains the configuration options used by the Apache Kafka binder.
|
||||
|
||||
For common configuration options and properties pertaining to binder, see the <<binding-properties,core documentation>>.
|
||||
For common configuration options and properties pertaining to the binder, see the https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference/html/spring-cloud-stream.html#binding-properties[binding properties] in core documentation.
|
||||
|
||||
==== Kafka Binder Properties
|
||||
|
||||
@@ -147,11 +147,6 @@ If set to `false`, the binder relies on the partition size of the topic being al
|
||||
If the partition count of the target topic is smaller than the expected value, the binder fails to start.
|
||||
+
|
||||
Default: `false`.
|
||||
spring.cloud.stream.kafka.binder.autoAlterTopics::
|
||||
If set to `true`, the binder alters destination topic configs if required.
|
||||
If set to `false`, the binder relies on existing configs of the topic.
|
||||
+
|
||||
Default: `false`.
|
||||
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix::
|
||||
Enables transactions in the binder. See `transaction.id` in the Kafka documentation and https://docs.spring.io/spring-kafka/reference/html/_reference.html#transactions[Transactions] in the `spring-kafka` documentation.
|
||||
When transactions are enabled, individual `producer` properties are ignored and all producers use the `spring.cloud.stream.kafka.binder.transaction.producer.*` properties.
|
||||
@@ -175,6 +170,14 @@ Flag to set the binder health as `down`, when any partitions on the topic, regar
|
||||
+
|
||||
Default: `false`.
|
||||
|
||||
spring.cloud.stream.kafka.binder.certificateStoreDirectory::
|
||||
When the truststore or keystore certificate location is given as a classpath URL (`classpath:...`), the binder copies the resource from the classpath location inside the JAR file to a location on the filesystem.
|
||||
The file will be moved to the location specified as the value for this property which must be an existing directory on the filesystem that is writable by the process running the application.
|
||||
If this value is not set and the certificate file is a classpath resource, then it will be moved to System's temp directory as returned by `System.getProperty("java.io.tmpdir")`.
|
||||
This is also true, if this value is present, but the directory cannot be found on the filesystem or is not writable.
|
||||
+
|
||||
Default: none.
|
||||
|
||||
[[kafka-consumer-properties]]
|
||||
==== Kafka Consumer Properties
|
||||
|
||||
@@ -210,13 +213,17 @@ This property is deprecated as of 3.1 in favor of using `ackMode`.
|
||||
If the `ackMode` is not set and batch mode is not enabled, `RECORD` ackMode will be used.
|
||||
+
|
||||
Default: `false`.
|
||||
|
||||
autoCommitOffset::
|
||||
|
||||
Starting with version 3.1, this property is deprecated.
|
||||
See `ackMode` for more details on alternatives.
|
||||
Whether to autocommit offsets when a message has been processed.
|
||||
If set to `false`, a header with the key `kafka_acknowledgment` of the type `org.springframework.kafka.support.Acknowledgment` header is present in the inbound message.
|
||||
Applications may use this header for acknowledging messages.
|
||||
See the examples section for details.
|
||||
When this property is set to `false`, Kafka binder sets the ack mode to `org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL` and the application is responsible for acknowledging records.
|
||||
Also see `ackEachRecord`. This property is deprecated as of 3.1. See `ackMode` for more details.
|
||||
Also see `ackEachRecord`.
|
||||
+
|
||||
Default: `true`.
|
||||
ackMode::
|
||||
@@ -225,28 +232,28 @@ This is based on the AckMode enumeration defined in Spring Kafka.
|
||||
If `ackEachRecord` property is set to `true` and consumer is not in batch mode, then this will use the ack mode of `RECORD`, otherwise, use the provided ack mode using this property.
|
||||
|
||||
autoCommitOnError::
|
||||
Effective only if `autoCommitOffset` is set to `true`.
|
||||
If set to `false`, it suppresses auto-commits for messages that result in errors and commits only for successful messages. It allows a stream to automatically replay from the last successfully processed message, in case of persistent failures.
|
||||
If set to `true`, it always auto-commits (if auto-commit is enabled).
|
||||
If not set (the default), it effectively has the same value as `enableDlq`, auto-committing erroneous messages if they are sent to a DLQ and not committing them otherwise.
|
||||
In pollable consumers, if set to `true`, it always auto commits on error.
|
||||
If not set (the default) or false, it will not auto commit in pollable consumers.
|
||||
Note that this property is only applicable for pollable consumers.
|
||||
+
|
||||
Default: not set.
|
||||
resetOffsets::
|
||||
Whether to reset offsets on the consumer to the value provided by startOffset.
|
||||
Must be false if a `KafkaRebalanceListener` is provided; see <<rebalance-listener>>.
|
||||
See <<reset-offsets>> for more information about this property.
|
||||
+
|
||||
Default: `false`.
|
||||
startOffset::
|
||||
The starting offset for new groups.
|
||||
Allowed values: `earliest` and `latest`.
|
||||
If the consumer group is set explicitly for the consumer 'binding' (through `spring.cloud.stream.kafka.bindings.<channelName>.group`), 'startOffset' is set to `earliest`. Otherwise, it is set to `latest` for the `anonymous` consumer group.
|
||||
Also see `resetOffsets` (earlier in this list).
|
||||
If the consumer group is set explicitly for the consumer 'binding' (through `spring.cloud.stream.bindings.<channelName>.group`), 'startOffset' is set to `earliest`. Otherwise, it is set to `latest` for the `anonymous` consumer group.
|
||||
See <<reset-offsets>> for more information about this property.
|
||||
+
|
||||
Default: null (equivalent to `earliest`).
|
||||
enableDlq::
|
||||
When set to true, it enables DLQ behavior for the consumer.
|
||||
By default, messages that result in errors are forwarded to a topic named `error.<destination>.<group>`.
|
||||
The DLQ topic name can be configurable by setting the `dlqName` property.
|
||||
The DLQ topic name can be configurable by setting the `dlqName` property or by defining a `@Bean` of type `DlqDestinationResolver`.
|
||||
This provides an alternative option to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome.
|
||||
See <<kafka-dlq-processing>> processing for more information.
|
||||
Starting with version 2.0, messages sent to the DLQ topic are enhanced with the following headers: `x-original-topic`, `x-exception-message`, and `x-exception-stacktrace` as `byte[]`.
|
||||
@@ -331,11 +338,38 @@ To achieve exactly once consumption and production of records, the consumer and
|
||||
+
|
||||
Default: none.
|
||||
|
||||
[[reset-offsets]]
|
||||
==== Resetting Offsets
|
||||
|
||||
When an application starts, the initial position in each assigned partition depends on two properties `startOffset` and `resetOffsets`.
|
||||
If `resetOffsets` is `false`, normal Kafka consumer https://kafka.apache.org/documentation/#consumerconfigs_auto.offset.reset[`auto.offset.reset`] semantics apply.
|
||||
i.e. If there is no committed offset for a partition for the binding's consumer group, the position is `earliest` or `latest`.
|
||||
By default, bindings with an explicit `group` use `earliest`, and anonymous bindings (with no `group`) use `latest`.
|
||||
These defaults can be overriden by setting the `startOffset` binding property.
|
||||
There will be no committed offset(s) the first time the binding is started with a particular `group`.
|
||||
The other condition where no committed offset exists is if the offset has been expired.
|
||||
With modern brokers (since 2.1), and default broker properties, the offsets are expired 7 days after the last member leaves the group.
|
||||
See the https://kafka.apache.org/documentation/#brokerconfigs_offsets.retention.minutes[`offsets.retention.minutes`] broker property for more information.
|
||||
|
||||
When `resetOffsets` is `true`, the binder applies similar semantics to those that apply when there is no committed offset on the broker, as if this binding has never consumed from the topic; i.e. any current committed offset is ignored.
|
||||
|
||||
Following are two use cases when this might be used.
|
||||
|
||||
1. Consuming from a compacted topic containing key/value pairs.
|
||||
Set `resetOffsets` to `true` and `startOffset` to `earliest`; the binding will perform a `seekToBeginning` on all newly assigned partitions.
|
||||
|
||||
2. Consuming from a topic containing events, where you are only interested in events that occur while this binding is running.
|
||||
Set `resetOffsets` to `true` and `startOffset` to `latest`; the binding will perform a `seekToEnd` on akll newly assigned partitions.
|
||||
|
||||
IMPORTANT: If a rebalance occurs after the initial assignment, the seeks will only be performed on any newly assigned topics that were not assigned during the initial assignment.
|
||||
|
||||
For more control over topic offsets, see <<rebalance-listener>>; when a listener is provided, `resetOffsets: true` is ignored.
|
||||
|
||||
==== Consuming Batches
|
||||
|
||||
Starting with version 3.0, when `spring.cloud.stream.bindings.<name>.consumer.batch-mode` is set to `true`, all of the records received by polling the Kafka `Consumer` will be presented as a `List<?>` to the listener method.
|
||||
Starting with version 3.0, when `spring.cloud.stream.binding.<name>.consumer.batch-mode` is set to `true`, all of the records received by polling the Kafka `Consumer` will be presented as a `List<?>` to the listener method.
|
||||
Otherwise, the method will be called with one record at a time.
|
||||
The size of the batch is controlled by Kafka consumer properties `max.poll.records`, `min.fetch.bytes`, `fetch.max.wait.ms`; refer to the Kafka documentation for more information.
|
||||
The size of the batch is controlled by Kafka consumer properties `max.poll.records`, `fetch.min.bytes`, `fetch.max.wait.ms`; refer to the Kafka documentation for more information.
|
||||
|
||||
Bear in mind that batch mode is not supported with `@StreamListener` - it only works with the newer functional programming model.
|
||||
|
||||
@@ -386,6 +420,11 @@ messageKeyExpression::
|
||||
A SpEL expression evaluated against the outgoing message used to populate the key of the produced Kafka message -- for example, `headers['myKey']`.
|
||||
With versions before 3.0, the payload could not be used unless native encoding was being used because, by the time this expression was evaluated, the payload was already in the form of a `byte[]`.
|
||||
Now, the expression is evaluated before the payload is converted.
|
||||
In the case of a regular processor (`Function<String, String>` or `Function<Message<?>, Message<?>`), if the produced key needs to be same as the incoming key from the topic, this property can be set as below.
|
||||
`spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']`
|
||||
There is an important caveat to keep in mind for reactive functions.
|
||||
In that case, it is up to the application to manually copy the headers from the incoming messages to outbound messages.
|
||||
You can set the header, e.g. `myKey` and use `headers['myKey']` as suggested above or, for convenience, simply set the `KafkaHeaders.MESSAGE_KEY` header, and you do not need to set this property at all.
|
||||
+
|
||||
Default: `none`.
|
||||
headerPatterns::
|
||||
@@ -456,6 +495,13 @@ Timeout in number of seconds to wait for when closing the producer.
|
||||
+
|
||||
Default: `30`
|
||||
|
||||
allowNonTransactional::
|
||||
Normally, all output bindings associated with a transactional binder will publish in a new transaction, if one is not already in process.
|
||||
This property allows you to override that behavior.
|
||||
If set to true, records published to this output binding will not be run in a transaction, unless one is already in process.
|
||||
+
|
||||
Default: `false`
|
||||
|
||||
==== Usage examples
|
||||
|
||||
In this section, we show the use of the preceding properties for specific scenarios.
|
||||
@@ -674,7 +720,7 @@ IMPORTANT: If you deploy multiple instances of your application, each instance n
|
||||
=== Error Channels
|
||||
|
||||
Starting with version 1.3, the binder unconditionally sends exceptions to an error channel for each consumer destination and can also be configured to send async producer send failures to an error channel.
|
||||
See <<spring-cloud-stream-overview-error-handling>> for more information.
|
||||
See https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-error-handling[this section on error handling] for more information.
|
||||
|
||||
The payload of the `ErrorMessage` for a send failure is a `KafkaSendFailureException` with properties:
|
||||
|
||||
@@ -690,9 +736,25 @@ You can consume these exceptions with your own Spring Integration flow.
|
||||
Kafka binder module exposes the following metrics:
|
||||
|
||||
`spring.cloud.stream.binder.kafka.offset`: This metric indicates how many messages have not been yet consumed from a given binder's topic by a given consumer group.
|
||||
The metrics provided are based on the Mircometer metrics library. The metric contains the consumer group information, topic and the actual lag in committed offset from the latest offset on the topic.
|
||||
The metrics provided are based on the Micrometer library.
|
||||
The binder creates the `KafkaBinderMetrics` bean if Micrometer is on the classpath and no other such beans provided by the application.
|
||||
The metric contains the consumer group information, topic and the actual lag in committed offset from the latest offset on the topic.
|
||||
This metric is particularly useful for providing auto-scaling feedback to a PaaS platform.
|
||||
|
||||
You can exclude `KafkaBinderMetrics` from creating the necessary infrastructure like consumers and then reporting the metrics by providing the following component in the application.
|
||||
|
||||
```
|
||||
@Component
|
||||
class NoOpBindingMeters {
|
||||
NoOpBindingMeters(MeterRegistry registry) {
|
||||
registry.config().meterFilter(
|
||||
MeterFilter.denyNameStartsWith(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME));
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
More details on how to suppress meters selectively can be found https://micrometer.io/docs/concepts#_meter_filters[here].
|
||||
|
||||
[[kafka-tombstones]]
|
||||
=== Tombstone Records (null record values)
|
||||
|
||||
@@ -775,6 +837,24 @@ Both of these interfaces provide a way to configure the config map used for cons
|
||||
For example, if you want to gain access to a bean that is defined at the application level, you can inject that in the implementation of the `configure` method.
|
||||
When the binder discovers that these customizers are available as beans, it will invoke the `configure` method right before creating the consumer and producer factories.
|
||||
|
||||
Both of these interfaces also provide access to both the binding and destination names so that they can be accessed while customizing producer and consumer properties.
|
||||
|
||||
[[admin-client-config-customization]]
|
||||
=== Customizing AdminClient Configuration
|
||||
|
||||
As with consumer and producer config customization above, applications can also customize the configuration for admin clients by providing an `AdminClientConfigCustomizer`.
|
||||
AdminClientConfigCustomizer's configure method provides access to the admin client properties, using which you can define further customization.
|
||||
Binder's Kafka topic provisioner gives the highest precedence for the properties given through this customizer.
|
||||
Here is an example of providing this customizer bean.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public AdminClientConfigCustomizer adminClientConfigCustomizer() {
|
||||
return props -> {
|
||||
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
|
||||
};
|
||||
}
|
||||
```
|
||||
|
||||
= Appendices
|
||||
[appendix]
|
||||
|
||||
@@ -2,33 +2,35 @@
|
||||
|Name | Default | Description
|
||||
|
||||
|spring.cloud.stream.binders | | Additional per-binder properties (see {@link BinderProperties}) if more then one binder of the same type is used (i.e., connect to multiple instances of RabbitMq). Here you can specify multiple binder configurations, each with different environment settings. For example; spring.cloud.stream.binders.rabbit1.environment. . . , spring.cloud.stream.binders.rabbit2.environment. . .
|
||||
|spring.cloud.stream.binding-retry-interval | 30 | Retry interval (in seconds) used to schedule binding attempts. Default: 30 sec.
|
||||
|spring.cloud.stream.binding-retry-interval | `30` | Retry interval (in seconds) used to schedule binding attempts. Default: 30 sec.
|
||||
|spring.cloud.stream.bindings | | Additional binding properties (see {@link BinderProperties}) per binding name (e.g., 'input`). For example; This sets the content-type for the 'input' binding of a Sink application: 'spring.cloud.stream.bindings.input.contentType=text/plain'
|
||||
|spring.cloud.stream.default-binder | | The name of the binder to use by all bindings in the event multiple binders available (e.g., 'rabbit').
|
||||
|spring.cloud.stream.dynamic-destination-cache-size | 10 | The maximum size of Least Recently Used (LRU) cache of dynamic destinations. Once this size is reached, new destinations will trigger the removal of old destinations. Default: 10
|
||||
|spring.cloud.stream.dynamic-destinations | [] | A list of destinations that can be bound dynamically. If set, only listed destinations can be bound.
|
||||
|spring.cloud.stream.function.batch-mode | false |
|
||||
|spring.cloud.stream.dynamic-destination-cache-size | `10` | The maximum size of Least Recently Used (LRU) cache of dynamic destinations. Once this size is reached, new destinations will trigger the removal of old destinations. Default: 10
|
||||
|spring.cloud.stream.dynamic-destinations | `[]` | A list of destinations that can be bound dynamically. If set, only listed destinations can be bound.
|
||||
|spring.cloud.stream.function.batch-mode | `false` |
|
||||
|spring.cloud.stream.function.bindings | |
|
||||
|spring.cloud.stream.function.definition | | Definition of functions to bind. If several functions need to be composed into one, use pipes (e.g., 'fooFunc\|barFunc')
|
||||
|spring.cloud.stream.instance-count | 1 | The number of deployed instances of an application. Default: 1. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-count" where 'foo' is the name of the binding.
|
||||
|spring.cloud.stream.instance-index | 0 | The instance id of the application: a number from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index" where 'foo' is the name of the binding.
|
||||
|spring.cloud.stream.instance-count | `1` | The number of deployed instances of an application. Default: 1. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-count" where 'foo' is the name of the binding.
|
||||
|spring.cloud.stream.instance-index | `0` | The instance id of the application: a number from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index" where 'foo' is the name of the binding.
|
||||
|spring.cloud.stream.instance-index-list | | A list of instance id's from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index-list" where 'foo' is the name of the binding. This setting will override the one set in 'spring.cloud.stream.instance-index'
|
||||
|spring.cloud.stream.integration.message-handler-not-propagated-headers | | Message header names that will NOT be copied from the inbound message.
|
||||
|spring.cloud.stream.kafka.binder.authorization-exception-retry-interval | | Time between retries after AuthorizationException is caught in the ListenerContainer; defalt is null which disables retries. For more info see: {@link org.springframework.kafka.listener.ConsumerProperties#setAuthorizationExceptionRetryInterval(java.time.Duration)}
|
||||
|spring.cloud.stream.kafka.binder.auto-add-partitions | false |
|
||||
|spring.cloud.stream.kafka.binder.auto-create-topics | true |
|
||||
|spring.cloud.stream.kafka.binder.brokers | [localhost] |
|
||||
|spring.cloud.stream.kafka.binder.auto-add-partitions | `false` |
|
||||
|spring.cloud.stream.kafka.binder.auto-alter-topics | `false` |
|
||||
|spring.cloud.stream.kafka.binder.auto-create-topics | `true` |
|
||||
|spring.cloud.stream.kafka.binder.brokers | `[localhost]` |
|
||||
|spring.cloud.stream.kafka.binder.certificate-store-directory | | When a certificate store location is given as classpath URL (classpath:), then the binder moves the resource from the classpath location inside the JAR to a location on the filesystem. If this value is set, then this location is used, otherwise, the certificate file is copied to the directory returned by java.io.tmpdir.
|
||||
|spring.cloud.stream.kafka.binder.configuration | | Arbitrary kafka properties that apply to both producers and consumers.
|
||||
|spring.cloud.stream.kafka.binder.consider-down-when-any-partition-has-no-leader | false |
|
||||
|spring.cloud.stream.kafka.binder.consider-down-when-any-partition-has-no-leader | `false` |
|
||||
|spring.cloud.stream.kafka.binder.consumer-properties | | Arbitrary kafka consumer properties.
|
||||
|spring.cloud.stream.kafka.binder.header-mapper-bean-name | | The bean name of a custom header mapper to use instead of a {@link org.springframework.kafka.support.DefaultKafkaHeaderMapper}.
|
||||
|spring.cloud.stream.kafka.binder.headers | [] |
|
||||
|spring.cloud.stream.kafka.binder.health-timeout | 60 | Time to wait to get partition information in seconds; default 60.
|
||||
|spring.cloud.stream.kafka.binder.headers | `[]` |
|
||||
|spring.cloud.stream.kafka.binder.health-timeout | `60` | Time to wait to get partition information in seconds; default 60.
|
||||
|spring.cloud.stream.kafka.binder.jaas | |
|
||||
|spring.cloud.stream.kafka.binder.min-partition-count | 1 |
|
||||
|spring.cloud.stream.kafka.binder.min-partition-count | `1` |
|
||||
|spring.cloud.stream.kafka.binder.producer-properties | | Arbitrary kafka producer properties.
|
||||
|spring.cloud.stream.kafka.binder.replication-factor | -1 |
|
||||
|spring.cloud.stream.kafka.binder.required-acks | 1 |
|
||||
|spring.cloud.stream.kafka.binder.replication-factor | `-1` |
|
||||
|spring.cloud.stream.kafka.binder.required-acks | `1` |
|
||||
|spring.cloud.stream.kafka.binder.transaction.producer.batch-timeout | |
|
||||
|spring.cloud.stream.kafka.binder.transaction.producer.buffer-size | |
|
||||
|spring.cloud.stream.kafka.binder.transaction.producer.compression-type | |
|
||||
@@ -52,14 +54,15 @@
|
||||
|spring.cloud.stream.metrics.key | | The name of the metric being emitted. Should be an unique value per application. Defaults to: ${spring.application.name:${vcap.application.name:${spring.config.name:application}}}.
|
||||
|spring.cloud.stream.metrics.meter-filter | | Pattern to control the 'meters' one wants to capture. By default all 'meters' will be captured. For example, 'spring.integration.*' will only capture metric information for meters whose name starts with 'spring.integration'.
|
||||
|spring.cloud.stream.metrics.properties | | Application properties that should be added to the metrics payload For example: `spring.application**`.
|
||||
|spring.cloud.stream.metrics.schedule-interval | 60s | Interval expressed as Duration for scheduling metrics snapshots publishing. Defaults to 60 seconds
|
||||
|spring.cloud.stream.override-cloud-connectors | false | This property is only applicable when the cloud profile is active and Spring Cloud Connectors are provided with the application. If the property is false (the default), the binder detects a suitable bound service (for example, a RabbitMQ service bound in Cloud Foundry for the RabbitMQ binder) and uses it for creating connections (usually through Spring Cloud Connectors). When set to true, this property instructs binders to completely ignore the bound services and rely on Spring Boot properties (for example, relying on the spring.rabbitmq.* properties provided in the environment for the RabbitMQ binder). The typical usage of this property is to be nested in a customized environment when connecting to multiple systems.
|
||||
|spring.cloud.stream.pollable-source | none | A semi-colon delimited list of binding names of pollable sources. Binding names follow the same naming convention as functions. For example, name '...pollable-source=foobar' will be accessible as 'foobar-iin-0'' binding
|
||||
|spring.cloud.stream.metrics.schedule-interval | `60s` | Interval expressed as Duration for scheduling metrics snapshots publishing. Defaults to 60 seconds
|
||||
|spring.cloud.stream.override-cloud-connectors | `false` | This property is only applicable when the cloud profile is active and Spring Cloud Connectors are provided with the application. If the property is false (the default), the binder detects a suitable bound service (for example, a RabbitMQ service bound in Cloud Foundry for the RabbitMQ binder) and uses it for creating connections (usually through Spring Cloud Connectors). When set to true, this property instructs binders to completely ignore the bound services and rely on Spring Boot properties (for example, relying on the spring.rabbitmq.* properties provided in the environment for the RabbitMQ binder). The typical usage of this property is to be nested in a customized environment when connecting to multiple systems.
|
||||
|spring.cloud.stream.pollable-source | `none` | A semi-colon delimited list of binding names of pollable sources. Binding names follow the same naming convention as functions. For example, name '...pollable-source=foobar' will be accessible as 'foobar-iin-0'' binding
|
||||
|spring.cloud.stream.poller.cron | | Cron expression value for the Cron Trigger.
|
||||
|spring.cloud.stream.poller.fixed-delay | 1000 | Fixed delay for default poller.
|
||||
|spring.cloud.stream.poller.initial-delay | 0 | Initial delay for periodic triggers.
|
||||
|spring.cloud.stream.poller.max-messages-per-poll | 1 | Maximum messages per poll for the default poller.
|
||||
|spring.cloud.stream.sendto.destination | none | The name of the header used to determine the name of the output destination
|
||||
|spring.cloud.stream.poller.fixed-delay | `1000` | Fixed delay for default poller.
|
||||
|spring.cloud.stream.poller.initial-delay | `0` | Initial delay for periodic triggers.
|
||||
|spring.cloud.stream.poller.max-messages-per-poll | `1` | Maximum messages per poll for the default poller.
|
||||
|spring.cloud.stream.poller.time-unit | | The TimeUnit to apply to delay values.
|
||||
|spring.cloud.stream.sendto.destination | `none` | The name of the header used to determine the name of the output destination
|
||||
|spring.cloud.stream.source | | A colon delimited string representing the names of the sources based on which source bindings will be created. This is primarily to support cases where source binding may be required without providing a corresponding Supplier. (e.g., for cases where the actual source of data is outside of scope of spring-cloud-stream - HTTP -> Stream)
|
||||
|
||||
|===
|
||||
@@ -219,13 +219,14 @@ Default: not set.
|
||||
resetOffsets::
|
||||
Whether to reset offsets on the consumer to the value provided by startOffset.
|
||||
Must be false if a `KafkaRebalanceListener` is provided; see <<rebalance-listener>>.
|
||||
See <<reset-offsets>> for more information about this property.
|
||||
+
|
||||
Default: `false`.
|
||||
startOffset::
|
||||
The starting offset for new groups.
|
||||
Allowed values: `earliest` and `latest`.
|
||||
If the consumer group is set explicitly for the consumer 'binding' (through `spring.cloud.stream.bindings.<channelName>.group`), 'startOffset' is set to `earliest`. Otherwise, it is set to `latest` for the `anonymous` consumer group.
|
||||
Also see `resetOffsets` (earlier in this list).
|
||||
See <<reset-offsets>> for more information about this property.
|
||||
+
|
||||
Default: null (equivalent to `earliest`).
|
||||
enableDlq::
|
||||
@@ -316,6 +317,33 @@ To achieve exactly once consumption and production of records, the consumer and
|
||||
+
|
||||
Default: none.
|
||||
|
||||
[[reset-offsets]]
|
||||
==== Resetting Offsets
|
||||
|
||||
When an application starts, the initial position in each assigned partition depends on two properties `startOffset` and `resetOffsets`.
|
||||
If `resetOffsets` is `false`, normal Kafka consumer https://kafka.apache.org/documentation/#consumerconfigs_auto.offset.reset[`auto.offset.reset`] semantics apply.
|
||||
i.e. If there is no committed offset for a partition for the binding's consumer group, the position is `earliest` or `latest`.
|
||||
By default, bindings with an explicit `group` use `earliest`, and anonymous bindings (with no `group`) use `latest`.
|
||||
These defaults can be overriden by setting the `startOffset` binding property.
|
||||
There will be no committed offset(s) the first time the binding is started with a particular `group`.
|
||||
The other condition where no committed offset exists is if the offset has been expired.
|
||||
With modern brokers (since 2.1), and default broker properties, the offsets are expired 7 days after the last member leaves the group.
|
||||
See the https://kafka.apache.org/documentation/#brokerconfigs_offsets.retention.minutes[`offsets.retention.minutes`] broker property for more information.
|
||||
|
||||
When `resetOffsets` is `true`, the binder applies similar semantics to those that apply when there is no committed offset on the broker, as if this binding has never consumed from the topic; i.e. any current committed offset is ignored.
|
||||
|
||||
Following are two use cases when this might be used.
|
||||
|
||||
1. Consuming from a compacted topic containing key/value pairs.
|
||||
Set `resetOffsets` to `true` and `startOffset` to `earliest`; the binding will perform a `seekToBeginning` on all newly assigned partitions.
|
||||
|
||||
2. Consuming from a topic containing events, where you are only interested in events that occur while this binding is running.
|
||||
Set `resetOffsets` to `true` and `startOffset` to `latest`; the binding will perform a `seekToEnd` on all newly assigned partitions.
|
||||
|
||||
IMPORTANT: If a rebalance occurs after the initial assignment, the seeks will only be performed on any newly assigned partitions that were not assigned during the initial assignment.
|
||||
|
||||
For more control over topic offsets, see <<rebalance-listener>>; when a listener is provided, `resetOffsets: true` is ignored.
|
||||
|
||||
==== Consuming Batches
|
||||
|
||||
Starting with version 3.0, when `spring.cloud.stream.binding.<name>.consumer.batch-mode` is set to `true`, all of the records received by polling the Kafka `Consumer` will be presented as a `List<?>` to the listener method.
|
||||
@@ -805,4 +833,4 @@ public AdminClientConfigCustomizer adminClientConfigCustomizer() {
|
||||
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
|
||||
};
|
||||
}
|
||||
```
|
||||
```
|
||||
|
||||
Reference in New Issue
Block a user