Compare commits
9 Commits

bd3eebd897
ed8683dcc2
60b6604988
a3e76282b4
c9687189b7
5fcdf28776
d334359cd4
aff0dc00ef
7840decc86
10
README.adoc
@@ -14,18 +14,10 @@ Edit the files in the src/main/asciidoc/ directory instead.

image::https://circleci.com/gh/spring-cloud/spring-cloud-stream-binder-kafka.svg?style=svg["CircleCI", link="https://circleci.com/gh/spring-cloud/spring-cloud-stream-binder-kafka"]
image::https://codecov.io/gh/spring-cloud/spring-cloud-stream-binder-kafka/branch/{github-tag}/graph/badge.svg["codecov", link="https://codecov.io/gh/spring-cloud/spring-cloud-stream-binder-kafka"]
image::https://badges.gitter.im/spring-cloud/spring-cloud-stream.svg[Gitter, link="https://gitter.im/spring-cloud/spring-cloud-stream?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge"]
image::https://badges.gitter.im/spring-cloud/spring-cloud-stream-binder-kafka.svg[Gitter, link="https://gitter.im/spring-cloud/spring-cloud-stream-binder-kafka?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge"]

// ======================================================================================

//= Overview
[partintro]
--
This guide describes the Apache Kafka implementation of the Spring Cloud Stream Binder.
It contains information about its design, usage, and configuration options, as well as information on how the Spring Cloud Stream concepts map onto Apache Kafka specific constructs.
In addition, this guide explains the Kafka Streams binding capabilities of Spring Cloud Stream.
--

== Apache Kafka Binder

=== Usage

@@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.2.0-SNAPSHOT</version>
<version>3.2.0</version>
</parent>
<packaging>jar</packaging>
<name>spring-cloud-stream-binder-kafka-docs</name>

@@ -9,7 +9,6 @@
|spring.cloud.stream.dynamic-destinations | `[]` | A list of destinations that can be bound dynamically. If set, only listed destinations can be bound.
|spring.cloud.stream.function.batch-mode | `false` |
|spring.cloud.stream.function.bindings | |
|spring.cloud.stream.function.definition | | Definition of functions to bind. If several functions need to be composed into one, use pipes (e.g., 'fooFunc\|barFunc')
|spring.cloud.stream.instance-count | `1` | The number of deployed instances of an application. Default: 1. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-count" where 'foo' is the name of the binding.
|spring.cloud.stream.instance-index | `0` | The instance id of the application: a number from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index" where 'foo' is the name of the binding.
|spring.cloud.stream.instance-index-list | | A list of instance ids from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index-list" where 'foo' is the name of the binding. This setting will override the one set in 'spring.cloud.stream.instance-index'.

@@ -57,11 +56,6 @@
|spring.cloud.stream.metrics.schedule-interval | `60s` | Interval expressed as Duration for scheduling metrics snapshots publishing. Defaults to 60 seconds.
|spring.cloud.stream.override-cloud-connectors | `false` | This property is only applicable when the cloud profile is active and Spring Cloud Connectors are provided with the application. If the property is false (the default), the binder detects a suitable bound service (for example, a RabbitMQ service bound in Cloud Foundry for the RabbitMQ binder) and uses it for creating connections (usually through Spring Cloud Connectors). When set to true, this property instructs binders to completely ignore the bound services and rely on Spring Boot properties (for example, relying on the spring.rabbitmq.* properties provided in the environment for the RabbitMQ binder). The typical usage of this property is to be nested in a customized environment when connecting to multiple systems.
|spring.cloud.stream.pollable-source | `none` | A semi-colon delimited list of binding names of pollable sources. Binding names follow the same naming convention as functions. For example, the name '...pollable-source=foobar' will be accessible as the 'foobar-in-0' binding.
|spring.cloud.stream.poller.cron | | Cron expression value for the Cron Trigger.
|spring.cloud.stream.poller.fixed-delay | `1000` | Fixed delay for default poller.
|spring.cloud.stream.poller.initial-delay | `0` | Initial delay for periodic triggers.
|spring.cloud.stream.poller.max-messages-per-poll | `1` | Maximum messages per poll for the default poller.
|spring.cloud.stream.poller.time-unit | | The TimeUnit to apply to delay values.
|spring.cloud.stream.sendto.destination | `none` | The name of the header used to determine the name of the output destination.
|spring.cloud.stream.source | | A colon delimited string representing the names of the sources based on which source bindings will be created. This is primarily to support cases where a source binding may be required without providing a corresponding Supplier (e.g., for cases where the actual source of data is outside the scope of spring-cloud-stream - HTTP -> Stream).

@@ -2121,6 +2121,12 @@ Arbitrary consumer properties at the binder level.
producerProperties::
Arbitrary producer properties at the binder level.

includeStoppedProcessorsForHealthCheck::
When bindings for a processor are stopped through the actuator, that processor does not participate in the health check by default.
Set this property to `true` to enable the health check for all processors, including the ones that are currently stopped through the bindings actuator endpoint.
+
Default: false

==== Kafka Streams Producer Properties

The following properties are _only_ available for Kafka Streams producers and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.`

@@ -151,6 +151,9 @@ Default: `false`.

spring.cloud.stream.kafka.binder.certificateStoreDirectory::
When the truststore or keystore certificate location is given as a classpath URL (`classpath:...`), the binder copies the resource from the classpath location inside the JAR file to a location on the filesystem.
This is true for both broker level certificates (`ssl.truststore.location` and `ssl.keystore.location`) and certificates intended for schema registry (`schema.registry.ssl.truststore.location` and `schema.registry.ssl.keystore.location`).
Keep in mind that the truststore and keystore classpath locations must be provided under `spring.cloud.stream.kafka.binder.configuration...`.
For example, `spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location`, `spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location`, and so on.
The file is moved to the location specified as the value for this property, which must be an existing directory on the filesystem that is writable by the process running the application.
If this value is not set and the certificate file is a classpath resource, then it is moved to the system's temp directory as returned by `System.getProperty("java.io.tmpdir")`.
The same is true if this value is present but the directory cannot be found on the filesystem or is not writable.
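
For illustration only, a configuration along these lines could be used (the directory and store file names below are hypothetical, not defaults):

```
spring.cloud.stream.kafka.binder:
  certificateStoreDirectory: /var/kafka-certs
  configuration:
    ssl.truststore.location: classpath:client.truststore
    ssl.keystore.location: classpath:client.keystore
```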
@@ -44,6 +44,8 @@ include::partitions.adoc[]

include::kafka-streams.adoc[]

include::tips.adoc[]

= Appendices
[appendix]
include::building.adoc[]

865
docs/src/main/asciidoc/tips.adoc
Normal file
@@ -0,0 +1,865 @@

== Tips, Tricks and Recipes

=== Simple DLQ with Kafka

==== Problem Statement

As a developer, I want to write a consumer application that processes records from a Kafka topic.
However, if some error occurs in processing, I don't want the application to stop completely.
Instead, I want to send the record in error to a DLT (Dead-Letter-Topic) and then continue processing new records.

==== Solution

The solution for this problem is to use the DLQ feature in Spring Cloud Stream.
For the purposes of this discussion, let us assume that the following is our processor function.

```
@Bean
public Consumer<byte[]> processData() {
    return s -> {
        throw new RuntimeException();
    };
}
```

This is a very trivial function that throws an exception for all the records that it processes, but you can take this function and extend it to any other similar situations.

In order to send the records in error to a DLT, we need to provide the following configuration.

```
spring.cloud.stream:
  bindings:
    processData-in-0:
      group: my-group
      destination: input-topic
  kafka:
    bindings:
      processData-in-0:
        consumer:
          enableDlq: true
          dlqName: input-topic-dlq
```

In order to activate the DLQ, the application must provide a group name.
Anonymous consumers cannot use the DLQ facilities.
We also need to enable DLQ by setting the `enableDlq` property on the Kafka consumer binding to `true`.
Finally, we can optionally provide the DLT name through the `dlqName` property on the Kafka consumer binding, which otherwise defaults to `error.input-topic.my-group` in this case.

Note that in the example consumer provided above, the type of the payload is `byte[]`.
By default, the DLQ producer in the Kafka binder expects a payload of type `byte[]`.
If that is not the case, then we need to provide the configuration for a proper serializer.
For example, let us re-write the consumer function as below:

```
@Bean
public Consumer<String> processData() {
    return s -> {
        throw new RuntimeException();
    };
}
```

Now we need to tell Spring Cloud Stream how we want to serialize the data when writing to the DLT.
Here is the modified configuration for this scenario:

```
spring.cloud.stream:
  bindings:
    processData-in-0:
      group: my-group
      destination: input-topic
  kafka:
    bindings:
      processData-in-0:
        consumer:
          enableDlq: true
          dlqName: input-topic-dlq
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer
```

=== DLQ with Advanced Retry Options

==== Problem Statement

This is similar to the recipe above, but as a developer I would like to configure the way retries are handled.

==== Solution

If you followed the above recipe, then you get the default retry options built into the Kafka binder when the processing encounters an error.

By default, the binder retries for a maximum of 3 attempts, with a one-second initial delay, a 2.0 back-off multiplier, and a maximum delay of 10 seconds.
You can change all of these settings through the following properties:

```
spring.cloud.stream.bindings.processData-in-0.consumer.maxAttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultiplier
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval
```
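
For illustration, values along the following lines would retry up to five times, starting with a two-second delay that doubles up to twenty seconds (the numbers are examples, not defaults):

```
spring.cloud.stream.bindings.processData-in-0.consumer.maxAttempts=5
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval=2000
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultiplier=2.0
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval=20000
```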

If you want, you can also provide a list of retryable exceptions by providing a map of boolean values.
For example,

```
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false
```

By default, any exceptions not listed in the map above will be retried.
If that is not desired, then you can disable that behavior by providing:

```
spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false
```

You can also provide your own `RetryTemplate` and mark it as `@StreamRetryTemplate`, which will be scanned and used by the binder.
This is useful when you want more sophisticated retry strategies and policies.

If you have multiple `@StreamRetryTemplate` beans, then you can specify which one your binding should use through the following property:

```
spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>
```
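
As a rough sketch of what such a bean might look like (this assumes Spring Retry's `RetryTemplate`, `SimpleRetryPolicy`, and `ExponentialBackOffPolicy`; the values are illustrative):

```
@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();

    // Retry up to 5 times.
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(5));

    // Exponential back-off: 1s initial delay, doubling, capped at 10s.
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(1000);
    backOffPolicy.setMultiplier(2.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);

    return retryTemplate;
}
```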

=== Handling Deserialization errors with DLQ

==== Problem Statement

I have a processor that encounters a deserialization exception in the Kafka consumer.
I would expect that the Spring Cloud Stream DLQ mechanism will catch that scenario, but it does not.
How can I handle this?

==== Solution

The normal DLQ mechanism offered by Spring Cloud Stream will not help when the Kafka consumer throws an irrecoverable deserialization exception.
This is because this exception happens even before the consumer's `poll()` method returns.
The Spring for Apache Kafka project offers some great ways to help the binder with this situation.
Let us explore those.

Assuming this is our function:

```
@Bean
public Consumer<String> functionName() {
    return s -> {
        System.out.println(s);
    };
}
```

It is a trivial function that takes a `String` parameter.

We want to bypass the message converters provided by Spring Cloud Stream and use native deserializers instead.
In the case of `String` types, it does not make much sense, but for more complex types like AVRO you have to rely on external deserializers and therefore want to delegate the conversion to Kafka.

Now when the consumer receives the data, let us assume that there is a bad record that causes a deserialization error; maybe someone passed an `Integer` instead of a `String`, for example.
In that case, if you don't do something in the application, the exception will be propagated through the chain and your application will exit eventually.

In order to handle this, you can add a `ListenerContainerCustomizer` `@Bean` that configures a `SeekToCurrentErrorHandler`.
This `SeekToCurrentErrorHandler` is configured with a `DeadLetterPublishingRecoverer`.
We also need to configure an `ErrorHandlingDeserializer` for the consumer.
That sounds like a lot of complex things, but in reality, it boils down to these 3 beans in this case.

```
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
    return (container, dest, group) -> {
        container.setErrorHandler(errorHandler);
    };
}
```

```
@Bean
public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
    return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
}
```

```
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
    return new DeadLetterPublishingRecoverer(bytesTemplate);
}
```

Let us analyze each of them.
The first one is the `ListenerContainerCustomizer` bean that takes a `SeekToCurrentErrorHandler`.
The container is now customized with that particular error handler.
You can learn more about container customization https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_advanced_consumer_configuration[here].

The second bean is the `SeekToCurrentErrorHandler` that is configured to publish to a `DLT`.
See https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek-to-current[here] for more details on `SeekToCurrentErrorHandler`.

The third bean is the `DeadLetterPublishingRecoverer` that is ultimately responsible for sending to the `DLT`.
By default, the `DLT` topic is named `ORIGINAL_TOPIC_NAME.DLT`.
You can change that though.
See the https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters[docs] for more details.

We also need to configure an https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer[ErrorHandlingDeserializer] through application config.

The `ErrorHandlingDeserializer` delegates to the actual deserializer.
In case of errors, it sets the key/value of the record to null and includes the raw bytes of the message.
It then sets the exception in a header and passes this record to the listener, which then calls the registered error handler.

Following is the configuration required:

```
spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
      consumer:
        use-native-decoding: true
  kafka:
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: dlq-topic
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer
          configuration:
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
```

We are providing the `ErrorHandlingDeserializer` through the `configuration` property on the binding.
We are also indicating that the actual deserializer to delegate to is the `StringDeserializer`.

Keep in mind that none of the DLQ properties above are relevant for the discussion in this recipe.
They are purely meant for addressing application-level errors.

=== Basic offset management in Kafka binder

==== Problem Statement

I want to write a Spring Cloud Stream Kafka consumer application and I am not sure how it manages Kafka consumer offsets.
Can you explain?

==== Solution

We encourage you to read the https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#reset-offsets[docs] section on this to get a thorough understanding of it.

Here it is in a gist:

By default, Kafka supports two types of starting offsets - `earliest` and `latest`.
Their semantics are self-explanatory from their names.

Assume you are running the consumer for the first time.
If you omit the group.id in your Spring Cloud Stream application, then it becomes an anonymous consumer.
Whenever you have an anonymous consumer, the Spring Cloud Stream application by default starts from the `latest` available offset in the topic partition.
On the other hand, if you explicitly specify a group.id, then by default, the Spring Cloud Stream application starts from the `earliest` available offset in the topic partition.

In both cases above (consumers with explicit groups and anonymous groups), the starting offset can be switched by using the property `spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset` and setting it to either `earliest` or `latest`.

Now, assume that you already ran the consumer before and are now starting it again.
In this case, the starting offset semantics in the above case do not apply, as the consumer finds an already committed offset for the consumer group (in the case of an anonymous consumer, although the application does not provide a group.id, the binder auto-generates one for you).
It simply picks up from the last committed offset onward.
This is true even when a `startOffset` value is provided.

However, you can override the default behavior, where the consumer starts from the last committed offset, by using the `resetOffsets` property.
In order to do that, set the property `spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets` to `true` (it is `false` by default).
Then make sure you provide the `startOffset` value (either `earliest` or `latest`).
When you do that and then start the consumer application, each time it starts, it starts as if it were starting for the first time and ignores any committed offsets for the partition.
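
For example, a binding configured along the following lines (the binding name is only illustrative) ignores the committed offset and starts from the earliest available offset on every start:

```
spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.resetOffsets: true
spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.startOffset: earliest
```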

=== Seeking to arbitrary offsets in Kafka

==== Problem Statement

Using the Kafka binder, I know that it can set the offset to either `earliest` or `latest`, but I have a requirement to seek to an arbitrary offset somewhere in the middle.
Is there a way to achieve this using the Spring Cloud Stream Kafka binder?

==== Solution

Previously we saw how the Kafka binder allows you to tackle basic offset management.
By default, the binder does not allow you to rewind to an arbitrary offset, at least through the mechanism we saw in that recipe.
However, there are some low-level strategies that the binder provides to achieve this use case.
Let's explore them.

First of all, when you want to reset to an arbitrary offset other than `earliest` or `latest`, make sure to leave the `resetOffsets` configuration at its default, which is `false`.
Then you have to provide a custom bean of type `KafkaBindingRebalanceListener`, which will be injected into all consumer bindings.
It is an interface that comes with a few default methods, but here is the method that we are interested in:

```
/**
 * Invoked when partitions are initially assigned or after a rebalance. Applications
 * might only want to perform seek operations on an initial assignment. While the
 * 'initial' argument is true for each thread (when concurrency is greater than 1),
 * implementations should keep track of exactly which partitions have been sought.
 * There is a race in that a rebalance could occur during startup and so a topic/
 * partition that has been sought on one thread may be re-assigned to another
 * thread and you may not wish to re-seek it at that time.
 * @param bindingName the name of the binding.
 * @param consumer the consumer.
 * @param partitions the partitions.
 * @param initial true if this is the initial assignment on the current thread.
 */
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
        Collection<TopicPartition> partitions, boolean initial) {
    // do nothing
}
```

Let us look at the details.

In essence, this method will be invoked during the initial assignment for a topic partition and after each rebalance.
For better illustration, let us assume that our topic is `foo` and it has 4 partitions.
Initially, we are only starting a single consumer in the group and this consumer will consume from all partitions.
When the consumer starts for the first time, all 4 partitions are initially assigned.
However, we do not want the partitions to start consuming at the default (`earliest`, since we define a group); rather, for each partition, we want to consume after seeking to an arbitrary offset.
Imagine that you have a business case to consume from certain offsets as below.

```
Partition   start offset

0           1000
1           2000
2           2000
3           1000
```

This could be achieved by implementing the above method as below.

```
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {

    Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
    topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
    topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);

    if (initial) {
        partitions.forEach(tp -> {
            if (topicPartitionOffset.containsKey(tp)) {
                final Long offset = topicPartitionOffset.get(tp);
                try {
                    consumer.seek(tp, offset);
                }
                catch (Exception e) {
                    // Handle exceptions carefully.
                }
            }
        });
    }
}
```

This is just a rudimentary implementation.
Real-world use cases are much more complex than this and you need to adjust accordingly, but this certainly gives you a basic sketch.
When the consumer's `seek` call fails, it may throw a runtime exception and you need to decide what to do in those cases.
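
For completeness, here is a minimal sketch of how such a listener might be exposed as a bean; the seek logic shown above would go inside the overridden method:

```
@Bean
public KafkaBindingRebalanceListener kafkaBindingRebalanceListener() {
    return new KafkaBindingRebalanceListener() {

        @Override
        public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
                Collection<TopicPartition> partitions, boolean initial) {
            // Seek to the desired offsets here, as shown in the snippet above.
        }
    };
}
```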

==== What if we start a second consumer with the same group id?

When we add a second consumer, a rebalance will occur and some partitions will be moved around.
Let's say that the new consumer gets partitions `2` and `3`.
When this new Spring Cloud Stream consumer calls this `onPartitionsAssigned` method, it will see that this is the initial assignment for partitions `2` and `3` on this consumer.
Therefore, it will do the seek operation because of the conditional check on the `initial` argument.
In the case of the first consumer, it now only has partitions `0` and `1`.
However, for this consumer it was simply a rebalance event and not considered an initial assignment.
Thus, it will not re-seek to the given offsets because of the conditional check on the `initial` argument.

=== How do I manually acknowledge using Kafka binder?

==== Problem Statement

Using the Kafka binder, I want to manually acknowledge messages in my consumer.
How do I do that?

==== Solution

By default, the Kafka binder delegates to the default commit settings of the Spring for Apache Kafka project.
The default `ackMode` in Spring Kafka is `batch`.
See https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets[here] for more details on that.

There are situations in which you want to disable this default commit behavior and rely on manual commits.
The following steps allow you to do that.

Set the property `spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode` to either `MANUAL` or `MANUAL_IMMEDIATE`.
When it is set like that, then there will be a header called `kafka_acknowledgment` (from `KafkaHeaders.ACKNOWLEDGMENT`) present in the message received by the consumer method.

For example, imagine this as your consumer method.

```
@Bean
public Consumer<Message<String>> myConsumer() {
    return msg -> {
        Acknowledgment acknowledgment = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
            System.out.println("Acknowledgment provided");
            acknowledgment.acknowledge();
        }
    };
}
```

Then you set the property `spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode` to `MANUAL` or `MANUAL_IMMEDIATE`.
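
Putting it together, a configuration along these lines could be used (destination and group names are only illustrative):

```
spring.cloud.stream:
  bindings:
    myConsumer-in-0:
      group: my-group
      destination: my-topic
  kafka:
    bindings:
      myConsumer-in-0:
        consumer:
          ackMode: MANUAL
```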

=== How do I override the default binding names in Spring Cloud Stream?

==== Problem Statement

Spring Cloud Stream creates default binding names based on the function definition and signature, but how do I override these with more domain-friendly names?

==== Solution

Assume that the following is your function signature.

```
@Bean
public Function<String, String> uppercase() {
    ...
}
```

By default, Spring Cloud Stream will create the bindings as below.

1. uppercase-in-0
2. uppercase-out-0

You can override these binding names by using the following properties.

```
spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out
```

After this, all binding properties must be set on the new names, `my-transformer-in` and `my-transformer-out`.

Here is another example with Kafka Streams and multiple inputs.

```
@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
    ...
}
```

By default, Spring Cloud Stream will create three different binding names for this function.

1. processOrder-in-0
2. processOrder-in-1
3. processOrder-out-0

You have to use these binding names each time you want to set some configuration on these bindings.
You may not like that, and may want to use more domain-friendly and readable binding names, for example, something like the following.

1. orders
2. accounts
3. enrichedOrders

You can easily do that by setting these three properties.

1. spring.cloud.stream.function.bindings.processOrder-in-0=orders
2. spring.cloud.stream.function.bindings.processOrder-in-1=accounts
3. spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders

Once you do that, it overrides the default binding names and any properties that you want to set on them must be set on these new binding names.
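
For instance, destinations would then be configured against the new names (the topic names below are only examples):

```
spring.cloud.stream.bindings.orders.destination=order-topic
spring.cloud.stream.bindings.accounts.destination=account-topic
spring.cloud.stream.bindings.enrichedOrders.destination=enriched-order-topic
```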

=== How do I send a message key as part of my record?

==== Problem Statement

I need to send a key along with the payload of the record. Is there a way to do that in Spring Cloud Stream?

==== Solution

It is often necessary to send an associative data structure, like a map, as a record with a key and a value.
Spring Cloud Stream allows you to do that in a straightforward manner.
Following is a basic blueprint for doing this, but you may want to adapt it to your particular use case.

Here is a sample producer method (aka `Supplier`).

```
@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
```

This is a trivial function that sends a message with a `String` payload, but also with a key.
Note that we set the key as a message header using `KafkaHeaders.MESSAGE_KEY`.

If you want to change the key header from the default `kafka_messageKey`, then we need to specify this property in the configuration:

```
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']
```

Please note that we use the binding name `supplier-out-0` since that is our function name. Please update it accordingly.

Then, we use this new key when we produce the message.
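
Here is a sketch of a supplier that sets that custom header; note that the header name must match the one used in the `messageKeyExpression` above:

```
@Bean
public Supplier<Message<String>> supplier() {
    // 'my-special-key' matches the header referenced in messageKeyExpression.
    return () -> MessageBuilder.withPayload("foo")
            .setHeader("my-special-key", "my-foo")
            .build();
}
```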

=== How do I use native serializer and deserializer instead of message conversion done by Spring Cloud Stream?

==== Problem Statement

Instead of using the message converters in Spring Cloud Stream, I want to use the native Serializer and Deserializer in Kafka.
By default, Spring Cloud Stream takes care of this conversion using its internal built-in message converters.
How can I bypass this and delegate the responsibility to Kafka?

==== Solution

This is really easy to do.

All you have to do is provide the following property to enable native serialization.

```
spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true
```

Then, you also need to set the serializers.
There are a couple of ways to do this.

```
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
```

or by using the binder configuration:

```
spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
```

When set at the binder level, the serializers are applied to all bindings, whereas setting them on a binding applies to that binding only.

On the deserializing side, you just need to provide the deserializers as configuration.

For example,

```
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
```

You can also set them at the binder level.

There is an optional property that you can set to force native decoding.

```
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true
```

However, in the case of the Kafka binder, this is unnecessary, as by the time the record reaches the binder, Kafka has already deserialized it using the configured deserializers.

=== Explain how offset resetting works in Kafka Streams binder

==== Problem Statement

By default, the Kafka Streams binder always starts from the earliest offset for a new consumer.
Sometimes, it is beneficial or required by the application to start from the latest offset.
The Kafka Streams binder allows you to do that.

==== Solution

Before we look at the solution, let us look at the following scenario.

```
@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer() {
    return (s, t) -> s.join(t, ...);
}
```

We have a `BiConsumer` bean that requires two input bindings.
In this case, the first binding is for a `KStream` and the second one is for a `KTable`.
When running this application for the first time, by default, both bindings start from the `earliest` offset.
What if I want to start from the `latest` offset instead, due to some requirement?
You can do this by setting the following properties.

```
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest
```

If you want only one binding to start from the `latest` offset and the other to consume from the default `earliest`, then leave the latter binding out of the configuration.

Keep in mind that, once there are committed offsets, these settings are *not* honored and the committed offsets take precedence.

=== Keeping track of successful sending of records (producing) in Kafka

==== Problem Statement

I have a Kafka producer application and I want to keep track of all my successful sends.

==== Solution

Let us assume that we have the following supplier in the application.

```
@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
```

Then, we need to define a new `MessageChannel` bean to capture all the successful send information.

```
@Bean
public MessageChannel fooRecordChannel() {
    return new DirectChannel();
}
```

Next, define this property in the application configuration to provide the bean name for the `recordMetadataChannel`.

```
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel
```

At this point, information about successfully sent records will be published to the `fooRecordChannel`.

You can write an `IntegrationFlow` as below to see the information.

```
@Bean
public IntegrationFlow integrationFlow() {
    return f -> f.channel("fooRecordChannel")
            .handle((payload, messageHeaders) -> payload);
}
```

In the `handle` method, the payload is what got sent to Kafka and the message headers contain a special key called `kafka_recordMetadata`.
Its value is a `RecordMetadata` that contains information about the topic partition, the current offset, and so on.
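
For example, the `handle` step above could be expanded along these lines to print that metadata (a sketch that assumes the `kafka_recordMetadata` header and Kafka's `RecordMetadata` type):

```
@Bean
public IntegrationFlow integrationFlow() {
    return f -> f.channel("fooRecordChannel")
            .handle((payload, messageHeaders) -> {
                // kafka_recordMetadata carries the topic, partition and offset of the record that was sent.
                RecordMetadata metadata = (RecordMetadata) messageHeaders.get("kafka_recordMetadata");
                System.out.println(metadata.topic() + "-" + metadata.partition() + "@" + metadata.offset());
                return payload;
            });
}
```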

=== Adding custom header mapper in Kafka

==== Problem Statement

I have a Kafka producer application that sets some headers, but they are missing in the consumer application. Why is that?

==== Solution

Under normal circumstances, this should be fine.

Imagine you have the following producer.

```
@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}
```

On the consumer side, you should still see the header "foo", and the following should not give you any issues.

```
@Bean
public Consumer<Message<String>> consume() {
    return s -> {
        final String foo = (String) s.getHeaders().get("foo");
        System.out.println(foo);
    };
}
```

If you provide a https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.3/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_binder_properties[custom header mapper] in the application, then this won't work.
Let's say you have an empty `KafkaHeaderMapper` in the application.

```
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {

        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {

        }
    };
}
```

If that is your implementation, then you will miss the `foo` header on the consumer.
Chances are that you may have some logic inside those `KafkaHeaderMapper` methods.
You need the following to populate the `foo` header.

```
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String foo = (String) headers.get("foo");
            target.add("foo", foo.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header foo = source.lastHeader("foo");
            target.put("foo", new String(foo.value()));
        }
    };
}
```

That will properly populate the `foo` header from the producer to the consumer.

==== Special note on the id header

In Spring Cloud Stream, the `id` header is a special header, but some applications may want to have special custom id headers - something like `custom-id` or `ID` or `Id`.
The first one (`custom-id`) will propagate without any custom header mapper from producer to consumer.
However, if you produce with a variant of the framework-reserved `id` header - such as `ID`, `Id`, `iD`, etc. - then you will run into issues with the internals of the framework.
See this https://stackoverflow.com/questions/68412600/change-the-behaviour-in-spring-cloud-stream-make-header-matcher-case-sensitive[StackOverflow thread] for more context on this use case.
In that case, you must use a custom `KafkaHeaderMapper` to map the case-sensitive id header.
For example, let's say you have the following producer.

```
@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}
```

The header `Id` above will be gone from the consuming side as it clashes with the framework's `id` header.
You can provide a custom `KafkaHeaderMapper` to solve this issue.

```
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String myId = (String) headers.get("Id");
            target.add("Id", myId.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header Id = source.lastHeader("Id");
            target.put("Id", new String(Id.value()));
        }
    };
}
```

By doing this, both `id` and `Id` headers will be available from the producer to the consumer side.

=== Producing to multiple topics in transaction

==== Problem Statement

How do I produce transactional messages to multiple Kafka topics?

For more context, see this https://stackoverflow.com/questions/68928091/dlq-bounded-retry-and-eos-when-producing-to-multiple-topics-using-spring-cloud[StackOverflow question].

==== Solution

Use the transactional support in the Kafka binder for transactions and then provide an `AfterRollbackProcessor`.
In order to produce to multiple topics, use the `StreamBridge` API.

Below are the code snippets for this:

```
@Autowired
StreamBridge bridge;

@Bean
Consumer<String> input() {
    return str -> {
        System.out.println(str);
        this.bridge.send("left", str.toUpperCase());
        this.bridge.send("right", str.toLowerCase());
        if (str.equals("Fail")) {
            throw new RuntimeException("test");
        }
    };
}

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
    return (container, dest, group) -> {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
                MessageChannel.class)).getTransactionalProducerFactory();
        KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
        DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
        container.setAfterRollbackProcessor(rollbackProcessor);
    };
}

DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
    return new DefaultAfterRollbackProcessor<>(
            new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}
```

==== Required Configuration

```
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right

spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1
```

In order to test, you can use the following:

```
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
    return args -> {
        System.in.read();
        template.send("input", "Fail".getBytes());
        template.send("input", "Good".getBytes());
    };
}
```

Some important notes:

Please ensure that you don't have any DLQ settings in the application configuration, since we configure the DLT manually (by default, records are published to a topic named `input.DLT`, based on the initial consumer function).
Also, set the `maxAttempts` on the consumer binding to `1` in order to avoid retries by the binder.
In the example above, a record will be attempted a maximum of 3 times in total (the initial try plus the 2 retries in the `FixedBackOff`).

See the https://stackoverflow.com/questions/68928091/dlq-bounded-retry-and-eos-when-producing-to-multiple-topics-using-spring-cloud[StackOverflow thread] for more details on how to test this code.
If you are using Spring Cloud Stream to test it by adding more consumer functions, make sure to set the `isolation-level` on the consumer binding to `read-committed`.

This https://stackoverflow.com/questions/68941306/spring-cloud-stream-database-transaction-does-not-roll-back[StackOverflow thread] is also related to this discussion.

=== Pitfalls to avoid when running multiple pollable consumers

==== Problem Statement

How can I run multiple instances of a pollable consumer and generate a unique `client.id` for each instance?

==== Solution

Assume that I have the following definition:

```
spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group
```

When running the application, the Kafka consumer generates a client.id (something like `consumer-my-group-1`).
For each instance of the application that is running, this `client.id` will be the same, causing unexpected issues.

In order to fix this, you can add the following property on each instance of the application:

```
spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}
```

See this https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1139[GitHub issue] for more details.

8
pom.xml
@@ -2,12 +2,12 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.2.0-SNAPSHOT</version>
<version>3.2.0</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>3.1.0-SNAPSHOT</version>
<version>3.1.0</version>
<relativePath />
</parent>
<scm>
@@ -21,10 +21,10 @@
</scm>
<properties>
<java.version>1.8</java.version>
<spring-kafka.version>2.8.0-RC1</spring-kafka.version>
<spring-kafka.version>2.8.0</spring-kafka.version>
<spring-integration-kafka.version>5.5.5</spring-integration-kafka.version>
<kafka.version>3.0.0</kafka.version>
<spring-cloud-stream.version>3.2.0-SNAPSHOT</spring-cloud-stream.version>
<spring-cloud-stream.version>3.2.0</spring-cloud-stream.version>
<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.2.0-SNAPSHOT</version>
<version>3.2.0</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.2.0-SNAPSHOT</version>
<version>3.2.0</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>

@@ -1,5 +1,5 @@
/*
 * Copyright 2015-2018 the original author or authors.
 * Copyright 2015-2021 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -163,24 +163,44 @@ public class KafkaBinderConfigurationProperties {

private void moveCertsToFileSystemIfNecessary() {
	try {
		final String trustStoreLocation = this.configuration.get("ssl.truststore.location");
		if (trustStoreLocation != null && trustStoreLocation.startsWith("classpath:")) {
			final String fileSystemLocation = moveCertToFileSystem(trustStoreLocation, this.certificateStoreDirectory);
			// Overriding the value with absolute filesystem path.
			this.configuration.put("ssl.truststore.location", fileSystemLocation);
		}
		final String keyStoreLocation = this.configuration.get("ssl.keystore.location");
		if (keyStoreLocation != null && keyStoreLocation.startsWith("classpath:")) {
			final String fileSystemLocation = moveCertToFileSystem(keyStoreLocation, this.certificateStoreDirectory);
			// Overriding the value with absolute filesystem path.
			this.configuration.put("ssl.keystore.location", fileSystemLocation);
		}
		moveBrokerCertsIfApplicable();
		moveSchemaRegistryCertsIfApplicable();
	}
	catch (Exception e) {
		throw new IllegalStateException(e);
	}
}

private void moveBrokerCertsIfApplicable() throws IOException {
	final String trustStoreLocation = this.configuration.get("ssl.truststore.location");
	if (trustStoreLocation != null && trustStoreLocation.startsWith("classpath:")) {
		final String fileSystemLocation = moveCertToFileSystem(trustStoreLocation, this.certificateStoreDirectory);
		// Overriding the value with absolute filesystem path.
		this.configuration.put("ssl.truststore.location", fileSystemLocation);
	}
	final String keyStoreLocation = this.configuration.get("ssl.keystore.location");
	if (keyStoreLocation != null && keyStoreLocation.startsWith("classpath:")) {
		final String fileSystemLocation = moveCertToFileSystem(keyStoreLocation, this.certificateStoreDirectory);
		// Overriding the value with absolute filesystem path.
		this.configuration.put("ssl.keystore.location", fileSystemLocation);
	}
}

private void moveSchemaRegistryCertsIfApplicable() throws IOException {
	String trustStoreLocation = this.configuration.get("schema.registry.ssl.truststore.location");
	if (trustStoreLocation != null && trustStoreLocation.startsWith("classpath:")) {
		final String fileSystemLocation = moveCertToFileSystem(trustStoreLocation, this.certificateStoreDirectory);
		// Overriding the value with absolute filesystem path.
		this.configuration.put("schema.registry.ssl.truststore.location", fileSystemLocation);
	}
	final String keyStoreLocation = this.configuration.get("schema.registry.ssl.keystore.location");
	if (keyStoreLocation != null && keyStoreLocation.startsWith("classpath:")) {
		final String fileSystemLocation = moveCertToFileSystem(keyStoreLocation, this.certificateStoreDirectory);
		// Overriding the value with absolute filesystem path.
		this.configuration.put("schema.registry.ssl.keystore.location", fileSystemLocation);
	}
}

private String moveCertToFileSystem(String classpathLocation, String fileSystemLocation) throws IOException {
	File targetFile;
	final String tempDir = System.getProperty("java.io.tmpdir");

@@ -1,5 +1,5 @@
/*
 * Copyright 2018-2019 the original author or authors.
 * Copyright 2018-2021 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -142,4 +142,22 @@ public class KafkaBinderConfigurationPropertiesTest {
	assertThat(configuration.get("ssl.keystore.location")).isEqualTo(
			Paths.get(Files.currentFolder().toString(), "target", "testclient.keystore").toString());
}

@Test
public void testCertificateFilesAreMovedForSchemaRegistryConfiguration() {
	KafkaProperties kafkaProperties = new KafkaProperties();
	KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
			new KafkaBinderConfigurationProperties(kafkaProperties);
	final Map<String, String> configuration = kafkaBinderConfigurationProperties.getConfiguration();
	configuration.put("schema.registry.ssl.truststore.location", "classpath:testclient.truststore");
	configuration.put("schema.registry.ssl.keystore.location", "classpath:testclient.keystore");
	kafkaBinderConfigurationProperties.setCertificateStoreDirectory("target");

	kafkaBinderConfigurationProperties.getKafkaConnectionString();

	assertThat(configuration.get("schema.registry.ssl.truststore.location")).isEqualTo(
			Paths.get(Files.currentFolder().toString(), "target", "testclient.truststore").toString());
	assertThat(configuration.get("schema.registry.ssl.keystore.location")).isEqualTo(
			Paths.get(Files.currentFolder().toString(), "target", "testclient.keystore").toString());
}
}

@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.2.0-SNAPSHOT</version>
|
||||
<version>3.2.0</version>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
|
||||
@@ -17,6 +17,7 @@
package org.springframework.cloud.stream.binder.kafka.streams;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.GlobalKTable;

import org.springframework.cloud.stream.binder.AbstractBinder;
@@ -105,6 +106,12 @@ public class GlobalKTableBinder extends
				if (!streamsBuilderFactoryBean.isRunning()) {
					super.start();
					GlobalKTableBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
					//If we cached the previous KafkaStreams object (from a binding stop on the actuator), remove it.
					//See this issue for more details: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
					final String applicationId = (String) streamsBuilderFactoryBean.getStreamsConfiguration().get(StreamsConfig.APPLICATION_ID_CONFIG);
					if (kafkaStreamsBindingInformationCatalogue.getStoppedKafkaStreams().containsKey(applicationId)) {
						kafkaStreamsBindingInformationCatalogue.removePreviousKafkaStreamsForApplicationId(applicationId);
					}
				}
			}

@@ -115,6 +122,10 @@ public class GlobalKTableBinder extends
					super.stop();
					GlobalKTableBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
					KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
					//Caching the stopped KafkaStreams for health indicator purposes on the underlying processor.
					//See this issue for more details: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
					GlobalKTableBinder.this.kafkaStreamsBindingInformationCatalogue.addPreviousKafkaStreamsForApplicationId(
							(String) streamsBuilderFactoryBean.getStreamsConfiguration().get(StreamsConfig.APPLICATION_ID_CONFIG), kafkaStreams);
				}
			}
		};
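
This start/stop change, repeated further down for KStreamBinder and KTableBinder, follows one pattern: when a binding is stopped through the actuator, the stopped KafkaStreams object is cached under its application.id so the health indicator can still report it, and the cached entry is evicted when the binding is started again. A minimal plain-JDK illustration of that bookkeeping follows; the class and method names are illustrative and do not belong to the binder's API.

import java.util.HashMap;
import java.util.Map;

class StoppedProcessorCache<T> {

	private final Map<String, T> stoppedByApplicationId = new HashMap<>();

	// Remember a stopped processor so that health reporting can still see it.
	void onStop(String applicationId, T processor) {
		stoppedByApplicationId.put(applicationId, processor);
	}

	// The processor is running again, so it is no longer "stopped".
	void onStart(String applicationId) {
		stoppedByApplicationId.remove(applicationId);
	}

	Map<String, T> stopped() {
		return stoppedByApplicationId;
	}
}
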
@@ -1,5 +1,5 @@
/*
 * Copyright 2018-2020 the original author or authors.
 * Copyright 2018-2021 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreType;
@@ -52,6 +53,7 @@ import org.springframework.util.StringUtils;
 * @author Soby Chacko
 * @author Renwei Han
 * @author Serhii Siryi
 * @author Nico Pommerening
 * @since 2.1.0
 */
public class InteractiveQueryService {
@@ -92,15 +94,16 @@ public class InteractiveQueryService {
		retryTemplate.setBackOffPolicy(backOffPolicy);
		retryTemplate.setRetryPolicy(retryPolicy);

		KafkaStreams contextSpecificKafkaStreams = getThreadContextSpecificKafkaStreams();

		return retryTemplate.execute(context -> {
			T store = null;

			final Set<KafkaStreams> kafkaStreams = InteractiveQueryService.this.kafkaStreamsRegistry.getKafkaStreams();
			final Iterator<KafkaStreams> iterator = kafkaStreams.iterator();
			Throwable throwable = null;
			while (iterator.hasNext()) {
			if (contextSpecificKafkaStreams != null) {
				try {
					store = iterator.next().store(StoreQueryParameters.fromNameAndType(storeName, storeType));
					store = contextSpecificKafkaStreams.store(
							StoreQueryParameters.fromNameAndType(
									storeName, storeType));
				}
				catch (InvalidStateStoreException e) {
					// pass through..
@@ -110,10 +113,56 @@
			if (store != null) {
				return store;
			}
			throw new IllegalStateException("Error when retrieving state store: " + storeName, throwable);
			else if (contextSpecificKafkaStreams != null) {
				LOG.warn("Store " + storeName
						+ " could not be found in Streams context, falling back to all known Streams instances");
			}
			final Set<KafkaStreams> kafkaStreams = kafkaStreamsRegistry.getKafkaStreams();
			final Iterator<KafkaStreams> iterator = kafkaStreams.iterator();
			while (iterator.hasNext()) {
				try {
					store = iterator.next()
							.store(StoreQueryParameters.fromNameAndType(
									storeName, storeType));
				}
				catch (InvalidStateStoreException e) {
					// pass through..
					throwable = e;
				}
			}
			if (store != null) {
				return store;
			}
			throw new IllegalStateException(
					"Error when retrieving state store: " + storeName,
					throwable);
		});
	}

	/**
	 * Retrieves the current {@link KafkaStreams} context if executing Thread is created by a Streams App (contains a matching application id in Thread's name).
	 *
	 * @return KafkaStreams instance associated with Thread
	 */
	private KafkaStreams getThreadContextSpecificKafkaStreams() {
		return this.kafkaStreamsRegistry.getKafkaStreams().stream()
				.filter(this::filterByThreadName).findAny().orElse(null);
	}

	/**
	 * Checks if the supplied {@link KafkaStreams} instance belongs to the calling Thread by matching the Thread's name with the Streams Application Id.
	 *
	 * @param streams {@link KafkaStreams} instance to filter
	 * @return true if Streams Instance is associated with Thread
	 */
	private boolean filterByThreadName(KafkaStreams streams) {
		String applicationId = kafkaStreamsRegistry.streamBuilderFactoryBean(
				streams).getStreamsConfiguration()
				.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
		// TODO: is there some better way to find out if a Stream App created the Thread?
		return Thread.currentThread().getName().contains(applicationId);
	}

	/**
	 * Gets the current {@link HostInfo} that the calling kafka streams application is
	 * running on.
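
With this change, the store lookup in InteractiveQueryService first tries the KafkaStreams instance that belongs to the calling processor thread (matched by application.id in the thread name) and only then falls back to every registered instance. A hedged usage sketch from the application side follows; the store name and key/value types are illustrative, and it assumes the usual retrieveQueryableStore(storeName, storeType) entry point.

import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;

class CountQueries {

	private final InteractiveQueryService interactiveQueryService;

	CountQueries(InteractiveQueryService interactiveQueryService) {
		this.interactiveQueryService = interactiveQueryService;
	}

	// Look up a key in a state store materialized by the Kafka Streams processor.
	Long countFor(String key) {
		ReadOnlyKeyValueStore<String, Long> store = interactiveQueryService
				.retrieveQueryableStore("prod-id-count-store", QueryableStoreTypes.keyValueStore());
		return store.get(key);
	}
}
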
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -134,6 +135,12 @@ class KStreamBinder extends
				if (!streamsBuilderFactoryBean.isRunning()) {
					super.start();
					KStreamBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
					//If we cached the previous KafkaStreams object (from a binding stop on the actuator), remove it.
					//See this issue for more details: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
					final String applicationId = (String) streamsBuilderFactoryBean.getStreamsConfiguration().get(StreamsConfig.APPLICATION_ID_CONFIG);
					if (kafkaStreamsBindingInformationCatalogue.getStoppedKafkaStreams().containsKey(applicationId)) {
						kafkaStreamsBindingInformationCatalogue.removePreviousKafkaStreamsForApplicationId(applicationId);
					}
				}
			}

@@ -144,6 +151,10 @@ class KStreamBinder extends
					super.stop();
					KStreamBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
					KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
					//Caching the stopped KafkaStreams for health indicator purposes on the underlying processor.
					//See this issue for more details: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
					KStreamBinder.this.kafkaStreamsBindingInformationCatalogue.addPreviousKafkaStreamsForApplicationId(
							(String) streamsBuilderFactoryBean.getStreamsConfiguration().get(StreamsConfig.APPLICATION_ID_CONFIG), kafkaStreams);
				}
			}
		};
@@ -199,6 +210,12 @@ class KStreamBinder extends
				if (!streamsBuilderFactoryBean.isRunning()) {
					super.start();
					KStreamBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
					//If we cached the previous KafkaStreams object (from a binding stop on the actuator), remove it.
					//See this issue for more details: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
					final String applicationId = (String) streamsBuilderFactoryBean.getStreamsConfiguration().get(StreamsConfig.APPLICATION_ID_CONFIG);
					if (kafkaStreamsBindingInformationCatalogue.getStoppedKafkaStreams().containsKey(applicationId)) {
						kafkaStreamsBindingInformationCatalogue.removePreviousKafkaStreamsForApplicationId(applicationId);
					}
				}
			}

@@ -209,6 +226,10 @@ class KStreamBinder extends
					super.stop();
					KStreamBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
					KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
					//Caching the stopped KafkaStreams for health indicator purposes on the underlying processor
					//See this issue for more details: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
					KStreamBinder.this.kafkaStreamsBindingInformationCatalogue.addPreviousKafkaStreamsForApplicationId(
							(String) streamsBuilderFactoryBean.getStreamsConfiguration().get(StreamsConfig.APPLICATION_ID_CONFIG), kafkaStreams);
				}
			}
		};
@@ -17,6 +17,7 @@
package org.springframework.cloud.stream.binder.kafka.streams;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KTable;

import org.springframework.cloud.stream.binder.AbstractBinder;
@@ -106,6 +107,12 @@ class KTableBinder extends
				if (!streamsBuilderFactoryBean.isRunning()) {
					super.start();
					KTableBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
					//If we cached the previous KafkaStreams object (from a binding stop on the actuator), remove it.
					//See this issue for more details: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
					final String applicationId = (String) streamsBuilderFactoryBean.getStreamsConfiguration().get(StreamsConfig.APPLICATION_ID_CONFIG);
					if (kafkaStreamsBindingInformationCatalogue.getStoppedKafkaStreams().containsKey(applicationId)) {
						kafkaStreamsBindingInformationCatalogue.removePreviousKafkaStreamsForApplicationId(applicationId);
					}
				}
			}

@@ -116,6 +123,10 @@ class KTableBinder extends
					super.stop();
					KTableBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
					KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
					//Caching the stopped KafkaStreams for health indicator purposes on the underlying processor.
					//See this issue for more details: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
					KTableBinder.this.kafkaStreamsBindingInformationCatalogue.addPreviousKafkaStreamsForApplicationId(
							(String) streamsBuilderFactoryBean.getStreamsConfiguration().get(StreamsConfig.APPLICATION_ID_CONFIG), kafkaStreams);
				}
			}
		};
@@ -19,6 +19,7 @@ package org.springframework.cloud.stream.binder.kafka.streams;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -31,8 +32,8 @@ import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.ThreadMetadata;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
@@ -118,7 +119,12 @@ public class KafkaStreamsBinderHealthIndicator extends AbstractHealthIndicator i
		}
		else {
			boolean up = true;
			for (KafkaStreams kStream : kafkaStreamsRegistry.getKafkaStreams()) {
			final Set<KafkaStreams> kafkaStreams = kafkaStreamsRegistry.getKafkaStreams();
			Set<KafkaStreams> allKafkaStreams = new HashSet<>(kafkaStreams);
			if (this.configurationProperties.isIncludeStoppedProcessorsForHealthCheck()) {
				allKafkaStreams.addAll(kafkaStreamsBindingInformationCatalogue.getStoppedKafkaStreams().values());
			}
			for (KafkaStreams kStream : allKafkaStreams) {
				if (isKafkaStreams25) {
					up &= kStream.state().isRunningOrRebalancing();
				}
@@ -156,7 +162,8 @@ public class KafkaStreamsBinderHealthIndicator extends AbstractHealthIndicator i
		}

		if (isRunningResult) {
			for (ThreadMetadata metadata : kafkaStreams.localThreadsMetadata()) {
			final Set<ThreadMetadata> threadMetadata = kafkaStreams.metadataForLocalThreads();
			for (ThreadMetadata metadata : threadMetadata) {
				perAppdIdDetails.put("threadName", metadata.threadName());
				perAppdIdDetails.put("threadState", metadata.threadState());
				perAppdIdDetails.put("adminClientId", metadata.adminClientId());
@@ -172,8 +179,19 @@ public class KafkaStreamsBinderHealthIndicator extends AbstractHealthIndicator i
		}
		else {
			final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsRegistry.streamBuilderFactoryBean(kafkaStreams);
			final String applicationId = (String) streamsBuilderFactoryBean.getStreamsConfiguration().get(StreamsConfig.APPLICATION_ID_CONFIG);
			details.put(applicationId, String.format("The processor with application.id %s is down", applicationId));
			String applicationId = null;
			if (streamsBuilderFactoryBean != null) {
				applicationId = (String) streamsBuilderFactoryBean.getStreamsConfiguration().get(StreamsConfig.APPLICATION_ID_CONFIG);
			}
			else {
				final Map<String, KafkaStreams> stoppedKafkaStreamsPerBinding = kafkaStreamsBindingInformationCatalogue.getStoppedKafkaStreams();
				for (String appId : stoppedKafkaStreamsPerBinding.keySet()) {
					if (stoppedKafkaStreamsPerBinding.get(appId).equals(kafkaStreams)) {
						applicationId = appId;
					}
				}
			}
			details.put(applicationId, String.format("The processor with application.id %s is down. Current state: %s", applicationId, kafkaStreams.state()));
		}
		return details;
	}
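
Two things change in the health indicator: stopped processors can now be counted in the health check (guarded by the new includeStoppedProcessorsForHealthCheck flag), and per-thread metadata is read through the Kafka Streams 3.x API, where metadataForLocalThreads() replaces the removed localThreadsMetadata(). A minimal sketch of the renamed call follows; the wrapper class is illustrative.

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.ThreadMetadata;

class ThreadMetadataDump {

	// Print the name and state of every local stream thread of a running topology.
	static void print(KafkaStreams kafkaStreams) {
		for (ThreadMetadata metadata : kafkaStreams.metadataForLocalThreads()) {
			System.out.println(metadata.threadName() + " -> " + metadata.threadState());
		}
	}
}
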
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

@@ -62,6 +63,8 @@ public class KafkaStreamsBindingInformationCatalogue {

	private final Map<Object, String> bindingNamesPerTarget = new HashMap<>();

	private final Map<String, KafkaStreams> previousKafkaStreamsPerApplicationId = new HashMap<>();

	private final Map<StreamsBuilderFactoryBean, List<ProducerFactory<byte[], byte[]>>> dlqProducerFactories = new HashMap<>();

	/**
@@ -213,4 +216,35 @@ public class KafkaStreamsBindingInformationCatalogue {
		}
		producerFactories.add(producerFactory);
	}

	/**
	 * Cache the previous KafkaStreams object for the application.id when the binding is stopped through the actuator.
	 * See https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
	 *
	 * @param applicationId application.id
	 * @param kafkaStreams {@link KafkaStreams} object
	 */
	public void addPreviousKafkaStreamsForApplicationId(String applicationId, KafkaStreams kafkaStreams) {
		this.previousKafkaStreamsPerApplicationId.put(applicationId, kafkaStreams);
	}

	/**
	 * Remove the previously cached KafkaStreams object.
	 * See https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
	 *
	 * @param applicationId application.id
	 */
	public void removePreviousKafkaStreamsForApplicationId(String applicationId) {
		this.previousKafkaStreamsPerApplicationId.remove(applicationId);
	}

	/**
	 * Get all KafkaStreams objects stopped through an actuator binding stop.
	 * See https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
	 *
	 * @return stopped KafkaStreams objects map
	 */
	public Map<String, KafkaStreams> getStoppedKafkaStreams() {
		return this.previousKafkaStreamsPerApplicationId;
	}
}
@@ -74,6 +74,7 @@ public class KafkaStreamsBinderConfigurationProperties
	 */
	private DeserializationExceptionHandler deserializationExceptionHandler;

	private boolean includeStoppedProcessorsForHealthCheck;

	public Map<String, Functions> getFunctions() {
		return functions;
@@ -127,6 +128,14 @@ public class KafkaStreamsBinderConfigurationProperties
		this.deserializationExceptionHandler = deserializationExceptionHandler;
	}

	public boolean isIncludeStoppedProcessorsForHealthCheck() {
		return includeStoppedProcessorsForHealthCheck;
	}

	public void setIncludeStoppedProcessorsForHealthCheck(boolean includeStoppedProcessorsForHealthCheck) {
		this.includeStoppedProcessorsForHealthCheck = includeStoppedProcessorsForHealthCheck;
	}

	public static class StateStoreRetry {

		private int maxAttempts = 1;
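
The new flag defaults to false, so stopped processors are ignored by the health check unless it is switched on. A hedged sketch of toggling it programmatically follows; in an application it would normally be set declaratively through binder configuration (presumably under the spring.cloud.stream.kafka.streams.binder prefix), and the imports below assume the binder's usual package layout.

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;

import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;

class HealthCheckFlagExample {

	public static void main(String[] args) {
		KafkaStreamsBinderConfigurationProperties properties =
				new KafkaStreamsBinderConfigurationProperties(new KafkaProperties());
		// Count stopped processors towards the Kafka Streams binder health status.
		properties.setIncludeStoppedProcessorsForHealthCheck(true);
		System.out.println(properties.isIncludeStoppedProcessorsForHealthCheck());
	}
}
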
@@ -1,5 +1,5 @@
/*
 * Copyright 2017-2020 the original author or authors.
 * Copyright 2017-2021 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka.streams;

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -28,6 +29,7 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
@@ -69,6 +71,7 @@ import static org.mockito.internal.verification.VerificationModeFactory.times;
/**
 * @author Soby Chacko
 * @author Gary Russell
 * @author Nico Pommerening
 */
public class KafkaStreamsInteractiveQueryIntegrationTests {

@@ -106,6 +109,9 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
		KafkaStreamsRegistry kafkaStreamsRegistry = new KafkaStreamsRegistry();
		kafkaStreamsRegistry.registerKafkaStreams(mock);
		Mockito.when(mock.isRunning()).thenReturn(true);
		Properties mockProperties = new Properties();
		mockProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "fooApp");
		Mockito.when(mock.getStreamsConfiguration()).thenReturn(mockProperties);
		KafkaStreamsBinderConfigurationProperties binderConfigurationProperties =
				new KafkaStreamsBinderConfigurationProperties(new KafkaProperties());
		binderConfigurationProperties.getStateStoreRetry().setMaxAttempts(3);
@@ -10,7 +10,7 @@
	<parent>
		<groupId>org.springframework.cloud</groupId>
		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
		<version>3.2.0-SNAPSHOT</version>
		<version>3.2.0</version>
	</parent>

	<dependencies>
@@ -76,6 +76,7 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerPro
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.utils.DlqDestinationResolver;
import org.springframework.cloud.stream.binder.kafka.utils.DlqPartitionFunction;
import org.springframework.cloud.stream.binding.DefaultPartitioningInterceptor;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer.PartitioningInterceptor;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.config.MessageSourceCustomizer;
@@ -422,6 +423,10 @@ public class KafkaMessageChannelBinder extends
					((PartitioningInterceptor) interceptor)
							.setPartitionCount(partitions.size());
				}
				else if (interceptor instanceof DefaultPartitioningInterceptor) {
					((DefaultPartitioningInterceptor) interceptor)
							.setPartitionCount(partitions.size());
				}
			});
		}