Compare commits
17 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
593cf44477 | ||
|
|
265b881d35 | ||
|
|
ac2ecf1a54 | ||
|
|
0dd86b60c1 | ||
|
|
2fc335c597 | ||
|
|
bc62f76756 | ||
|
|
e537020f37 | ||
|
|
20d823eeec | ||
|
|
0034f085a2 | ||
|
|
ef8d358f1d | ||
|
|
6e2bc7bc87 | ||
|
|
389b45d6bd | ||
|
|
e6602f235e | ||
|
|
f6b2021310 | ||
|
|
6a838acca4 | ||
|
|
cde5b7b80b | ||
|
|
2495e6e44e |
60
README.adoc
60
README.adoc
@@ -239,7 +239,7 @@ Note that this property is only applicable for pollable consumers.
|
||||
Default: not set.
|
||||
resetOffsets::
|
||||
Whether to reset offsets on the consumer to the value provided by startOffset.
|
||||
Must be false if a `KafkaRebalanceListener` is provided; see <<rebalance-listener>>.
|
||||
Must be false if a `KafkaBindingRebalanceListener` is provided; see <<rebalance-listener>>.
|
||||
See <<reset-offsets>> for more information about this property.
|
||||
+
|
||||
Default: `false`.
|
||||
@@ -337,6 +337,11 @@ Usually needed if you want to synchronize another transaction with the Kafka tra
|
||||
To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.
|
||||
+
|
||||
Default: none.
|
||||
txCommitRecovered::
|
||||
When using a transactional binder, the offset of a recovered record (e.g. when retries are exhausted and the record is sent to a dead letter topic) will be committed via a new transaction, by default.
|
||||
Setting this property to `false` suppresses committing the offset of recovered record.
|
||||
+
|
||||
Default: true.
|
||||
|
||||
[[reset-offsets]]
|
||||
==== Resetting Offsets
|
||||
@@ -363,7 +368,7 @@ Set `resetOffsets` to `true` and `startOffset` to `latest`; the binding will per
|
||||
|
||||
IMPORTANT: If a rebalance occurs after the initial assignment, the seeks will only be performed on any newly assigned partitions that were not assigned during the initial assignment.
|
||||
|
||||
For more control over topic offsets, see <<rebalance-listener>>; when a listener is provided, `resetOffsets: true` is ignored.
|
||||
For more control over topic offsets, see <<rebalance-listener>>; when a listener is provided, `resetOffsets` should not be set to `true`, otherwise, that will cause an error.
|
||||
|
||||
==== Consuming Batches
|
||||
|
||||
@@ -506,11 +511,11 @@ Default: `false`
|
||||
|
||||
In this section, we show the use of the preceding properties for specific scenarios.
|
||||
|
||||
===== Example: Setting `autoCommitOffset` to `false` and Relying on Manual Acking
|
||||
===== Example: Setting `ackMode` to `MANUAL` and Relying on Manual Acknowledgement
|
||||
|
||||
This example illustrates how one may manually acknowledge offsets in a consumer application.
|
||||
|
||||
This example requires that `spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset` be set to `false`.
|
||||
This example requires that `spring.cloud.stream.kafka.bindings.input.consumer.ackMode` be set to `MANUAL`.
|
||||
Use the corresponding input channel name for your example.
|
||||
|
||||
[source]
|
||||
@@ -622,6 +627,47 @@ Usually, applications may use principals that do not have administrative rights
|
||||
Consequently, relying on Spring Cloud Stream to create/modify topics may fail.
|
||||
In secure environments, we strongly recommend creating topics and managing ACLs administratively by using Kafka tooling.
|
||||
|
||||
====== Multi-binder configuration and JAAS
|
||||
|
||||
When connecting to multiple clusters in which each one requires separate JAAS configuration, then set the JAAS configuration using the property `sasl.jaas.config`.
|
||||
When this property is present in the applicaiton, it takes precedence over the other strategies mentioned above.
|
||||
See this https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients[KIP-85] for more details.
|
||||
|
||||
For example, if you have two clusters in your application with separate JAAS configuration, then the following is a template that you can use:
|
||||
|
||||
```
|
||||
spring.cloud.stream:
|
||||
binders:
|
||||
kafka1:
|
||||
type: kafka
|
||||
environment:
|
||||
spring:
|
||||
cloud:
|
||||
stream:
|
||||
kafka:
|
||||
binder:
|
||||
brokers: localhost:9092
|
||||
configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"
|
||||
kafka2:
|
||||
type: kafka
|
||||
environment:
|
||||
spring:
|
||||
cloud:
|
||||
stream:
|
||||
kafka:
|
||||
binder:
|
||||
brokers: localhost:9093
|
||||
configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"user1-secret\";"
|
||||
kafka.binder:
|
||||
configuration:
|
||||
security.protocol: SASL_PLAINTEXT
|
||||
sasl.mechanism: PLAIN
|
||||
```
|
||||
|
||||
Note that both the Kafka clusters, and the `sasl.jaas.config` values for each of them are different in the above configuration.
|
||||
|
||||
See this https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/multi-binder-samples/kafka-multi-binder-jaas[sample application] for more details on how to setup and run such an application.
|
||||
|
||||
[[pause-resume]]
|
||||
===== Example: Pausing and Resuming the Consumer
|
||||
|
||||
@@ -774,10 +820,10 @@ public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key,
|
||||
====
|
||||
|
||||
[[rebalance-listener]]
|
||||
=== Using a KafkaRebalanceListener
|
||||
=== Using a KafkaBindingRebalanceListener
|
||||
|
||||
Applications may wish to seek topics/partitions to arbitrary offsets when the partitions are initially assigned, or perform other operations on the consumer.
|
||||
Starting with version 2.1, if you provide a single `KafkaRebalanceListener` bean in the application context, it will be wired into all Kafka consumer bindings.
|
||||
Starting with version 2.1, if you provide a single `KafkaBindingRebalanceListener` bean in the application context, it will be wired into all Kafka consumer bindings.
|
||||
|
||||
====
|
||||
[source, java]
|
||||
@@ -830,7 +876,7 @@ You cannot set the `resetOffsets` consumer property to `true` when you provide a
|
||||
If you want advanced customization of consumer and producer configuration that is used for creating `ConsumerFactory` and `ProducerFactory` in Kafka,
|
||||
you can implement the following customizers.
|
||||
|
||||
* ConsusumerConfigCustomizer
|
||||
* ConsumerConfigCustomizer
|
||||
* ProducerConfigCustomizer
|
||||
|
||||
Both of these interfaces provide a way to configure the config map used for consumer and producer properties.
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.1.4-SNAPSHOT</version>
|
||||
<version>3.1.4</version>
|
||||
</parent>
|
||||
<packaging>jar</packaging>
|
||||
<name>spring-cloud-stream-binder-kafka-docs</name>
|
||||
|
||||
@@ -9,7 +9,6 @@
|
||||
|spring.cloud.stream.dynamic-destinations | `[]` | A list of destinations that can be bound dynamically. If set, only listed destinations can be bound.
|
||||
|spring.cloud.stream.function.batch-mode | `false` |
|
||||
|spring.cloud.stream.function.bindings | |
|
||||
|spring.cloud.stream.function.definition | | Definition of functions to bind. If several functions need to be composed into one, use pipes (e.g., 'fooFunc\|barFunc')
|
||||
|spring.cloud.stream.instance-count | `1` | The number of deployed instances of an application. Default: 1. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-count" where 'foo' is the name of the binding.
|
||||
|spring.cloud.stream.instance-index | `0` | The instance id of the application: a number from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index" where 'foo' is the name of the binding.
|
||||
|spring.cloud.stream.instance-index-list | | A list of instance id's from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index-list" where 'foo' is the name of the binding. This setting will override the one set in 'spring.cloud.stream.instance-index'
|
||||
|
||||
@@ -252,7 +252,28 @@ The input from the three partial functions which are `KStream`, `GlobalKTable`,
|
||||
Input bindings are named as `enrichOrder-in-0`, `enrichOrder-in-1` and `enrichOrder-in-2` respectively. Output binding is named as `enrichOrder-out-0`.
|
||||
|
||||
With curried functions, you can virtually have any number of inputs. However, keep in mind that, anything more than a smaller number of inputs and partially applied functions for them as above in Java might lead to unreadable code.
|
||||
Therefore if your Kafka Streams application requires more than a reasonably smaller number of input bindings and you want to use this functional model, then you may want to rethink your design and decompose the application appropriately.
|
||||
Therefore if your Kafka Streams application requires more than a reasonably smaller number of input bindings, and you want to use this functional model, then you may want to rethink your design and decompose the application appropriately.
|
||||
|
||||
===== Output Bindings
|
||||
|
||||
Kafka Streams binder allows types of either `KStream` or `KTable` as output bindings.
|
||||
Behind the scenes, the binder uses the `to` method on `KStream` to send the resultant records to the output topic.
|
||||
If the application provides a `KTable` as output in the function, the binder still uses this technique by delegating to the `to` method of `KStream`.
|
||||
|
||||
For example both functions below will work:
|
||||
|
||||
```
|
||||
@Bean
|
||||
public Function<KStream<String, String>, KTable<String, String>> foo() {
|
||||
return KStream::toTable;
|
||||
};
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Function<KTable<String, String>, KStream<String, String>> bar() {
|
||||
return KTable::toStream;
|
||||
}
|
||||
```
|
||||
|
||||
===== Multiple Output Bindings
|
||||
|
||||
@@ -383,8 +404,7 @@ The default output binding for this example becomes `curriedFoobar-out-0`.
|
||||
|
||||
====== Special note on using `KTable` as output in function composition
|
||||
|
||||
When using function composition, for intermediate functions, you can use `KTable` as output.
|
||||
For instance, lets say you have the following two functions.
|
||||
Lets say you have the following two functions.
|
||||
|
||||
```
|
||||
@Bean
|
||||
@@ -399,10 +419,7 @@ public Function<KTable<String, String>, KStream<String, String>> bar() {
|
||||
}
|
||||
```
|
||||
|
||||
You can compose them as `foo|bar` although foo's output is `KTable`.
|
||||
In normal case, when you use `foo` as standalone, this will not work, as the binder does not support `KTable` as the final output.
|
||||
Note that in the example above, bar's output is still a `KStream`.
|
||||
We are only able to use `foo` which has a `KTable` output, since we are composing with another function that has `KStream` as its output.
|
||||
You can compose them as `foo|bar`, but keep in mind that the second function (`bar` in this case) must have a `KTable` as input since the first function (`foo`) has `KTable` as output.
|
||||
|
||||
==== Imperative programming model.
|
||||
|
||||
@@ -974,7 +991,7 @@ One important thing to keep in mind when providing an implementation for `DlqDes
|
||||
This is because there is no way for the binder to infer the names of all the DLQ topics the implementation might send to.
|
||||
Therefore, if you provide DLQ names using this strategy, it is the application's responsibility to ensure that those topics are created beforehand.
|
||||
|
||||
If `DlqDestinationResolver` is present in the application as a bean, that takes higher prcedence.
|
||||
If `DlqDestinationResolver` is present in the application as a bean, that takes higher precedence.
|
||||
If you do not want to follow this approach and rather provide a static DLQ name using configuration, you can set the following property.
|
||||
|
||||
[source]
|
||||
@@ -1002,10 +1019,10 @@ public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String,
|
||||
}
|
||||
```
|
||||
|
||||
and you only want to enable DLQ on the first input binding and logAndSkip on the second binding, then you can do so on the consumer as below.
|
||||
and you only want to enable DLQ on the first input binding and skipAndContinue on the second binding, then you can do so on the consumer as below.
|
||||
|
||||
`spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq`
|
||||
`spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: logAndSkip`
|
||||
`spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue`
|
||||
|
||||
Setting deserialization exception handlers this way has a higher precedence than setting at the binder level.
|
||||
|
||||
@@ -1853,6 +1870,69 @@ When there are multiple bindings present on a single function, invoking these op
|
||||
This is because all the bindings on a single function are backed by the same `StreamsBuilderFactoryBean`.
|
||||
Therefore, for the function above, either `function-in-0` or `function-out-0` will work.
|
||||
|
||||
=== Tracing using Spring Cloud Sleuth
|
||||
|
||||
When Spring Cloud Sleuth is on the classpath of a Spring Cloud Stream Kafka Streams binder based application, both its consumer and producer are automatically instrumented with tracing information.
|
||||
However, in order to trace any application specific operations, those need to be explicitly instrumented by the user code.
|
||||
This can be done by injecting the `KafkaStreamsTracing` bean from Spring Cloud Sleuth in the application and then invoke various Kafka Streams operations through this injected bean.
|
||||
Here are some examples of using it.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> clicks(KafkaStreamsTracing kafkaStreamsTracing) {
|
||||
return (userClicksStream, userRegionsTable) -> (userClicksStream
|
||||
.transformValues(kafkaStreamsTracing.peek("span-1", (key, value) -> LOG.info("key/value: " + key + "/" + value)))
|
||||
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
|
||||
"UNKNOWN" : region, clicks),
|
||||
Joined.with(Serdes.String(), Serdes.Long(), null))
|
||||
.transform(kafkaStreamsTracing.map("span-2", (key, value) -> {
|
||||
LOG.info("Click Info: " + value.getRegion() + "/" + value.getClicks());
|
||||
return new KeyValue<>(value.getRegion(),
|
||||
value.getClicks());
|
||||
}))
|
||||
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
|
||||
.reduce(Long::sum, Materialized.as(CLICK_UPDATES))
|
||||
.toStream());
|
||||
}
|
||||
```
|
||||
|
||||
In the example above, there are two places where it adds explicit tracing instrumentation.
|
||||
First, we are logging the key/value information from the incoming `KStream`.
|
||||
When this information is logged, the associated span and trace IDs get logged as well so that a monitoring system can track them and correlate with the same span id.
|
||||
Second, when we call a `map` operation, instead of calling it directly on the `KStream` class, we wrap it inside a `transform` operation and then call `map` from `KafkaStreamsTracing`.
|
||||
In this case also, the logged message will contain the span ID and trace ID.
|
||||
|
||||
Here is another example, where we use the low-level transformer API for accessing the various Kafka Streams headers.
|
||||
When spring-cloud-sleuth is on the classpath, all the tracing headers can also be accessed like this.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public Function<KStream<String, String>, KStream<String, String>> process(KafkaStreamsTracing kafkaStreamsTracing) {
|
||||
return input -> input.transform(kafkaStreamsTracing.transformer(
|
||||
"transformer-1",
|
||||
() -> new Transformer<String, String, KeyValue<String, String>>() {
|
||||
ProcessorContext context;
|
||||
|
||||
@Override
|
||||
public void init(ProcessorContext context) {
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
@Override
|
||||
public KeyValue<String, String> transform(String key, String value) {
|
||||
LOG.info("Headers: " + this.context.headers());
|
||||
LOG.info("K/V:" + key + "/" + value);
|
||||
// More transformations, business logic execution, etc. go here.
|
||||
return KeyValue.pair(key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
}));
|
||||
}
|
||||
```
|
||||
|
||||
=== Configuration Options
|
||||
|
||||
This section contains the configuration options used by the Kafka Streams binder.
|
||||
@@ -1906,7 +1986,7 @@ deserializationExceptionHandler::
|
||||
Deserialization error handler type.
|
||||
This handler is applied at the binder level and thus applied against all input binding in the application.
|
||||
There is a way to control it in a more fine-grained way at the consumer binding level.
|
||||
Possible values are - `logAndContinue`, `logAndFail` or `sendToDlq`
|
||||
Possible values are - `logAndContinue`, `logAndFail`, `skipAndContinue` or `sendToDlq`
|
||||
+
|
||||
Default: `logAndFail`
|
||||
|
||||
@@ -2013,7 +2093,7 @@ Unlike the message channel based binder, Kafka Streams binder does not seek to b
|
||||
deserializationExceptionHandler::
|
||||
Deserialization error handler type.
|
||||
This handler is applied per consumer binding as opposed to the binder level property described before.
|
||||
Possible values are - `logAndContinue`, `logAndFail` or `sendToDlq`
|
||||
Possible values are - `logAndContinue`, `logAndFail`, `skipAndContinue` or `sendToDlq`
|
||||
+
|
||||
Default: `logAndFail`
|
||||
|
||||
|
||||
@@ -218,7 +218,7 @@ Note that this property is only applicable for pollable consumers.
|
||||
Default: not set.
|
||||
resetOffsets::
|
||||
Whether to reset offsets on the consumer to the value provided by startOffset.
|
||||
Must be false if a `KafkaRebalanceListener` is provided; see <<rebalance-listener>>.
|
||||
Must be false if a `KafkaBindingRebalanceListener` is provided; see <<rebalance-listener>>.
|
||||
See <<reset-offsets>> for more information about this property.
|
||||
+
|
||||
Default: `false`.
|
||||
@@ -490,11 +490,11 @@ Default: `false`
|
||||
|
||||
In this section, we show the use of the preceding properties for specific scenarios.
|
||||
|
||||
===== Example: Setting `autoCommitOffset` to `false` and Relying on Manual Acking
|
||||
===== Example: Setting `ackMode` to `MANUAL` and Relying on Manual Acknowledgement
|
||||
|
||||
This example illustrates how one may manually acknowledge offsets in a consumer application.
|
||||
|
||||
This example requires that `spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset` be set to `false`.
|
||||
This example requires that `spring.cloud.stream.kafka.bindings.input.consumer.ackMode` be set to `MANUAL`.
|
||||
Use the corresponding input channel name for your example.
|
||||
|
||||
[source]
|
||||
@@ -799,10 +799,10 @@ public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key,
|
||||
====
|
||||
|
||||
[[rebalance-listener]]
|
||||
=== Using a KafkaRebalanceListener
|
||||
=== Using a KafkaBindingRebalanceListener
|
||||
|
||||
Applications may wish to seek topics/partitions to arbitrary offsets when the partitions are initially assigned, or perform other operations on the consumer.
|
||||
Starting with version 2.1, if you provide a single `KafkaRebalanceListener` bean in the application context, it will be wired into all Kafka consumer bindings.
|
||||
Starting with version 2.1, if you provide a single `KafkaBindingRebalanceListener` bean in the application context, it will be wired into all Kafka consumer bindings.
|
||||
|
||||
====
|
||||
[source, java]
|
||||
@@ -855,7 +855,7 @@ You cannot set the `resetOffsets` consumer property to `true` when you provide a
|
||||
If you want advanced customization of consumer and producer configuration that is used for creating `ConsumerFactory` and `ProducerFactory` in Kafka,
|
||||
you can implement the following customizers.
|
||||
|
||||
* ConsusumerConfigCustomizer
|
||||
* ConsumerConfigCustomizer
|
||||
* ProducerConfigCustomizer
|
||||
|
||||
Both of these interfaces provide a way to configure the config map used for consumer and producer properties.
|
||||
|
||||
12
pom.xml
12
pom.xml
@@ -2,21 +2,21 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.1.4-SNAPSHOT</version>
|
||||
<version>3.1.4</version>
|
||||
<packaging>pom</packaging>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-build</artifactId>
|
||||
<version>3.0.3</version>
|
||||
<version>3.0.4</version>
|
||||
<relativePath />
|
||||
</parent>
|
||||
<properties>
|
||||
<java.version>1.8</java.version>
|
||||
<spring-kafka.version>2.6.8</spring-kafka.version>
|
||||
<spring-integration-kafka.version>5.4.7</spring-integration-kafka.version>
|
||||
<spring-kafka.version>2.6.10</spring-kafka.version>
|
||||
<spring-integration-kafka.version>5.4.10</spring-integration-kafka.version>
|
||||
<kafka.version>2.6.2</kafka.version>
|
||||
<spring-cloud-schema-registry.version>1.1.4-SNAPSHOT</spring-cloud-schema-registry.version>
|
||||
<spring-cloud-stream.version>3.1.4-SNAPSHOT</spring-cloud-stream.version>
|
||||
<spring-cloud-schema-registry.version>1.1.4</spring-cloud-schema-registry.version>
|
||||
<spring-cloud-stream.version>3.1.4</spring-cloud-stream.version>
|
||||
<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
|
||||
<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
|
||||
<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.1.4-SNAPSHOT</version>
|
||||
<version>3.1.4</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
<description>Spring Cloud Starter Stream Kafka</description>
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.1.4-SNAPSHOT</version>
|
||||
<version>3.1.4</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder Core</description>
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.1.4-SNAPSHOT</version>
|
||||
<version>3.1.4</version>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams;
|
||||
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.kstream.GlobalKTable;
|
||||
|
||||
import org.springframework.cloud.stream.binder.AbstractBinder;
|
||||
@@ -58,16 +59,18 @@ public class GlobalKTableBinder extends
|
||||
|
||||
// @checkstyle:off
|
||||
private KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties = new KafkaStreamsExtendedBindingProperties();
|
||||
private final KafkaStreamsRegistry kafkaStreamsRegistry;
|
||||
|
||||
// @checkstyle:on
|
||||
|
||||
public GlobalKTableBinder(
|
||||
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
|
||||
KafkaTopicProvisioner kafkaTopicProvisioner,
|
||||
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue) {
|
||||
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, KafkaStreamsRegistry kafkaStreamsRegistry) {
|
||||
this.binderConfigurationProperties = binderConfigurationProperties;
|
||||
this.kafkaTopicProvisioner = kafkaTopicProvisioner;
|
||||
this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
|
||||
this.kafkaStreamsRegistry = kafkaStreamsRegistry;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -97,10 +100,22 @@ public class GlobalKTableBinder extends
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
if (!streamsBuilderFactoryBean.isRunning()) {
|
||||
super.start();
|
||||
GlobalKTableBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void stop() {
|
||||
super.stop();
|
||||
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
|
||||
if (streamsBuilderFactoryBean.isRunning()) {
|
||||
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
|
||||
super.stop();
|
||||
GlobalKTableBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
|
||||
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -59,10 +59,11 @@ public class GlobalKTableBinderConfiguration {
|
||||
KafkaTopicProvisioner kafkaTopicProvisioner,
|
||||
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
|
||||
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
|
||||
@Qualifier("streamConfigGlobalProperties") Map<String, Object> streamConfigGlobalProperties) {
|
||||
@Qualifier("streamConfigGlobalProperties") Map<String, Object> streamConfigGlobalProperties,
|
||||
KafkaStreamsRegistry kafkaStreamsRegistry) {
|
||||
|
||||
GlobalKTableBinder globalKTableBinder = new GlobalKTableBinder(binderConfigurationProperties,
|
||||
kafkaTopicProvisioner, kafkaStreamsBindingInformationCatalogue);
|
||||
kafkaTopicProvisioner, kafkaStreamsBindingInformationCatalogue, kafkaStreamsRegistry);
|
||||
globalKTableBinder.setKafkaStreamsExtendedBindingProperties(
|
||||
kafkaStreamsExtendedBindingProperties);
|
||||
return globalKTableBinder;
|
||||
|
||||
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.kafka.common.serialization.Serde;
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
import org.apache.kafka.streams.kstream.Produced;
|
||||
import org.apache.kafka.streams.processor.StreamPartitioner;
|
||||
@@ -78,16 +79,19 @@ class KStreamBinder extends
|
||||
|
||||
private final KeyValueSerdeResolver keyValueSerdeResolver;
|
||||
|
||||
private final KafkaStreamsRegistry kafkaStreamsRegistry;
|
||||
|
||||
KStreamBinder(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
|
||||
KafkaTopicProvisioner kafkaTopicProvisioner,
|
||||
KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
|
||||
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
|
||||
KeyValueSerdeResolver keyValueSerdeResolver) {
|
||||
KeyValueSerdeResolver keyValueSerdeResolver, KafkaStreamsRegistry kafkaStreamsRegistry) {
|
||||
this.binderConfigurationProperties = binderConfigurationProperties;
|
||||
this.kafkaTopicProvisioner = kafkaTopicProvisioner;
|
||||
this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate;
|
||||
this.kafkaStreamsBindingInformationCatalogue = KafkaStreamsBindingInformationCatalogue;
|
||||
this.keyValueSerdeResolver = keyValueSerdeResolver;
|
||||
this.kafkaStreamsRegistry = kafkaStreamsRegistry;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -125,10 +129,22 @@ class KStreamBinder extends
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
if (!streamsBuilderFactoryBean.isRunning()) {
|
||||
super.start();
|
||||
KStreamBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void stop() {
|
||||
super.stop();
|
||||
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
|
||||
if (streamsBuilderFactoryBean.isRunning()) {
|
||||
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
|
||||
super.stop();
|
||||
KStreamBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
|
||||
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -178,10 +194,22 @@ class KStreamBinder extends
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
if (!streamsBuilderFactoryBean.isRunning()) {
|
||||
super.start();
|
||||
KStreamBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void stop() {
|
||||
super.stop();
|
||||
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
|
||||
if (streamsBuilderFactoryBean.isRunning()) {
|
||||
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
|
||||
super.stop();
|
||||
KStreamBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
|
||||
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -59,10 +59,10 @@ public class KStreamBinderConfiguration {
|
||||
KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate,
|
||||
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
|
||||
KeyValueSerdeResolver keyValueSerdeResolver,
|
||||
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties) {
|
||||
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties, KafkaStreamsRegistry kafkaStreamsRegistry) {
|
||||
KStreamBinder kStreamBinder = new KStreamBinder(binderConfigurationProperties,
|
||||
kafkaTopicProvisioner, KafkaStreamsMessageConversionDelegate,
|
||||
KafkaStreamsBindingInformationCatalogue, keyValueSerdeResolver);
|
||||
KafkaStreamsBindingInformationCatalogue, keyValueSerdeResolver, kafkaStreamsRegistry);
|
||||
kStreamBinder.setKafkaStreamsExtendedBindingProperties(
|
||||
kafkaStreamsExtendedBindingProperties);
|
||||
return kStreamBinder;
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams;
|
||||
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.kstream.KTable;
|
||||
|
||||
import org.springframework.cloud.stream.binder.AbstractBinder;
|
||||
@@ -44,13 +45,10 @@ import org.springframework.util.StringUtils;
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
class KTableBinder extends
|
||||
// @checkstyle:off
|
||||
AbstractBinder<KTable<Object, Object>, ExtendedConsumerProperties<KafkaStreamsConsumerProperties>, ExtendedProducerProperties<KafkaStreamsProducerProperties>>
|
||||
implements
|
||||
ExtendedPropertiesBinder<KTable<Object, Object>, KafkaStreamsConsumerProperties, KafkaStreamsProducerProperties> {
|
||||
|
||||
// @checkstyle:on
|
||||
|
||||
private final KafkaStreamsBinderConfigurationProperties binderConfigurationProperties;
|
||||
|
||||
private final KafkaTopicProvisioner kafkaTopicProvisioner;
|
||||
@@ -62,12 +60,15 @@ class KTableBinder extends
|
||||
|
||||
// @checkstyle:on
|
||||
|
||||
private final KafkaStreamsRegistry kafkaStreamsRegistry;
|
||||
|
||||
KTableBinder(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
|
||||
KafkaTopicProvisioner kafkaTopicProvisioner,
|
||||
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue) {
|
||||
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue, KafkaStreamsRegistry kafkaStreamsRegistry) {
|
||||
this.binderConfigurationProperties = binderConfigurationProperties;
|
||||
this.kafkaTopicProvisioner = kafkaTopicProvisioner;
|
||||
this.kafkaStreamsBindingInformationCatalogue = KafkaStreamsBindingInformationCatalogue;
|
||||
this.kafkaStreamsRegistry = kafkaStreamsRegistry;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -100,10 +101,22 @@ class KTableBinder extends
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
if (!streamsBuilderFactoryBean.isRunning()) {
|
||||
super.start();
|
||||
KTableBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void stop() {
|
||||
super.stop();
|
||||
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
|
||||
if (streamsBuilderFactoryBean.isRunning()) {
|
||||
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
|
||||
super.stop();
|
||||
KTableBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
|
||||
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -111,9 +124,7 @@ class KTableBinder extends
|
||||
@Override
|
||||
protected Binding<KTable<Object, Object>> doBindProducer(String name,
|
||||
KTable<Object, Object> outboundBindTarget,
|
||||
// @checkstyle:off
|
||||
ExtendedProducerProperties<KafkaStreamsProducerProperties> properties) {
|
||||
// @checkstyle:on
|
||||
throw new UnsupportedOperationException(
|
||||
"No producer level binding is allowed for KTable");
|
||||
}
|
||||
|
||||
@@ -59,9 +59,10 @@ public class KTableBinderConfiguration {
|
||||
KafkaTopicProvisioner kafkaTopicProvisioner,
|
||||
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
|
||||
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
|
||||
@Qualifier("streamConfigGlobalProperties") Map<String, Object> streamConfigGlobalProperties) {
|
||||
@Qualifier("streamConfigGlobalProperties") Map<String, Object> streamConfigGlobalProperties,
|
||||
KafkaStreamsRegistry kafkaStreamsRegistry) {
|
||||
KTableBinder kTableBinder = new KTableBinder(binderConfigurationProperties,
|
||||
kafkaTopicProvisioner, kafkaStreamsBindingInformationCatalogue);
|
||||
kafkaTopicProvisioner, kafkaStreamsBindingInformationCatalogue, kafkaStreamsRegistry);
|
||||
kTableBinder.setKafkaStreamsExtendedBindingProperties(kafkaStreamsExtendedBindingProperties);
|
||||
return kTableBinder;
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2017-2020 the original author or authors.
|
||||
* Copyright 2017-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -98,6 +98,9 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
|
||||
|
||||
private static final String GLOBALKTABLE_BINDER_TYPE = "globalktable";
|
||||
|
||||
private static final String CONSUMER_PROPERTIES_PREFIX = "consumer.";
|
||||
private static final String PRODUCER_PROPERTIES_PREFIX = "producer.";
|
||||
|
||||
@Bean
|
||||
@ConfigurationProperties(prefix = "spring.cloud.stream.kafka.streams.binder")
|
||||
public KafkaStreamsBinderConfigurationProperties binderConfigurationProperties(
|
||||
@@ -266,14 +269,15 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
|
||||
if (!ObjectUtils.isEmpty(configProperties.getConfiguration())) {
|
||||
properties.putAll(configProperties.getConfiguration());
|
||||
}
|
||||
Map<String, Object> mergedConsumerConfig = configProperties.mergedConsumerConfiguration();
|
||||
if (!ObjectUtils.isEmpty(mergedConsumerConfig)) {
|
||||
properties.putAll(mergedConsumerConfig);
|
||||
}
|
||||
Map<String, Object> mergedProducerConfig = configProperties.mergedProducerConfiguration();
|
||||
if (!ObjectUtils.isEmpty(mergedProducerConfig)) {
|
||||
properties.putAll(mergedProducerConfig);
|
||||
}
|
||||
|
||||
Map<String, Object> mergedConsumerConfig = new HashMap<>(configProperties.mergedConsumerConfiguration());
|
||||
//Adding consumer. prefix if they are missing (in order to differentiate them from other property categories such as stream, producer etc.)
|
||||
addPrefix(properties, mergedConsumerConfig, CONSUMER_PROPERTIES_PREFIX);
|
||||
|
||||
Map<String, Object> mergedProducerConfig = new HashMap<>(configProperties.mergedProducerConfiguration());
|
||||
//Adding producer. prefix if they are missing (in order to differentiate them from other property categories such as stream, consumer etc.)
|
||||
addPrefix(properties, mergedProducerConfig, PRODUCER_PROPERTIES_PREFIX);
|
||||
|
||||
if (!properties.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG)) {
|
||||
properties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG,
|
||||
(int) configProperties.getReplicationFactor());
|
||||
@@ -282,6 +286,16 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
|
||||
Collectors.toMap((e) -> String.valueOf(e.getKey()), Map.Entry::getValue));
|
||||
}
|
||||
|
||||
private void addPrefix(Properties properties, Map<String, Object> mergedConsProdConfig, String prefix) {
|
||||
Map<String, Object> mergedConfigs = new HashMap<>();
|
||||
for (String key : mergedConsProdConfig.keySet()) {
|
||||
mergedConfigs.put(key.startsWith(prefix) ? key : prefix + key, mergedConsProdConfig.get(key));
|
||||
}
|
||||
if (!ObjectUtils.isEmpty(mergedConfigs)) {
|
||||
properties.putAll(mergedConfigs);
|
||||
}
|
||||
}
|
||||
|
||||
@Bean
|
||||
public KStreamStreamListenerResultAdapter kstreamStreamListenerResultAdapter() {
|
||||
return new KStreamStreamListenerResultAdapter();
|
||||
|
||||
@@ -39,6 +39,7 @@ import org.apache.kafka.streams.StreamsBuilder;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.Topology;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
import org.apache.kafka.streams.kstream.KTable;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.BeanFactory;
|
||||
@@ -298,7 +299,13 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
|
||||
outboundResolvableType, (Object[]) result, streamsBuilderFactoryBean);
|
||||
}
|
||||
else {
|
||||
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType, (KStream) result, outboundDefinitionIterator);
|
||||
if (KTable.class.isAssignableFrom(result.getClass())) {
|
||||
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType != null ?
|
||||
outboundResolvableType : resolvableType.getGeneric(1), ((KTable) result).toStream(), outboundDefinitionIterator);
|
||||
}
|
||||
else {
|
||||
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType, (KStream) result, outboundDefinitionIterator);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -337,8 +344,14 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
|
||||
outboundResolvableType, (Object[]) result, streamsBuilderFactoryBean);
|
||||
}
|
||||
else {
|
||||
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType != null ?
|
||||
outboundResolvableType : resolvableType.getGeneric(1), (KStream) result, outboundDefinitionIterator);
|
||||
if (KTable.class.isAssignableFrom(result.getClass())) {
|
||||
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType != null ?
|
||||
outboundResolvableType : resolvableType.getGeneric(1), ((KTable) result).toStream(), outboundDefinitionIterator);
|
||||
}
|
||||
else {
|
||||
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType != null ?
|
||||
outboundResolvableType : resolvableType.getGeneric(1), (KStream) result, outboundDefinitionIterator);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,6 +62,11 @@ public class KafkaStreamsRegistry {
|
||||
this.streamsBuilderFactoryBeanMap.put(kafkaStreams, streamsBuilderFactoryBean);
|
||||
}
|
||||
|
||||
void unregisterKafkaStreams(KafkaStreams kafkaStreams) {
|
||||
this.kafkaStreams.remove(kafkaStreams);
|
||||
this.streamsBuilderFactoryBeanMap.remove(kafkaStreams);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param kafkaStreams {@link KafkaStreams} object
|
||||
|
||||
@@ -145,7 +145,8 @@ public class KafkaStreamsBindableProxyFactory extends AbstractBindableProxyFacto
|
||||
}
|
||||
|
||||
if (outboundArgument != null && outboundArgument.getRawClass() != null && (!outboundArgument.isArray() &&
|
||||
outboundArgument.getRawClass().isAssignableFrom(KStream.class))) {
|
||||
(outboundArgument.getRawClass().isAssignableFrom(KStream.class) ||
|
||||
outboundArgument.getRawClass().isAssignableFrom(KTable.class)))) { //Allowing both KStream and KTable on the outbound.
|
||||
// if the type is array, we need to do a late binding as we don't know the number of
|
||||
// output bindings at this point in the flow.
|
||||
|
||||
@@ -157,12 +158,15 @@ public class KafkaStreamsBindableProxyFactory extends AbstractBindableProxyFacto
|
||||
if (outputBindingsIter.hasNext()) {
|
||||
outputBinding = outputBindingsIter.next();
|
||||
}
|
||||
|
||||
}
|
||||
else {
|
||||
outputBinding = String.format("%s-%s-0", this.functionName, FunctionConstants.DEFAULT_OUTPUT_SUFFIX);
|
||||
}
|
||||
Assert.isTrue(outputBinding != null, "output binding is not inferred.");
|
||||
// We will only allow KStream targets on the outbound. If the user provides a KTable,
|
||||
// we still use the KStreamBinder to send it through the outbound.
|
||||
// In that case before sending, we do a cast from KTable to KStream.
|
||||
// See KafkaStreamsFunctionsProcessor#setupFunctionInvokerForKafkaStreams for details.
|
||||
KafkaStreamsBindableProxyFactory.this.outputHolders.put(outputBinding,
|
||||
new BoundTargetHolder(getBindingTargetFactory(KStream.class)
|
||||
.createOutput(outputBinding), true));
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2019-2019 the original author or authors.
|
||||
* Copyright 2019-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -111,6 +111,7 @@ public class KafkaStreamsBinderWordCountFunctionTests {
|
||||
"--spring.cloud.stream.kafka.streams.binder.application-id=testKstreamWordCountFunction",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
|
||||
"--spring.cloud.stream.kafka.streams.binder.consumerProperties.request.timeout.ms=29000", //for testing ...binder.consumerProperties
|
||||
"--spring.cloud.stream.kafka.streams.binder.consumerProperties.consumer.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
|
||||
"--spring.cloud.stream.kafka.streams.binder.producerProperties.max.block.ms=90000", //for testing ...binder.producerProperties
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" +
|
||||
"=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
@@ -143,8 +144,9 @@ public class KafkaStreamsBinderWordCountFunctionTests {
|
||||
|
||||
//verify that ...binder.consumerProperties and ...binder.producerProperties work.
|
||||
Map<String, Object> streamConfigGlobalProperties = (Map<String, Object>) context.getBean("streamConfigGlobalProperties");
|
||||
assertThat(streamConfigGlobalProperties.get("request.timeout.ms")).isEqualTo("29000");
|
||||
assertThat(streamConfigGlobalProperties.get("max.block.ms")).isEqualTo("90000");
|
||||
assertThat(streamConfigGlobalProperties.get("consumer.request.timeout.ms")).isEqualTo("29000");
|
||||
assertThat(streamConfigGlobalProperties.get("consumer.value.deserializer")).isEqualTo("org.apache.kafka.common.serialization.StringDeserializer");
|
||||
assertThat(streamConfigGlobalProperties.get("producer.max.block.ms")).isEqualTo("90000");
|
||||
|
||||
InputBindingLifecycle inputBindingLifecycle = context.getBean(InputBindingLifecycle.class);
|
||||
final Collection<Binding<Object>> inputBindings = (Collection<Binding<Object>>) new DirectFieldAccessor(inputBindingLifecycle)
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.1.4-SNAPSHOT</version>
|
||||
<version>3.1.4</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2016-2020 the original author or authors.
|
||||
* Copyright 2016-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -208,10 +208,11 @@ public class KafkaBinderMetrics
|
||||
Map<TopicPartition, Long> endOffsets = metadataConsumer
|
||||
.endOffsets(topicPartitions);
|
||||
|
||||
final Map<TopicPartition, OffsetAndMetadata> committedOffsets = metadataConsumer.committed(endOffsets.keySet());
|
||||
|
||||
for (Map.Entry<TopicPartition, Long> endOffset : endOffsets
|
||||
.entrySet()) {
|
||||
OffsetAndMetadata current = metadataConsumer
|
||||
.committed(endOffset.getKey());
|
||||
OffsetAndMetadata current = committedOffsets.get(endOffset.getKey());
|
||||
lag += endOffset.getValue();
|
||||
if (current != null) {
|
||||
lag -= current.offset();
|
||||
|
||||
@@ -397,9 +397,6 @@ public class KafkaMessageChannelBinder extends
|
||||
List<PartitionInfo> partitionsFor = producer
|
||||
.partitionsFor(destination.getName());
|
||||
producer.close();
|
||||
if (transMan == null) {
|
||||
((DisposableBean) producerFB).destroy();
|
||||
}
|
||||
return partitionsFor;
|
||||
}, destination.getName());
|
||||
this.topicsInUse.put(destination.getName(),
|
||||
@@ -1621,9 +1618,9 @@ public class KafkaMessageChannelBinder extends
|
||||
key, value, headers);
|
||||
|
||||
StringBuilder sb = new StringBuilder().append(" a message with key='")
|
||||
.append(toDisplayString(ObjectUtils.nullSafeToString(key), 50))
|
||||
.append(keyOrValue(key))
|
||||
.append("'").append(" and payload='")
|
||||
.append(toDisplayString(ObjectUtils.nullSafeToString(value), 50))
|
||||
.append(keyOrValue(value))
|
||||
.append("'").append(" received from ")
|
||||
.append(consumerRecord.partition());
|
||||
ListenableFuture<SendResult<K, V>> sentDlq = null;
|
||||
@@ -1663,9 +1660,16 @@ public class KafkaMessageChannelBinder extends
|
||||
messageHeaders.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class).acknowledge();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private String keyOrValue(Object keyOrValue) {
|
||||
if (keyOrValue instanceof byte[]) {
|
||||
return "byte[" + ((byte[]) keyOrValue).length + "]";
|
||||
}
|
||||
else {
|
||||
return toDisplayString(ObjectUtils.nullSafeToString(keyOrValue), 50);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,55 +0,0 @@
|
||||
/*
|
||||
* Copyright 2021-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import org.junit.jupiter.api.extension.AfterEachCallback;
|
||||
import org.junit.jupiter.api.extension.BeforeEachCallback;
|
||||
import org.junit.jupiter.api.extension.ExtensionContext;
|
||||
|
||||
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Oleg Zhurakousky
|
||||
*
|
||||
*/
|
||||
public class EmbeddedKafkaRuleExtension extends EmbeddedKafkaRule implements BeforeEachCallback, AfterEachCallback {
|
||||
|
||||
public EmbeddedKafkaRuleExtension(int count, boolean controlledShutdown,
|
||||
String... topics) {
|
||||
super(count, controlledShutdown, topics);
|
||||
}
|
||||
|
||||
public EmbeddedKafkaRuleExtension(int count, boolean controlledShutdown,
|
||||
int partitions, String... topics) {
|
||||
super(count, controlledShutdown, partitions, topics);
|
||||
}
|
||||
|
||||
public EmbeddedKafkaRuleExtension(int count) {
|
||||
super(count);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterEach(ExtensionContext context) throws Exception {
|
||||
this.after();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeEach(ExtensionContext context) throws Exception {
|
||||
this.before();
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2016-2020 the original author or authors.
|
||||
* Copyright 2016-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -89,9 +89,12 @@ public class KafkaBinderMetricsTest {
|
||||
|
||||
@Test
|
||||
public void shouldIndicateLag() {
|
||||
final Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
|
||||
TopicPartition topicPartition = new TopicPartition(TEST_TOPIC, 0);
|
||||
committed.put(topicPartition, new OffsetAndMetadata(500));
|
||||
org.mockito.BDDMockito
|
||||
.given(consumer.committed(ArgumentMatchers.any(TopicPartition.class)))
|
||||
.willReturn(new OffsetAndMetadata(500));
|
||||
.given(consumer.committed(ArgumentMatchers.anySet()))
|
||||
.willReturn(committed);
|
||||
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
|
||||
topicsInUse.put(TEST_TOPIC,
|
||||
new TopicInformation("group1-metrics", partitions, false));
|
||||
@@ -133,9 +136,14 @@ public class KafkaBinderMetricsTest {
|
||||
org.mockito.BDDMockito
|
||||
.given(consumer.endOffsets(ArgumentMatchers.anyCollection()))
|
||||
.willReturn(endOffsets);
|
||||
final Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
|
||||
TopicPartition topicPartition1 = new TopicPartition(TEST_TOPIC, 0);
|
||||
TopicPartition topicPartition2 = new TopicPartition(TEST_TOPIC, 1);
|
||||
committed.put(topicPartition1, new OffsetAndMetadata(500));
|
||||
committed.put(topicPartition2, new OffsetAndMetadata(500));
|
||||
org.mockito.BDDMockito
|
||||
.given(consumer.committed(ArgumentMatchers.any(TopicPartition.class)))
|
||||
.willReturn(new OffsetAndMetadata(500));
|
||||
.given(consumer.committed(ArgumentMatchers.anySet()))
|
||||
.willReturn(committed);
|
||||
List<PartitionInfo> partitions = partitions(new Node(0, null, 0),
|
||||
new Node(0, null, 0));
|
||||
topicsInUse.put(TEST_TOPIC,
|
||||
|
||||
@@ -66,11 +66,12 @@ import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.assertj.core.api.Condition;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestInfo;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
@@ -97,6 +98,7 @@ import org.springframework.cloud.stream.binder.kafka.utils.DlqPartitionFunction;
|
||||
import org.springframework.cloud.stream.binder.kafka.utils.KafkaTopicUtils;
|
||||
import org.springframework.cloud.stream.binding.MessageConverterConfigurer.PartitioningInterceptor;
|
||||
import org.springframework.cloud.stream.config.BindingProperties;
|
||||
import org.springframework.cloud.stream.provisioning.ProvisioningException;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.support.GenericApplicationContext;
|
||||
@@ -163,14 +165,14 @@ public class KafkaBinderTests extends
|
||||
// @checkstyle:on
|
||||
private static final int DEFAULT_OPERATION_TIMEOUT = 30;
|
||||
|
||||
// @RegisterExtension
|
||||
// public ExpectedException expectedProvisioningException = ExpectedException.none();
|
||||
@Rule
|
||||
public ExpectedException expectedProvisioningException = ExpectedException.none();
|
||||
|
||||
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class
|
||||
.getSimpleName();
|
||||
|
||||
@RegisterExtension
|
||||
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRuleExtension(1, true, 10,
|
||||
@ClassRule
|
||||
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 10,
|
||||
"error.pollableDlq.group-pcWithDlq")
|
||||
.brokerProperty("transaction.state.log.replication.factor", "1")
|
||||
.brokerProperty("transaction.state.log.min.isr", "1");
|
||||
@@ -189,12 +191,8 @@ public class KafkaBinderTests extends
|
||||
return kafkaConsumerProperties;
|
||||
}
|
||||
|
||||
private ExtendedProducerProperties<KafkaProducerProperties> createProducerProperties() {
|
||||
return this.createProducerProperties(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ExtendedProducerProperties<KafkaProducerProperties> createProducerProperties(TestInfo testInto) {
|
||||
protected ExtendedProducerProperties<KafkaProducerProperties> createProducerProperties() {
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(
|
||||
new KafkaProducerProperties());
|
||||
producerProperties.getExtension().setSync(true);
|
||||
@@ -276,7 +274,7 @@ public class KafkaBinderTests extends
|
||||
return KafkaHeaders.OFFSET;
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
@Before
|
||||
public void init() {
|
||||
String multiplier = System.getenv("KAFKA_TIMEOUT_MULTIPLIER");
|
||||
if (multiplier != null) {
|
||||
@@ -554,7 +552,7 @@ public class KafkaBinderTests extends
|
||||
@Test
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testSendAndReceiveNoOriginalContentType(TestInfo testInfo) throws Exception {
|
||||
public void testSendAndReceiveNoOriginalContentType() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
|
||||
BindingProperties producerBindingProperties = createProducerBindingProperties(
|
||||
@@ -604,7 +602,7 @@ public class KafkaBinderTests extends
|
||||
@Test
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testSendAndReceive(TestInfo testInfo) throws Exception {
|
||||
public void testSendAndReceive() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
BindingProperties outputBindingProperties = createProducerBindingProperties(
|
||||
createProducerProperties());
|
||||
@@ -730,7 +728,7 @@ public class KafkaBinderTests extends
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
@Disabled
|
||||
@Ignore
|
||||
public void testDlqWithNativeSerializationEnabledOnDlqProducer() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
@@ -1461,15 +1459,9 @@ public class KafkaBinderTests extends
|
||||
producerBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testValidateKafkaTopicName() {
|
||||
try {
|
||||
KafkaTopicUtils.validateTopicName("foo:bar");
|
||||
fail("Expecting IllegalArgumentException");
|
||||
}
|
||||
catch (Exception e) {
|
||||
// TODO: handle exception
|
||||
}
|
||||
KafkaTopicUtils.validateTopicName("foo:bar");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -1607,7 +1599,7 @@ public class KafkaBinderTests extends
|
||||
@Test
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testSendAndReceiveMultipleTopics(TestInfo testInfo) throws Exception {
|
||||
public void testSendAndReceiveMultipleTopics() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
|
||||
DirectChannel moduleOutputChannel1 = createBindableChannel("output1",
|
||||
@@ -1768,7 +1760,7 @@ public class KafkaBinderTests extends
|
||||
@Test
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testTwoRequiredGroups(TestInfo testInfo) throws Exception {
|
||||
public void testTwoRequiredGroups() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
|
||||
@@ -1818,7 +1810,7 @@ public class KafkaBinderTests extends
|
||||
@Test
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testPartitionedModuleSpEL(TestInfo testInfo) throws Exception {
|
||||
public void testPartitionedModuleSpEL() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
@@ -1941,7 +1933,7 @@ public class KafkaBinderTests extends
|
||||
}
|
||||
|
||||
@Test
|
||||
// @Override
|
||||
@Override
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
public void testPartitionedModuleJava() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
@@ -2029,7 +2021,7 @@ public class KafkaBinderTests extends
|
||||
@Test
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testAnonymousGroup(TestInfo testInfo) throws Exception {
|
||||
public void testAnonymousGroup() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
BindingProperties producerBindingProperties = createProducerBindingProperties(
|
||||
createProducerProperties());
|
||||
@@ -2878,7 +2870,6 @@ public class KafkaBinderTests extends
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
@Disabled
|
||||
public void testAutoAddPartitionsDisabledFailsIfTopicUnderPartitionedAndAutoRebalanceDisabled()
|
||||
throws Throwable {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
@@ -2897,9 +2888,9 @@ public class KafkaBinderTests extends
|
||||
consumerProperties.setInstanceCount(3);
|
||||
consumerProperties.setInstanceIndex(2);
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
// expectedProvisioningException.expect(ProvisioningException.class);
|
||||
// expectedProvisioningException.expectMessage(
|
||||
// "The number of expected partitions was: 3, but 1 has been found instead");
|
||||
expectedProvisioningException.expect(ProvisioningException.class);
|
||||
expectedProvisioningException.expectMessage(
|
||||
"The number of expected partitions was: 3, but 1 has been found instead");
|
||||
Binding binding = binder.bindConsumer(testTopicName, "test", output,
|
||||
consumerProperties);
|
||||
if (binding != null) {
|
||||
@@ -3498,8 +3489,7 @@ public class KafkaBinderTests extends
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Disabled
|
||||
@Test(expected = TopicExistsException.class)
|
||||
public void testSameTopicCannotBeProvisionedAgain() throws Throwable {
|
||||
try (AdminClient admin = AdminClient.create(
|
||||
Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
|
||||
@@ -3511,7 +3501,6 @@ public class KafkaBinderTests extends
|
||||
admin.createTopics(Collections
|
||||
.singletonList(new NewTopic("fooUniqueTopic", 1, (short) 1)))
|
||||
.all().get();
|
||||
fail("Expecting TopicExistsException");
|
||||
}
|
||||
catch (Exception ex) {
|
||||
assertThat(ex.getCause() instanceof TopicExistsException).isTrue();
|
||||
|
||||
@@ -19,14 +19,12 @@ package org.springframework.cloud.stream.binder.kafka.bootstrap;
|
||||
import java.util.function.Function;
|
||||
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.boot.WebApplicationType;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.cloud.stream.binder.kafka.EmbeddedKafkaRuleExtension;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
|
||||
@@ -37,11 +35,10 @@ import static org.assertj.core.api.Assertions.assertThatCode;
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
@Disabled
|
||||
public class KafkaBinderMeterRegistryTest {
|
||||
|
||||
@RegisterExtension
|
||||
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRuleExtension(1, true, 10);
|
||||
@ClassRule
|
||||
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 10);
|
||||
|
||||
@Test
|
||||
public void testMetricsWithSingleBinder() {
|
||||
|
||||
Reference in New Issue
Block a user