Compare commits


17 Commits
main ... v3.1.4

Author SHA1 Message Date
buildmaster
593cf44477 Update SNAPSHOT to 3.1.4 2021-09-21 06:23:25 +00:00
Soby Chacko
265b881d35 Update SI Kafka to 5.4.10 Release 2021-09-20 18:38:25 -04:00
Soby Chacko
ac2ecf1a54 GH-1145: Remove destroying producer factory
Remove the unnecessary call to destroy the producer when checking for partitions.
This way, the producer is cached and reused the first time data is produced.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1145
2021-09-20 18:38:25 -04:00
Gary Russell
0dd86b60c1 Fix Doc Typos for KafkaBindingRebalanceListener 2021-09-20 18:38:25 -04:00
Soby Chacko
2fc335c597 Fix wrong property in docs
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1137
2021-09-20 18:38:25 -04:00
Soby Chacko
bc62f76756 Consumer/Producer prefix in Kafka Streams binder (#1131)
* Consumer/Producer prefix in Kafka Streams binder

Kafka Streams allows applications to separate the consumer and producer
properties using consumer/producer prefixes. Add these prefixes automatically
if they are missing from the application configuration.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1065

* Addressing PR review comments
2021-09-20 18:38:25 -04:00
Soby Chacko
e537020f37 Kafka Streams binding lifecycle changes
Start Kafka Streams bindings only when they are not running.
Similarly, stop them only if they are running.

Without these guards in the bindings for KStream, KTable and GlobalKTable,
NPEs may occur because the backing concurrent collections in KafkaStreamsRegistry
cannot find the proper KafkaStreams object, especially when the StreamsBuilderFactoryBean
is already stopped through the binder-provided manager.
2021-09-20 18:38:25 -04:00
Soby Chacko
20d823eeec GH-1112: Fix OOM in DlqSender.sendToDlq
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1112
2021-09-20 18:38:25 -04:00
Soby Chacko
0034f085a2 GH-1129: Kafka Binder Metrics Improvements
Avoid calling the blocking committed() method in a loop for each topic
partition in KafkaBinderMetrics.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1129
2021-09-20 18:38:25 -04:00
Soby Chacko
ef8d358f1d Document Kafka Streams and Sleuth integration
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1102
2021-09-20 18:38:25 -04:00
Derek Eskens
6e2bc7bc87 Fixing consumer config typo in overview.adoc
ConsumerConfigCustomizer is the correct name of the interface.
2021-08-23 19:31:07 -04:00
Soby Chacko
389b45d6bd Recycle KafkaStreams Objects
When Kafka Streams bindings are restarted (stop/start)
using the actuator bindings endpoints, the underlying KafkaStreams
objects are not recycled. After restarting, the binder still sees the
previous KafkaStreams object. This change addresses that issue.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1119
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1120
2021-08-12 16:04:38 -04:00
Soby Chacko
e6602f235e Docs updates for Kafka Streams skipAndContinue 2021-08-12 14:44:05 -04:00
Soby Chacko
f6b2021310 Update Spring Kafka/Spring Integration Kafka
Spring Kafka - 2.6.10
Spring Integration Kafka - 5.4.8
2021-07-26 15:06:49 -04:00
Oleg Zhurakousky
6a838acca4 Bumped s-c-build to 3.0.4-SNAPSHOT 2021-07-20 10:33:57 +02:00
Soby Chacko
cde5b7b80b GH-1085: Allow KTable binding on the outbound
At the moment, the Kafka Streams binder only allows KStream bindings on the outbound.
There is a delegation mechanism in which we can still use KStream for the output binding
while allowing applications to provide a KTable type as the function return type.

Update docs.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1085

Resolves #1105
2021-07-16 09:47:37 +02:00
Soby Chacko
2495e6e44e Revert "JUnit 5 migration related to boot 2.6"
This reverts commit 1a45ff797a.
2021-07-15 12:54:49 -04:00
27 changed files with 353 additions and 190 deletions

View File

@@ -239,7 +239,7 @@ Note that this property is only applicable for pollable consumers.
Default: not set.
resetOffsets::
Whether to reset offsets on the consumer to the value provided by startOffset.
Must be false if a `KafkaRebalanceListener` is provided; see <<rebalance-listener>>.
Must be false if a `KafkaBindingRebalanceListener` is provided; see <<rebalance-listener>>.
See <<reset-offsets>> for more information about this property.
+
Default: `false`.
@@ -337,6 +337,11 @@ Usually needed if you want to synchronize another transaction with the Kafka tra
To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.
+
Default: none.
txCommitRecovered::
When using a transactional binder, the offset of a recovered record (e.g. when retries are exhausted and the record is sent to a dead letter topic) will be committed via a new transaction, by default.
Setting this property to `false` suppresses committing the offset of a recovered record.
+
Default: true.
[[reset-offsets]]
==== Resetting Offsets
@@ -363,7 +368,7 @@ Set `resetOffsets` to `true` and `startOffset` to `latest`; the binding will per
IMPORTANT: If a rebalance occurs after the initial assignment, the seeks will only be performed on any newly assigned partitions that were not assigned during the initial assignment.
For more control over topic offsets, see <<rebalance-listener>>; when a listener is provided, `resetOffsets: true` is ignored.
For more control over topic offsets, see <<rebalance-listener>>; when a listener is provided, `resetOffsets` should not be set to `true`, otherwise, that will cause an error.
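For illustration, here is a minimal sketch of these two consumer properties, assuming a binding named `input` (the binding name is only an example):
```
spring.cloud.stream.kafka.bindings.input.consumer.resetOffsets: true
spring.cloud.stream.kafka.bindings.input.consumer.startOffset: earliest
```
With this configuration, the binding seeks the initially assigned partitions to the earliest offset on startup.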
==== Consuming Batches
@@ -506,11 +511,11 @@ Default: `false`
In this section, we show the use of the preceding properties for specific scenarios.
===== Example: Setting `autoCommitOffset` to `false` and Relying on Manual Acking
===== Example: Setting `ackMode` to `MANUAL` and Relying on Manual Acknowledgement
This example illustrates how one may manually acknowledge offsets in a consumer application.
This example requires that `spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset` be set to `false`.
This example requires that `spring.cloud.stream.kafka.bindings.input.consumer.ackMode` be set to `MANUAL`.
Use the corresponding input channel name for your example.
[source]
@@ -622,6 +627,47 @@ Usually, applications may use principals that do not have administrative rights
Consequently, relying on Spring Cloud Stream to create/modify topics may fail.
In secure environments, we strongly recommend creating topics and managing ACLs administratively by using Kafka tooling.
====== Multi-binder configuration and JAAS
When connecting to multiple clusters, each of which requires a separate JAAS configuration, set the JAAS configuration using the property `sasl.jaas.config`.
When this property is present in the application, it takes precedence over the other strategies mentioned above.
See this https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients[KIP-85] for more details.
For example, if you have two clusters in your application with separate JAAS configuration, then the following is a template that you can use:
```
spring.cloud.stream:
binders:
kafka1:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"
kafka2:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9093
configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"user1-secret\";"
kafka.binder:
configuration:
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
```
Note that the two Kafka clusters and their respective `sasl.jaas.config` values are different in the configuration above.
See this https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/multi-binder-samples/kafka-multi-binder-jaas[sample application] for more details on how to set up and run such an application.
[[pause-resume]]
===== Example: Pausing and Resuming the Consumer
@@ -774,10 +820,10 @@ public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key,
====
[[rebalance-listener]]
=== Using a KafkaRebalanceListener
=== Using a KafkaBindingRebalanceListener
Applications may wish to seek topics/partitions to arbitrary offsets when the partitions are initially assigned, or perform other operations on the consumer.
Starting with version 2.1, if you provide a single `KafkaRebalanceListener` bean in the application context, it will be wired into all Kafka consumer bindings.
Starting with version 2.1, if you provide a single `KafkaBindingRebalanceListener` bean in the application context, it will be wired into all Kafka consumer bindings.
====
[source, java]
@@ -830,7 +876,7 @@ You cannot set the `resetOffsets` consumer property to `true` when you provide a
If you want advanced customization of consumer and producer configuration that is used for creating `ConsumerFactory` and `ProducerFactory` in Kafka,
you can implement the following customizers.
* ConsusumerConfigCustomizer
* ConsumerConfigCustomizer
* ProducerConfigCustomizer
Both of these interfaces provide a way to configure the config map used for consumer and producer properties.

View File

@@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.1.4-SNAPSHOT</version>
<version>3.1.4</version>
</parent>
<packaging>jar</packaging>
<name>spring-cloud-stream-binder-kafka-docs</name>

View File

@@ -9,7 +9,6 @@
|spring.cloud.stream.dynamic-destinations | `[]` | A list of destinations that can be bound dynamically. If set, only listed destinations can be bound.
|spring.cloud.stream.function.batch-mode | `false` |
|spring.cloud.stream.function.bindings | |
|spring.cloud.stream.function.definition | | Definition of functions to bind. If several functions need to be composed into one, use pipes (e.g., 'fooFunc\|barFunc')
|spring.cloud.stream.instance-count | `1` | The number of deployed instances of an application. Default: 1. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-count" where 'foo' is the name of the binding.
|spring.cloud.stream.instance-index | `0` | The instance id of the application: a number from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index" where 'foo' is the name of the binding.
|spring.cloud.stream.instance-index-list | | A list of instance id's from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index-list" where 'foo' is the name of the binding. This setting will override the one set in 'spring.cloud.stream.instance-index'

View File

@@ -252,7 +252,28 @@ The input from the three partial functions which are `KStream`, `GlobalKTable`,
Input bindings are named as `enrichOrder-in-0`, `enrichOrder-in-1` and `enrichOrder-in-2` respectively. Output binding is named as `enrichOrder-out-0`.
With curried functions, you can virtually have any number of inputs. However, keep in mind that, anything more than a smaller number of inputs and partially applied functions for them as above in Java might lead to unreadable code.
Therefore if your Kafka Streams application requires more than a reasonably smaller number of input bindings and you want to use this functional model, then you may want to rethink your design and decompose the application appropriately.
Therefore if your Kafka Streams application requires more than a reasonably smaller number of input bindings, and you want to use this functional model, then you may want to rethink your design and decompose the application appropriately.
===== Output Bindings
The Kafka Streams binder allows either `KStream` or `KTable` types as output bindings.
Behind the scenes, the binder uses the `to` method on `KStream` to send the resultant records to the output topic.
If the application provides a `KTable` as output in the function, the binder still uses this technique by delegating to the `to` method of `KStream`.
For example, both functions below will work:
```
@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}
```
===== Multiple Output Bindings
@@ -383,8 +404,7 @@ The default output binding for this example becomes `curriedFoobar-out-0`.
====== Special note on using `KTable` as output in function composition
When using function composition, for intermediate functions, you can use `KTable` as output.
For instance, lets say you have the following two functions.
Let's say you have the following two functions.
```
@Bean
@@ -399,10 +419,7 @@ public Function<KTable<String, String>, KStream<String, String>> bar() {
}
```
You can compose them as `foo|bar` although foo's output is `KTable`.
In normal case, when you use `foo` as standalone, this will not work, as the binder does not support `KTable` as the final output.
Note that in the example above, bar's output is still a `KStream`.
We are only able to use `foo` which has a `KTable` output, since we are composing with another function that has `KStream` as its output.
You can compose them as `foo|bar`, but keep in mind that the second function (`bar` in this case) must have a `KTable` as input since the first function (`foo`) has `KTable` as output.
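As a sketch of how such a composition might be configured, using the `foo` and `bar` beans from the earlier snippet together with the function definition property described elsewhere in these docs:
```
spring.cloud.stream.function.definition: foo|bar
```
The composed function then behaves as a single processor whose final output type is the `KStream` returned by `bar`.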
==== Imperative programming model.
@@ -974,7 +991,7 @@ One important thing to keep in mind when providing an implementation for `DlqDes
This is because there is no way for the binder to infer the names of all the DLQ topics the implementation might send to.
Therefore, if you provide DLQ names using this strategy, it is the application's responsibility to ensure that those topics are created beforehand.
If `DlqDestinationResolver` is present in the application as a bean, that takes higher prcedence.
If `DlqDestinationResolver` is present in the application as a bean, that takes higher precedence.
If you do not want to follow this approach and rather provide a static DLQ name using configuration, you can set the following property.
[source]
@@ -1002,10 +1019,10 @@ public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String,
}
```
and you only want to enable DLQ on the first input binding and logAndSkip on the second binding, then you can do so on the consumer as below.
and you only want to enable DLQ on the first input binding and skipAndContinue on the second binding, then you can do so on the consumer as below.
`spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq`
`spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: logAndSkip`
`spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue`
Setting deserialization exception handlers this way has a higher precedence than setting at the binder level.
@@ -1853,6 +1870,69 @@ When there are multiple bindings present on a single function, invoking these op
This is because all the bindings on a single function are backed by the same `StreamsBuilderFactoryBean`.
Therefore, for the function above, either `function-in-0` or `function-out-0` will work.
=== Tracing using Spring Cloud Sleuth
When Spring Cloud Sleuth is on the classpath of a Spring Cloud Stream Kafka Streams binder based application, both its consumer and producer are automatically instrumented with tracing information.
However, in order to trace any application-specific operations, they need to be explicitly instrumented in user code.
This can be done by injecting the `KafkaStreamsTracing` bean from Spring Cloud Sleuth into the application and then invoking various Kafka Streams operations through this injected bean.
Here are some examples of using it.
```
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> clicks(KafkaStreamsTracing kafkaStreamsTracing) {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.transformValues(kafkaStreamsTracing.peek("span-1", (key, value) -> LOG.info("key/value: " + key + "/" + value)))
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.transform(kafkaStreamsTracing.map("span-2", (key, value) -> {
LOG.info("Click Info: " + value.getRegion() + "/" + value.getClicks());
return new KeyValue<>(value.getRegion(),
value.getClicks());
}))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum, Materialized.as(CLICK_UPDATES))
.toStream());
}
```
In the example above, explicit tracing instrumentation is added in two places.
First, we are logging the key/value information from the incoming `KStream`.
When this information is logged, the associated span and trace IDs are logged as well, so that a monitoring system can track them and correlate entries with the same span ID.
Second, when we call a `map` operation, instead of calling it directly on the `KStream` class, we wrap it inside a `transform` operation and then call `map` from `KafkaStreamsTracing`.
In this case also, the logged message will contain the span ID and trace ID.
Here is another example, where we use the low-level transformer API for accessing the various Kafka Streams headers.
When spring-cloud-sleuth is on the classpath, all the tracing headers can also be accessed like this.
```
@Bean
public Function<KStream<String, String>, KStream<String, String>> process(KafkaStreamsTracing kafkaStreamsTracing) {
return input -> input.transform(kafkaStreamsTracing.transformer(
"transformer-1",
() -> new Transformer<String, String, KeyValue<String, String>>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<String, String> transform(String key, String value) {
LOG.info("Headers: " + this.context.headers());
LOG.info("K/V:" + key + "/" + value);
// More transformations, business logic execution, etc. go here.
return KeyValue.pair(key, value);
}
@Override
public void close() {
}
}));
}
```
=== Configuration Options
This section contains the configuration options used by the Kafka Streams binder.
@@ -1906,7 +1986,7 @@ deserializationExceptionHandler::
Deserialization error handler type.
This handler is applied at the binder level and thus applied against all input bindings in the application.
There is a way to control it in a more fine-grained way at the consumer binding level.
Possible values are - `logAndContinue`, `logAndFail` or `sendToDlq`
Possible values are - `logAndContinue`, `logAndFail`, `skipAndContinue` or `sendToDlq`
+
Default: `logAndFail`
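For example, here is a minimal sketch of setting this handler at the binder level; the combined property path is inferred from the `spring.cloud.stream.kafka.streams.binder` prefix used elsewhere in these docs:
```
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
```
The per-binding form shown earlier takes higher precedence than this binder-level setting.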
@@ -2013,7 +2093,7 @@ Unlike the message channel based binder, Kafka Streams binder does not seek to b
deserializationExceptionHandler::
Deserialization error handler type.
This handler is applied per consumer binding as opposed to the binder level property described before.
Possible values are - `logAndContinue`, `logAndFail` or `sendToDlq`
Possible values are - `logAndContinue`, `logAndFail`, `skipAndContinue` or `sendToDlq`
+
Default: `logAndFail`

View File

@@ -218,7 +218,7 @@ Note that this property is only applicable for pollable consumers.
Default: not set.
resetOffsets::
Whether to reset offsets on the consumer to the value provided by startOffset.
Must be false if a `KafkaRebalanceListener` is provided; see <<rebalance-listener>>.
Must be false if a `KafkaBindingRebalanceListener` is provided; see <<rebalance-listener>>.
See <<reset-offsets>> for more information about this property.
+
Default: `false`.
@@ -490,11 +490,11 @@ Default: `false`
In this section, we show the use of the preceding properties for specific scenarios.
===== Example: Setting `autoCommitOffset` to `false` and Relying on Manual Acking
===== Example: Setting `ackMode` to `MANUAL` and Relying on Manual Acknowledgement
This example illustrates how one may manually acknowledge offsets in a consumer application.
This example requires that `spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset` be set to `false`.
This example requires that `spring.cloud.stream.kafka.bindings.input.consumer.ackMode` be set to `MANUAL`.
Use the corresponding input channel name for your example.
[source]
@@ -799,10 +799,10 @@ public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key,
====
[[rebalance-listener]]
=== Using a KafkaRebalanceListener
=== Using a KafkaBindingRebalanceListener
Applications may wish to seek topics/partitions to arbitrary offsets when the partitions are initially assigned, or perform other operations on the consumer.
Starting with version 2.1, if you provide a single `KafkaRebalanceListener` bean in the application context, it will be wired into all Kafka consumer bindings.
Starting with version 2.1, if you provide a single `KafkaBindingRebalanceListener` bean in the application context, it will be wired into all Kafka consumer bindings.
====
[source, java]
@@ -855,7 +855,7 @@ You cannot set the `resetOffsets` consumer property to `true` when you provide a
If you want advanced customization of consumer and producer configuration that is used for creating `ConsumerFactory` and `ProducerFactory` in Kafka,
you can implement the following customizers.
* ConsusumerConfigCustomizer
* ConsumerConfigCustomizer
* ProducerConfigCustomizer
Both of these interfaces provide a way to configure the config map used for consumer and producer properties.

pom.xml
View File

@@ -2,21 +2,21 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.1.4-SNAPSHOT</version>
<version>3.1.4</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>3.0.3</version>
<version>3.0.4</version>
<relativePath />
</parent>
<properties>
<java.version>1.8</java.version>
<spring-kafka.version>2.6.8</spring-kafka.version>
<spring-integration-kafka.version>5.4.7</spring-integration-kafka.version>
<spring-kafka.version>2.6.10</spring-kafka.version>
<spring-integration-kafka.version>5.4.10</spring-integration-kafka.version>
<kafka.version>2.6.2</kafka.version>
<spring-cloud-schema-registry.version>1.1.4-SNAPSHOT</spring-cloud-schema-registry.version>
<spring-cloud-stream.version>3.1.4-SNAPSHOT</spring-cloud-stream.version>
<spring-cloud-schema-registry.version>1.1.4</spring-cloud-schema-registry.version>
<spring-cloud-stream.version>3.1.4</spring-cloud-stream.version>
<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.1.4-SNAPSHOT</version>
<version>3.1.4</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.1.4-SNAPSHOT</version>
<version>3.1.4</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.1.4-SNAPSHOT</version>
<version>3.1.4</version>
</parent>
<properties>

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.springframework.cloud.stream.binder.AbstractBinder;
@@ -58,16 +59,18 @@ public class GlobalKTableBinder extends
// @checkstyle:off
private KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties = new KafkaStreamsExtendedBindingProperties();
private final KafkaStreamsRegistry kafkaStreamsRegistry;
// @checkstyle:on
public GlobalKTableBinder(
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue) {
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, KafkaStreamsRegistry kafkaStreamsRegistry) {
this.binderConfigurationProperties = binderConfigurationProperties;
this.kafkaTopicProvisioner = kafkaTopicProvisioner;
this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
this.kafkaStreamsRegistry = kafkaStreamsRegistry;
}
@Override
@@ -97,10 +100,22 @@ public class GlobalKTableBinder extends
return true;
}
@Override
public synchronized void start() {
if (!streamsBuilderFactoryBean.isRunning()) {
super.start();
GlobalKTableBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
}
}
@Override
public synchronized void stop() {
super.stop();
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
if (streamsBuilderFactoryBean.isRunning()) {
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
super.stop();
GlobalKTableBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
}
}
};
}

View File

@@ -59,10 +59,11 @@ public class GlobalKTableBinderConfiguration {
KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
@Qualifier("streamConfigGlobalProperties") Map<String, Object> streamConfigGlobalProperties) {
@Qualifier("streamConfigGlobalProperties") Map<String, Object> streamConfigGlobalProperties,
KafkaStreamsRegistry kafkaStreamsRegistry) {
GlobalKTableBinder globalKTableBinder = new GlobalKTableBinder(binderConfigurationProperties,
kafkaTopicProvisioner, kafkaStreamsBindingInformationCatalogue);
kafkaTopicProvisioner, kafkaStreamsBindingInformationCatalogue, kafkaStreamsRegistry);
globalKTableBinder.setKafkaStreamsExtendedBindingProperties(
kafkaStreamsExtendedBindingProperties);
return globalKTableBinder;

View File

@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -78,16 +79,19 @@ class KStreamBinder extends
private final KeyValueSerdeResolver keyValueSerdeResolver;
private final KafkaStreamsRegistry kafkaStreamsRegistry;
KStreamBinder(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
KeyValueSerdeResolver keyValueSerdeResolver) {
KeyValueSerdeResolver keyValueSerdeResolver, KafkaStreamsRegistry kafkaStreamsRegistry) {
this.binderConfigurationProperties = binderConfigurationProperties;
this.kafkaTopicProvisioner = kafkaTopicProvisioner;
this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate;
this.kafkaStreamsBindingInformationCatalogue = KafkaStreamsBindingInformationCatalogue;
this.keyValueSerdeResolver = keyValueSerdeResolver;
this.kafkaStreamsRegistry = kafkaStreamsRegistry;
}
@Override
@@ -125,10 +129,22 @@ class KStreamBinder extends
return true;
}
@Override
public synchronized void start() {
if (!streamsBuilderFactoryBean.isRunning()) {
super.start();
KStreamBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
}
}
@Override
public synchronized void stop() {
super.stop();
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
if (streamsBuilderFactoryBean.isRunning()) {
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
super.stop();
KStreamBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
}
}
};
}
@@ -178,10 +194,22 @@ class KStreamBinder extends
return false;
}
@Override
public synchronized void start() {
if (!streamsBuilderFactoryBean.isRunning()) {
super.start();
KStreamBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
}
}
@Override
public synchronized void stop() {
super.stop();
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
if (streamsBuilderFactoryBean.isRunning()) {
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
super.stop();
KStreamBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
}
}
};
}

View File

@@ -59,10 +59,10 @@ public class KStreamBinderConfiguration {
KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
KeyValueSerdeResolver keyValueSerdeResolver,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties) {
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties, KafkaStreamsRegistry kafkaStreamsRegistry) {
KStreamBinder kStreamBinder = new KStreamBinder(binderConfigurationProperties,
kafkaTopicProvisioner, KafkaStreamsMessageConversionDelegate,
KafkaStreamsBindingInformationCatalogue, keyValueSerdeResolver);
KafkaStreamsBindingInformationCatalogue, keyValueSerdeResolver, kafkaStreamsRegistry);
kStreamBinder.setKafkaStreamsExtendedBindingProperties(
kafkaStreamsExtendedBindingProperties);
return kStreamBinder;

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.cloud.stream.binder.AbstractBinder;
@@ -44,13 +45,10 @@ import org.springframework.util.StringUtils;
* @author Soby Chacko
*/
class KTableBinder extends
// @checkstyle:off
AbstractBinder<KTable<Object, Object>, ExtendedConsumerProperties<KafkaStreamsConsumerProperties>, ExtendedProducerProperties<KafkaStreamsProducerProperties>>
implements
ExtendedPropertiesBinder<KTable<Object, Object>, KafkaStreamsConsumerProperties, KafkaStreamsProducerProperties> {
// @checkstyle:on
private final KafkaStreamsBinderConfigurationProperties binderConfigurationProperties;
private final KafkaTopicProvisioner kafkaTopicProvisioner;
@@ -62,12 +60,15 @@ class KTableBinder extends
// @checkstyle:on
private final KafkaStreamsRegistry kafkaStreamsRegistry;
KTableBinder(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue) {
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue, KafkaStreamsRegistry kafkaStreamsRegistry) {
this.binderConfigurationProperties = binderConfigurationProperties;
this.kafkaTopicProvisioner = kafkaTopicProvisioner;
this.kafkaStreamsBindingInformationCatalogue = KafkaStreamsBindingInformationCatalogue;
this.kafkaStreamsRegistry = kafkaStreamsRegistry;
}
@Override
@@ -100,10 +101,22 @@ class KTableBinder extends
return true;
}
@Override
public synchronized void start() {
if (!streamsBuilderFactoryBean.isRunning()) {
super.start();
KTableBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
}
}
@Override
public synchronized void stop() {
super.stop();
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
if (streamsBuilderFactoryBean.isRunning()) {
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
super.stop();
KTableBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
}
}
};
}
@@ -111,9 +124,7 @@ class KTableBinder extends
@Override
protected Binding<KTable<Object, Object>> doBindProducer(String name,
KTable<Object, Object> outboundBindTarget,
// @checkstyle:off
ExtendedProducerProperties<KafkaStreamsProducerProperties> properties) {
// @checkstyle:on
throw new UnsupportedOperationException(
"No producer level binding is allowed for KTable");
}

View File

@@ -59,9 +59,10 @@ public class KTableBinderConfiguration {
KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
@Qualifier("streamConfigGlobalProperties") Map<String, Object> streamConfigGlobalProperties) {
@Qualifier("streamConfigGlobalProperties") Map<String, Object> streamConfigGlobalProperties,
KafkaStreamsRegistry kafkaStreamsRegistry) {
KTableBinder kTableBinder = new KTableBinder(binderConfigurationProperties,
kafkaTopicProvisioner, kafkaStreamsBindingInformationCatalogue);
kafkaTopicProvisioner, kafkaStreamsBindingInformationCatalogue, kafkaStreamsRegistry);
kTableBinder.setKafkaStreamsExtendedBindingProperties(kafkaStreamsExtendedBindingProperties);
return kTableBinder;
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2017-2020 the original author or authors.
* Copyright 2017-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -98,6 +98,9 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
private static final String GLOBALKTABLE_BINDER_TYPE = "globalktable";
private static final String CONSUMER_PROPERTIES_PREFIX = "consumer.";
private static final String PRODUCER_PROPERTIES_PREFIX = "producer.";
@Bean
@ConfigurationProperties(prefix = "spring.cloud.stream.kafka.streams.binder")
public KafkaStreamsBinderConfigurationProperties binderConfigurationProperties(
@@ -266,14 +269,15 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
if (!ObjectUtils.isEmpty(configProperties.getConfiguration())) {
properties.putAll(configProperties.getConfiguration());
}
Map<String, Object> mergedConsumerConfig = configProperties.mergedConsumerConfiguration();
if (!ObjectUtils.isEmpty(mergedConsumerConfig)) {
properties.putAll(mergedConsumerConfig);
}
Map<String, Object> mergedProducerConfig = configProperties.mergedProducerConfiguration();
if (!ObjectUtils.isEmpty(mergedProducerConfig)) {
properties.putAll(mergedProducerConfig);
}
Map<String, Object> mergedConsumerConfig = new HashMap<>(configProperties.mergedConsumerConfiguration());
//Adding consumer. prefix if they are missing (in order to differentiate them from other property categories such as stream, producer etc.)
addPrefix(properties, mergedConsumerConfig, CONSUMER_PROPERTIES_PREFIX);
Map<String, Object> mergedProducerConfig = new HashMap<>(configProperties.mergedProducerConfiguration());
//Adding producer. prefix if they are missing (in order to differentiate them from other property categories such as stream, consumer etc.)
addPrefix(properties, mergedProducerConfig, PRODUCER_PROPERTIES_PREFIX);
if (!properties.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG)) {
properties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG,
(int) configProperties.getReplicationFactor());
@@ -282,6 +286,16 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
Collectors.toMap((e) -> String.valueOf(e.getKey()), Map.Entry::getValue));
}
private void addPrefix(Properties properties, Map<String, Object> mergedConsProdConfig, String prefix) {
Map<String, Object> mergedConfigs = new HashMap<>();
for (String key : mergedConsProdConfig.keySet()) {
mergedConfigs.put(key.startsWith(prefix) ? key : prefix + key, mergedConsProdConfig.get(key));
}
if (!ObjectUtils.isEmpty(mergedConfigs)) {
properties.putAll(mergedConfigs);
}
}
@Bean
public KStreamStreamListenerResultAdapter kstreamStreamListenerResultAdapter() {
return new KStreamStreamListenerResultAdapter();

View File

@@ -39,6 +39,7 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
@@ -298,7 +299,13 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
outboundResolvableType, (Object[]) result, streamsBuilderFactoryBean);
}
else {
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType, (KStream) result, outboundDefinitionIterator);
if (KTable.class.isAssignableFrom(result.getClass())) {
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType != null ?
outboundResolvableType : resolvableType.getGeneric(1), ((KTable) result).toStream(), outboundDefinitionIterator);
}
else {
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType, (KStream) result, outboundDefinitionIterator);
}
}
}
}
@@ -337,8 +344,14 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
outboundResolvableType, (Object[]) result, streamsBuilderFactoryBean);
}
else {
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType != null ?
outboundResolvableType : resolvableType.getGeneric(1), (KStream) result, outboundDefinitionIterator);
if (KTable.class.isAssignableFrom(result.getClass())) {
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType != null ?
outboundResolvableType : resolvableType.getGeneric(1), ((KTable) result).toStream(), outboundDefinitionIterator);
}
else {
handleSingleKStreamOutbound(resolvableTypes, outboundResolvableType != null ?
outboundResolvableType : resolvableType.getGeneric(1), (KStream) result, outboundDefinitionIterator);
}
}
}
}

View File

@@ -62,6 +62,11 @@ public class KafkaStreamsRegistry {
this.streamsBuilderFactoryBeanMap.put(kafkaStreams, streamsBuilderFactoryBean);
}
void unregisterKafkaStreams(KafkaStreams kafkaStreams) {
this.kafkaStreams.remove(kafkaStreams);
this.streamsBuilderFactoryBeanMap.remove(kafkaStreams);
}
/**
*
* @param kafkaStreams {@link KafkaStreams} object

View File

@@ -145,7 +145,8 @@ public class KafkaStreamsBindableProxyFactory extends AbstractBindableProxyFacto
}
if (outboundArgument != null && outboundArgument.getRawClass() != null && (!outboundArgument.isArray() &&
outboundArgument.getRawClass().isAssignableFrom(KStream.class))) {
(outboundArgument.getRawClass().isAssignableFrom(KStream.class) ||
outboundArgument.getRawClass().isAssignableFrom(KTable.class)))) { //Allowing both KStream and KTable on the outbound.
// if the type is array, we need to do a late binding as we don't know the number of
// output bindings at this point in the flow.
@@ -157,12 +158,15 @@ public class KafkaStreamsBindableProxyFactory extends AbstractBindableProxyFacto
if (outputBindingsIter.hasNext()) {
outputBinding = outputBindingsIter.next();
}
}
else {
outputBinding = String.format("%s-%s-0", this.functionName, FunctionConstants.DEFAULT_OUTPUT_SUFFIX);
}
Assert.isTrue(outputBinding != null, "output binding is not inferred.");
// We will only allow KStream targets on the outbound. If the user provides a KTable,
// we still use the KStreamBinder to send it through the outbound.
// In that case before sending, we do a cast from KTable to KStream.
// See KafkaStreamsFunctionsProcessor#setupFunctionInvokerForKafkaStreams for details.
KafkaStreamsBindableProxyFactory.this.outputHolders.put(outputBinding,
new BoundTargetHolder(getBindingTargetFactory(KStream.class)
.createOutput(outputBinding), true));

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2019-2019 the original author or authors.
* Copyright 2019-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -111,6 +111,7 @@ public class KafkaStreamsBinderWordCountFunctionTests {
"--spring.cloud.stream.kafka.streams.binder.application-id=testKstreamWordCountFunction",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.consumerProperties.request.timeout.ms=29000", //for testing ...binder.consumerProperties
"--spring.cloud.stream.kafka.streams.binder.consumerProperties.consumer.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
"--spring.cloud.stream.kafka.streams.binder.producerProperties.max.block.ms=90000", //for testing ...binder.producerProperties
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" +
"=org.apache.kafka.common.serialization.Serdes$StringSerde",
@@ -143,8 +144,9 @@ public class KafkaStreamsBinderWordCountFunctionTests {
//verify that ...binder.consumerProperties and ...binder.producerProperties work.
Map<String, Object> streamConfigGlobalProperties = (Map<String, Object>) context.getBean("streamConfigGlobalProperties");
assertThat(streamConfigGlobalProperties.get("request.timeout.ms")).isEqualTo("29000");
assertThat(streamConfigGlobalProperties.get("max.block.ms")).isEqualTo("90000");
assertThat(streamConfigGlobalProperties.get("consumer.request.timeout.ms")).isEqualTo("29000");
assertThat(streamConfigGlobalProperties.get("consumer.value.deserializer")).isEqualTo("org.apache.kafka.common.serialization.StringDeserializer");
assertThat(streamConfigGlobalProperties.get("producer.max.block.ms")).isEqualTo("90000");
InputBindingLifecycle inputBindingLifecycle = context.getBean(InputBindingLifecycle.class);
final Collection<Binding<Object>> inputBindings = (Collection<Binding<Object>>) new DirectFieldAccessor(inputBindingLifecycle)

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.1.4-SNAPSHOT</version>
<version>3.1.4</version>
</parent>
<dependencies>

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -208,10 +208,11 @@ public class KafkaBinderMetrics
Map<TopicPartition, Long> endOffsets = metadataConsumer
.endOffsets(topicPartitions);
final Map<TopicPartition, OffsetAndMetadata> committedOffsets = metadataConsumer.committed(endOffsets.keySet());
for (Map.Entry<TopicPartition, Long> endOffset : endOffsets
.entrySet()) {
OffsetAndMetadata current = metadataConsumer
.committed(endOffset.getKey());
OffsetAndMetadata current = committedOffsets.get(endOffset.getKey());
lag += endOffset.getValue();
if (current != null) {
lag -= current.offset();

View File

@@ -397,9 +397,6 @@ public class KafkaMessageChannelBinder extends
List<PartitionInfo> partitionsFor = producer
.partitionsFor(destination.getName());
producer.close();
if (transMan == null) {
((DisposableBean) producerFB).destroy();
}
return partitionsFor;
}, destination.getName());
this.topicsInUse.put(destination.getName(),
@@ -1621,9 +1618,9 @@ public class KafkaMessageChannelBinder extends
key, value, headers);
StringBuilder sb = new StringBuilder().append(" a message with key='")
.append(toDisplayString(ObjectUtils.nullSafeToString(key), 50))
.append(keyOrValue(key))
.append("'").append(" and payload='")
.append(toDisplayString(ObjectUtils.nullSafeToString(value), 50))
.append(keyOrValue(value))
.append("'").append(" received from ")
.append(consumerRecord.partition());
ListenableFuture<SendResult<K, V>> sentDlq = null;
@@ -1663,9 +1660,16 @@ public class KafkaMessageChannelBinder extends
messageHeaders.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class).acknowledge();
}
}
}
private String keyOrValue(Object keyOrValue) {
if (keyOrValue instanceof byte[]) {
return "byte[" + ((byte[]) keyOrValue).length + "]";
}
else {
return toDisplayString(ObjectUtils.nullSafeToString(keyOrValue), 50);
}
}
}
}

View File

@@ -1,55 +0,0 @@
/*
* Copyright 2021-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
/**
*
* @author Oleg Zhurakousky
*
*/
public class EmbeddedKafkaRuleExtension extends EmbeddedKafkaRule implements BeforeEachCallback, AfterEachCallback {
public EmbeddedKafkaRuleExtension(int count, boolean controlledShutdown,
String... topics) {
super(count, controlledShutdown, topics);
}
public EmbeddedKafkaRuleExtension(int count, boolean controlledShutdown,
int partitions, String... topics) {
super(count, controlledShutdown, partitions, topics);
}
public EmbeddedKafkaRuleExtension(int count) {
super(count);
}
@Override
public void afterEach(ExtensionContext context) throws Exception {
this.after();
}
@Override
public void beforeEach(ExtensionContext context) throws Exception {
this.before();
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -89,9 +89,12 @@ public class KafkaBinderMetricsTest {
@Test
public void shouldIndicateLag() {
final Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
TopicPartition topicPartition = new TopicPartition(TEST_TOPIC, 0);
committed.put(topicPartition, new OffsetAndMetadata(500));
org.mockito.BDDMockito
.given(consumer.committed(ArgumentMatchers.any(TopicPartition.class)))
.willReturn(new OffsetAndMetadata(500));
.given(consumer.committed(ArgumentMatchers.anySet()))
.willReturn(committed);
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC,
new TopicInformation("group1-metrics", partitions, false));
@@ -133,9 +136,14 @@ public class KafkaBinderMetricsTest {
org.mockito.BDDMockito
.given(consumer.endOffsets(ArgumentMatchers.anyCollection()))
.willReturn(endOffsets);
final Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
TopicPartition topicPartition1 = new TopicPartition(TEST_TOPIC, 0);
TopicPartition topicPartition2 = new TopicPartition(TEST_TOPIC, 1);
committed.put(topicPartition1, new OffsetAndMetadata(500));
committed.put(topicPartition2, new OffsetAndMetadata(500));
org.mockito.BDDMockito
.given(consumer.committed(ArgumentMatchers.any(TopicPartition.class)))
.willReturn(new OffsetAndMetadata(500));
.given(consumer.committed(ArgumentMatchers.anySet()))
.willReturn(committed);
List<PartitionInfo> partitions = partitions(new Node(0, null, 0),
new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC,

View File

@@ -66,11 +66,12 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.cloud.stream.binder.Binder;
@@ -97,6 +98,7 @@ import org.springframework.cloud.stream.binder.kafka.utils.DlqPartitionFunction;
import org.springframework.cloud.stream.binder.kafka.utils.KafkaTopicUtils;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer.PartitioningInterceptor;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
@@ -163,14 +165,14 @@ public class KafkaBinderTests extends
// @checkstyle:on
private static final int DEFAULT_OPERATION_TIMEOUT = 30;
// @RegisterExtension
// public ExpectedException expectedProvisioningException = ExpectedException.none();
@Rule
public ExpectedException expectedProvisioningException = ExpectedException.none();
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class
.getSimpleName();
@RegisterExtension
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRuleExtension(1, true, 10,
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 10,
"error.pollableDlq.group-pcWithDlq")
.brokerProperty("transaction.state.log.replication.factor", "1")
.brokerProperty("transaction.state.log.min.isr", "1");
@@ -189,12 +191,8 @@ public class KafkaBinderTests extends
return kafkaConsumerProperties;
}
private ExtendedProducerProperties<KafkaProducerProperties> createProducerProperties() {
return this.createProducerProperties(null);
}
@Override
protected ExtendedProducerProperties<KafkaProducerProperties> createProducerProperties(TestInfo testInto) {
protected ExtendedProducerProperties<KafkaProducerProperties> createProducerProperties() {
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(
new KafkaProducerProperties());
producerProperties.getExtension().setSync(true);
@@ -276,7 +274,7 @@ public class KafkaBinderTests extends
return KafkaHeaders.OFFSET;
}
@BeforeEach
@Before
public void init() {
String multiplier = System.getenv("KAFKA_TIMEOUT_MULTIPLIER");
if (multiplier != null) {
@@ -554,7 +552,7 @@ public class KafkaBinderTests extends
@Test
@Override
@SuppressWarnings("unchecked")
public void testSendAndReceiveNoOriginalContentType(TestInfo testInfo) throws Exception {
public void testSendAndReceiveNoOriginalContentType() throws Exception {
Binder binder = getBinder();
BindingProperties producerBindingProperties = createProducerBindingProperties(
@@ -604,7 +602,7 @@ public class KafkaBinderTests extends
@Test
@Override
@SuppressWarnings("unchecked")
public void testSendAndReceive(TestInfo testInfo) throws Exception {
public void testSendAndReceive() throws Exception {
Binder binder = getBinder();
BindingProperties outputBindingProperties = createProducerBindingProperties(
createProducerProperties());
@@ -730,7 +728,7 @@ public class KafkaBinderTests extends
@Test
@SuppressWarnings("unchecked")
@Disabled
@Ignore
public void testDlqWithNativeSerializationEnabledOnDlqProducer() throws Exception {
Binder binder = getBinder();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
@@ -1461,15 +1459,9 @@ public class KafkaBinderTests extends
producerBinding.unbind();
}
@Test
@Test(expected = IllegalArgumentException.class)
public void testValidateKafkaTopicName() {
try {
KafkaTopicUtils.validateTopicName("foo:bar");
fail("Expecting IllegalArgumentException");
}
catch (Exception e) {
// TODO: handle exception
}
KafkaTopicUtils.validateTopicName("foo:bar");
}
@Test
@@ -1607,7 +1599,7 @@ public class KafkaBinderTests extends
@Test
@Override
@SuppressWarnings("unchecked")
public void testSendAndReceiveMultipleTopics(TestInfo testInfo) throws Exception {
public void testSendAndReceiveMultipleTopics() throws Exception {
Binder binder = getBinder();
DirectChannel moduleOutputChannel1 = createBindableChannel("output1",
@@ -1768,7 +1760,7 @@ public class KafkaBinderTests extends
@Test
@Override
@SuppressWarnings("unchecked")
public void testTwoRequiredGroups(TestInfo testInfo) throws Exception {
public void testTwoRequiredGroups() throws Exception {
Binder binder = getBinder();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
@@ -1818,7 +1810,7 @@ public class KafkaBinderTests extends
@Test
@Override
@SuppressWarnings("unchecked")
public void testPartitionedModuleSpEL(TestInfo testInfo) throws Exception {
public void testPartitionedModuleSpEL() throws Exception {
Binder binder = getBinder();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
@@ -1941,7 +1933,7 @@ public class KafkaBinderTests extends
}
@Test
// @Override
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testPartitionedModuleJava() throws Exception {
Binder binder = getBinder();
@@ -2029,7 +2021,7 @@ public class KafkaBinderTests extends
@Test
@Override
@SuppressWarnings("unchecked")
public void testAnonymousGroup(TestInfo testInfo) throws Exception {
public void testAnonymousGroup() throws Exception {
Binder binder = getBinder();
BindingProperties producerBindingProperties = createProducerBindingProperties(
createProducerProperties());
@@ -2878,7 +2870,6 @@ public class KafkaBinderTests extends
@Test
@SuppressWarnings("unchecked")
@Disabled
public void testAutoAddPartitionsDisabledFailsIfTopicUnderPartitionedAndAutoRebalanceDisabled()
throws Throwable {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
@@ -2897,9 +2888,9 @@ public class KafkaBinderTests extends
consumerProperties.setInstanceCount(3);
consumerProperties.setInstanceIndex(2);
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
// expectedProvisioningException.expect(ProvisioningException.class);
// expectedProvisioningException.expectMessage(
// "The number of expected partitions was: 3, but 1 has been found instead");
expectedProvisioningException.expect(ProvisioningException.class);
expectedProvisioningException.expectMessage(
"The number of expected partitions was: 3, but 1 has been found instead");
Binding binding = binder.bindConsumer(testTopicName, "test", output,
consumerProperties);
if (binding != null) {
@@ -3498,8 +3489,7 @@ public class KafkaBinderTests extends
}
}
@Test
@Disabled
@Test(expected = TopicExistsException.class)
public void testSameTopicCannotBeProvisionedAgain() throws Throwable {
try (AdminClient admin = AdminClient.create(
Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
@@ -3511,7 +3501,6 @@ public class KafkaBinderTests extends
admin.createTopics(Collections
.singletonList(new NewTopic("fooUniqueTopic", 1, (short) 1)))
.all().get();
fail("Expecting TopicExistsException");
}
catch (Exception ex) {
assertThat(ex.getCause() instanceof TopicExistsException).isTrue();

View File

@@ -19,14 +19,12 @@ package org.springframework.cloud.stream.binder.kafka.bootstrap;
import java.util.function.Function;
import io.micrometer.core.instrument.MeterRegistry;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.kafka.EmbeddedKafkaRuleExtension;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
@@ -37,11 +35,10 @@ import static org.assertj.core.api.Assertions.assertThatCode;
/**
* @author Soby Chacko
*/
@Disabled
public class KafkaBinderMeterRegistryTest {
@RegisterExtension
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRuleExtension(1, true, 10);
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 10);
@Test
public void testMetricsWithSingleBinder() {