Compare commits
12 Commits
| Author | SHA1 | Date |
|---|---|---|
| | e7487b9ada | |
| | 064f5b6611 | |
| | ff859c5859 | |
| | 445eabc59a | |
| | cc5d1b1aa6 | |
| | db896532e6 | |
| | 07a740a5b5 | |
| | 65f8cc5660 | |
| | 90a47a675f | |
| | 9d212024f8 | |
| | d594bab4cf | |
| | e46bd1f844 | |
**.gitignore** (vendored, 1 line changed)
```diff
@@ -23,3 +23,4 @@ _site/
 dump.rdb
 .apt_generated
 artifacts
+.sts4-cache
```
**.mvn/wrapper/MavenWrapperDownloader.java** (vendored, 51 lines changed, Executable file → Normal file)
```diff
@@ -1,22 +1,18 @@
 /*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  https://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
+ * Copyright 2007-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 import java.net.*;
 import java.io.*;
 import java.nio.channels.*;
```
```diff
@@ -24,11 +20,12 @@ import java.util.Properties;

 public class MavenWrapperDownloader {

+    private static final String WRAPPER_VERSION = "0.5.6";
     /**
      * Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided.
      */
-    private static final String DEFAULT_DOWNLOAD_URL =
-            "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.4.2/maven-wrapper-0.4.2.jar";
+    private static final String DEFAULT_DOWNLOAD_URL = "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/"
+        + WRAPPER_VERSION + "/maven-wrapper-" + WRAPPER_VERSION + ".jar";

     /**
      * Path to the maven-wrapper.properties file, which might contain a downloadUrl property to
```
```diff
@@ -76,13 +73,13 @@ public class MavenWrapperDownloader {
                }
            }
        }
-       System.out.println("- Downloading from: : " + url);
+       System.out.println("- Downloading from: " + url);

        File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH);
        if(!outputFile.getParentFile().exists()) {
            if(!outputFile.getParentFile().mkdirs()) {
                System.out.println(
-                       "- ERROR creating output direcrory '" + outputFile.getParentFile().getAbsolutePath() + "'");
+                       "- ERROR creating output directory '" + outputFile.getParentFile().getAbsolutePath() + "'");
            }
        }
        System.out.println("- Downloading to: " + outputFile.getAbsolutePath());
```
```diff
@@ -98,6 +95,16 @@ public class MavenWrapperDownloader {
    }

    private static void downloadFileFromURL(String urlString, File destination) throws Exception {
+       if (System.getenv("MVNW_USERNAME") != null && System.getenv("MVNW_PASSWORD") != null) {
+           String username = System.getenv("MVNW_USERNAME");
+           char[] password = System.getenv("MVNW_PASSWORD").toCharArray();
+           Authenticator.setDefault(new Authenticator() {
+               @Override
+               protected PasswordAuthentication getPasswordAuthentication() {
+                   return new PasswordAuthentication(username, password);
+               }
+           });
+       }
        URL website = new URL(urlString);
        ReadableByteChannel rbc;
        rbc = Channels.newChannel(website.openStream());
```
**.mvn/wrapper/maven-wrapper.jar** (vendored, BIN, Executable file → Normal file). Binary file not shown.
**.mvn/wrapper/maven-wrapper.properties** (vendored, 3 lines changed, Executable file → Normal file)
```diff
@@ -1 +1,2 @@
-distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.5.4/apache-maven-3.5.4-bin.zip
+distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.6.3/apache-maven-3.6.3-bin.zip
+wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar
```
**README.adoc** (55 lines changed)
```diff
@@ -39,7 +39,7 @@ To use Apache Kafka binder, you need to add `spring-cloud-stream-binder-kafka` a
 </dependency>
 ----

-Alternatively, you can also use the Spring Cloud Stream Kafka Starter, as shown inn the following example for Maven:
+Alternatively, you can also use the Spring Cloud Stream Kafka Starter, as shown in the following example for Maven:

 [source,xml]
 ----
```
```diff
@@ -60,7 +60,7 @@ The Apache Kafka Binder implementation maps each destination to an Apache Kafka
 The consumer group maps directly to the same Apache Kafka concept.
 Partitioning also maps directly to Apache Kafka partitions as well.

-The binder currently uses the Apache Kafka `kafka-clients` 1.0.0 jar and is designed to be used with a broker of at least that version.
+The binder currently uses the Apache Kafka `kafka-clients` version `2.3.1`.
 This client can communicate with older brokers (see the Kafka documentation), but certain features may not be available.
 For example, with versions earlier than 0.11.x.x, native headers are not supported.
 Also, 0.11.x.x does not support the `autoAddPartitions` property.
```
```diff
@@ -155,21 +155,15 @@ Default: See individual producer properties.

 spring.cloud.stream.kafka.binder.headerMapperBeanName::
 The bean name of a `KafkaHeaderMapper` used for mapping `spring-messaging` headers to and from Kafka headers.
-Use this, for example, if you wish to customize the trusted packages in a `DefaultKafkaHeaderMapper` that uses JSON deserialization for the headers.
+Use this, for example, if you wish to customize the trusted packages in a `BinderHeaderMapper` bean that uses JSON deserialization for the headers.
+If this custom `BinderHeaderMapper` bean is not made available to the binder using this property, then the binder will look for a header mapper bean with the name `kafkaBinderHeaderMapper` that is of type `BinderHeaderMapper` before falling back to a default `BinderHeaderMapper` created by the binder.
 +
 Default: none.

 spring.cloud.stream.kafka.binder.authorizationExceptionRetryInterval::
 Enables retrying in case of authorization exceptions.
 Defines interval between each retry.
 Accepts `Duration`, e.g. `30s`, `2m`, etc.
 +
 Default: `null` (retries disabled, fail fast)

 [[kafka-consumer-properties]]
 ==== Kafka Consumer Properties

-NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.
+NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.kafka.default.consumer.<property>=<value>`.


 The following properties are available for Kafka consumers only and
```
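The hunk above introduces the `kafkaBinderHeaderMapper` bean-name convention. As a rough illustration, a custom header mapper might be registered along the following lines. This is a minimal sketch: the configuration class, trusted package, the no-arg constructor, and the `addTrustedPackages` method (inherited from the `DefaultKafkaHeaderMapper` design that `BinderHeaderMapper` mirrors) are assumptions, not code from this PR.

```java
import org.springframework.cloud.stream.binder.kafka.BinderHeaderMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.KafkaHeaderMapper;

@Configuration
public class HeaderMapperConfig {

	// Using the well-known bean name means the binder can find this mapper even
	// when spring.cloud.stream.kafka.binder.headerMapperBeanName is not set.
	@Bean
	public KafkaHeaderMapper kafkaBinderHeaderMapper() {
		BinderHeaderMapper mapper = new BinderHeaderMapper();
		// Assumed API: BinderHeaderMapper is modeled on DefaultKafkaHeaderMapper,
		// which exposes addTrustedPackages for JSON header deserialization.
		mapper.addTrustedPackages("com.example.events");
		return mapper;
	}
}
```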
```diff
@@ -234,9 +228,20 @@ The DLQ topic name can be configurable by setting the `dlqName` property.
 This provides an alternative option to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome.
 See <<kafka-dlq-processing>> processing for more information.
 Starting with version 2.0, messages sent to the DLQ topic are enhanced with the following headers: `x-original-topic`, `x-exception-message`, and `x-exception-stacktrace` as `byte[]`.
+By default, a failed record is sent to the same partition number in the DLQ topic as the original record.
+See <<dlq-partition-selection>> for how to change that behavior.
 **Not allowed when `destinationIsPattern` is `true`.**
 +
 Default: `false`.
+dlqPartitions::
+When `enableDlq` is true, and this property is not set, a dead letter topic with the same number of partitions as the primary topic(s) is created.
+Usually, dead-letter records are sent to the same partition in the dead-letter topic as the original record.
+This behavior can be changed; see <<dlq-partition-selection>>.
+If this property is set to `1` and there is no `DqlPartitionFunction` bean, all dead-letter records will be written to partition `0`.
+If this property is greater than `1`, you **MUST** provide a `DlqPartitionFunction` bean.
+Note that the actual partition count is affected by the binder's `minPartitionCount` property.
++
+Default: `none`
 configuration::
 Map with a key/value pair containing generic Kafka consumer properties.
 In addition to having Kafka consumer properties, other configuration properties can be passed here.
```
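Since the added `dlqPartitions` text requires a `DlqPartitionFunction` bean when the partition counts differ, here is a minimal sketch of one. The functional shape (consumer group, failed record, and exception in; target partition out) follows the binder reference docs; the import path and class names are assumptions.

```java
import org.springframework.cloud.stream.binder.kafka.utils.DlqPartitionFunction; // package assumed
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DlqConfig {

	// Send every dead-letter record to partition 0 of the DLQ topic,
	// regardless of which partition the failed record came from.
	@Bean
	public DlqPartitionFunction partitionFunction() {
		return (group, record, throwable) -> 0;
	}
}
```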
```diff
@@ -296,6 +301,12 @@ pollTimeout::
 Timeout used for polling in pollable consumers.
 +
 Default: 5 seconds.
+transactionManager::
+Bean name of a `KafkaAwareTransactionManager` used to override the binder's transaction manager for this binding.
+Usually needed if you want to synchronize another transaction with the Kafka transaction, using the `ChainedKafkaTransactionManaager`.
+To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.
++
+Default: none.

 ==== Consuming Batches
```
```diff
@@ -311,7 +322,7 @@ Refer to the https://docs.spring.io/spring-kafka/docs/2.3.0.BUILD-SNAPSHOT/refer
 [[kafka-producer-properties]]
 ==== Kafka Producer Properties

-NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.
+NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.kafka.default.producer.<property>=<value>`.


 The following properties are available for Kafka producers only and
```
```diff
@@ -407,6 +418,12 @@ Supported values are `none`, `gzip`, `snappy` and `lz4`.
 If you override the `kafka-clients` jar to 2.1.0 (or later), as discussed in the https://docs.spring.io/spring-kafka/docs/2.2.x/reference/html/deps-for-21x.html[Spring for Apache Kafka documentation], and wish to use `zstd` compression, use `spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd`.
 +
 Default: `none`.
+transactionManager::
+Bean name of a `KafkaAwareTransactionManager` used to override the binder's transaction manager for this binding.
+Usually needed if you want to synchronize another transaction with the Kafka transaction, using the `ChainedKafkaTransactionManaager`.
+To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.
++
+Default: none.

 ==== Usage examples
```
```diff
@@ -576,16 +593,24 @@ When used in a processor application, the consumer starts the transaction; any r
 When the listener exits normally, the listener container will send the offset to the transaction and commit it.
 A common producer factory is used for all producer bindings configured using `spring.cloud.stream.kafka.binder.transaction.producer.*` properties; individual binding Kafka producer properties are ignored.

 IMPORTANT: Normal binder retries (and dead lettering) are not supported with transactions because the retries will run in the original transaction, which may be rolled back and any published records will be rolled back too.
 When retries are enabled (the common property `maxAttempts` is greater than zero) the retry properties are used to configure a `DefaultAfterRollbackProcessor` to enable retries at the container level.
 Similarly, instead of publishing dead-letter records within the transaction, this functionality is moved to the listener container, again via the `DefaultAfterRollbackProcessor` which runs after the main transaction has rolled back.

 If you wish to use transactions in a source application, or from some arbitrary thread for producer-only transaction (e.g. `@Scheduled` method), you must get a reference to the transactional producer factory and define a `KafkaTransactionManager` bean using it.

 ====
 [source, java]
 ----
 @Bean
-public PlatformTransactionManager transactionManager(BinderFactory binders) {
+public PlatformTransactionManager transactionManager(BinderFactory binders,
+        @Value("${unique.tx.id.per.instance}") String txId) {
+
     ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
             MessageChannel.class)).getTransactionalProducerFactory();
-    return new KafkaTransactionManager<>(pf);
+    KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
+    tm.setTransactionId(txId)
+    return tm;
 }
 ----
 ====
```
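For orientation, the snippet above wires the binder's transactional `ProducerFactory` into a `KafkaTransactionManager`. A producer-only send running on that transaction manager might then look like the following sketch; the `Sender` shape mirrors the `public static class Sender` visible in the next hunk's context, while the channel and payload names are illustrative.

```java
import java.util.List;

import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.transaction.annotation.Transactional;

public class Sender {

	// All sends below participate in one Kafka transaction: they commit
	// together when the method returns, or roll back together on an exception.
	@Transactional
	public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
		stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
	}
}
```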
```diff
@@ -612,6 +637,8 @@ public static class Sender {

 If you wish to synchronize producer-only transactions with those from some other transaction manager, use a `ChainedTransactionManager`.

+IMPORTANT: If you deploy multiple instances of your application, each instance needs a unique `transactionIdPrefix`.
+
 [[kafka-error-channels]]
 === Error Channels
```
```diff
@@ -7,7 +7,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>3.0.3.BUILD-SNAPSHOT</version>
+		<version>3.1.0.M1</version>
 	</parent>
 	<packaging>pom</packaging>
 	<name>spring-cloud-stream-binder-kafka-docs</name>
```
```diff
@@ -589,11 +589,11 @@ public KStream<String, String> anotherProcess(@Input("anotherInput") <KStream<Ob

 Then you must set the application id for this using the following binding property.

-`spring.cloud.stream.kafka.streams.bindings.input.applicationId`
+`spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId`

 and

-`spring.cloud.stream.kafka.streams.bindings.anotherInput.applicationId`
+`spring.cloud.stream.kafka.streams.bindings.anotherInput.consumer.applicationId`


 For function based model also, this approach of setting application id at the binding level will work.
```
```diff
@@ -1448,6 +1448,19 @@ By default, the `Kafkastreams.cleanup()` method is called when the binding is st
 See https://docs.spring.io/spring-kafka/reference/html/_reference.html#_configuration[the Spring Kafka documentation].
 To modify this behavior simply add a single `CleanupConfig` `@Bean` (configured to clean up on start, stop, or neither) to the application context; the bean will be detected and wired into the factory bean.

+
+=== Kafka Streams topology visualization
+
+Kafka Streams binder provides the following actuator endpoints for retrieving the topology description using which you can visualize the topology using external tools.
+
+`/actuator/topology`
+
+`/actuator/topology/<applicaiton-id of the processor>`
+
+You need to include the actuator and web dependencies from Spring Boot to access these endpoints.
+Further, you also need to add `topology` to `management.endpoints.web.exposure.include` property.
+By default, the `topology` endpoint is disabled.
+
 === Configuration Options

 This section contains the configuration options used by the Kafka Streams binder.
```
```diff
@@ -281,6 +281,12 @@ pollTimeout::
 Timeout used for polling in pollable consumers.
 +
 Default: 5 seconds.
+transactionManager::
+Bean name of a `KafkaAwareTransactionManager` used to override the binder's transaction manager for this binding.
+Usually needed if you want to synchronize another transaction with the Kafka transaction, using the `ChainedKafkaTransactionManaager`.
+To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.
++
+Default: none.

 ==== Consuming Batches
```
```diff
@@ -392,6 +398,12 @@ Supported values are `none`, `gzip`, `snappy` and `lz4`.
 If you override the `kafka-clients` jar to 2.1.0 (or later), as discussed in the https://docs.spring.io/spring-kafka/docs/2.2.x/reference/html/deps-for-21x.html[Spring for Apache Kafka documentation], and wish to use `zstd` compression, use `spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd`.
 +
 Default: `none`.
+transactionManager::
+Bean name of a `KafkaAwareTransactionManager` used to override the binder's transaction manager for this binding.
+Usually needed if you want to synchronize another transaction with the Kafka transaction, using the `ChainedKafkaTransactionManaager`.
+To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.
++
+Default: none.

 ==== Usage examples
```
```diff
@@ -571,10 +583,14 @@ If you wish to use transactions in a source application, or from some arbitrary
 [source, java]
 ----
 @Bean
-public PlatformTransactionManager transactionManager(BinderFactory binders) {
+public PlatformTransactionManager transactionManager(BinderFactory binders,
+        @Value("${unique.tx.id.per.instance}") String txId) {
+
     ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
             MessageChannel.class)).getTransactionalProducerFactory();
-    return new KafkaTransactionManager<>(pf);
+    KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
+    tm.setTransactionId(txId)
+    return tm;
 }
 ----
 ====
```
```diff
@@ -601,6 +617,8 @@ public static class Sender {

 If you wish to synchronize producer-only transactions with those from some other transaction manager, use a `ChainedTransactionManager`.

+IMPORTANT: If you deploy multiple instances of your application, each instance needs a unique `transactionIdPrefix`.
+
 [[kafka-error-channels]]
 === Error Channels
```
**mvnw** (vendored, 36 lines changed)
```diff
@@ -8,7 +8,7 @@
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
 #
-#    https://www.apache.org/licenses/LICENSE-2.0
+#    http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
```
```diff
@@ -19,7 +19,7 @@
 # ----------------------------------------------------------------------------

 # ----------------------------------------------------------------------------
-# Maven2 Start Up Batch script
+# Maven Start Up Batch script
 #
 # Required ENV vars:
 # ------------------
```
```diff
@@ -114,7 +114,6 @@ if $mingw ; then
   M2_HOME="`(cd "$M2_HOME"; pwd)`"
   [ -n "$JAVA_HOME" ] &&
     JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
-  # TODO classpath?
 fi

 if [ -z "$JAVA_HOME" ]; then
```
```diff
@@ -212,7 +211,11 @@ else
     if [ "$MVNW_VERBOSE" = true ]; then
       echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..."
     fi
-    jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.4.2/maven-wrapper-0.4.2.jar"
+    if [ -n "$MVNW_REPOURL" ]; then
+      jarUrl="$MVNW_REPOURL/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
+    else
+      jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
+    fi
     while IFS="=" read key value; do
       case "$key" in (wrapperUrl) jarUrl="$value"; break ;;
       esac
```
```diff
@@ -221,22 +224,38 @@ else
       echo "Downloading from: $jarUrl"
     fi
     wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar"
+    if $cygwin; then
+      wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"`
+    fi

     if command -v wget > /dev/null; then
       if [ "$MVNW_VERBOSE" = true ]; then
         echo "Found wget ... using wget"
       fi
-      wget "$jarUrl" -O "$wrapperJarPath"
+      if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
+        wget "$jarUrl" -O "$wrapperJarPath"
+      else
+        wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath"
+      fi
     elif command -v curl > /dev/null; then
       if [ "$MVNW_VERBOSE" = true ]; then
         echo "Found curl ... using curl"
       fi
-      curl -o "$wrapperJarPath" "$jarUrl"
+      if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
+        curl -o "$wrapperJarPath" "$jarUrl" -f
+      else
+        curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f
+      fi

     else
       if [ "$MVNW_VERBOSE" = true ]; then
         echo "Falling back to using Java to download"
       fi
       javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java"
+      # For Cygwin, switch paths to Windows format before running javac
+      if $cygwin; then
+        javaClass=`cygpath --path --windows "$javaClass"`
+      fi
       if [ -e "$javaClass" ]; then
         if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
           if [ "$MVNW_VERBOSE" = true ]; then
```
```diff
@@ -277,6 +296,11 @@ if $cygwin; then
   MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
 fi

+# Provide a "standardized" way to retrieve the CLI args that will
+# work with both Windows and non-Windows executions.
+MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@"
+export MAVEN_CMD_LINE_ARGS
+
 WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain

 exec "$JAVACMD" \
```
**mvnw.cmd** (vendored, 343 lines changed, Executable file → Normal file)
```diff
@@ -1,161 +1,182 @@
-@REM ----------------------------------------------------------------------------
-@REM Licensed to the Apache Software Foundation (ASF) under one
-@REM or more contributor license agreements.  See the NOTICE file
-@REM distributed with this work for additional information
-@REM regarding copyright ownership.  The ASF licenses this file
-@REM to you under the Apache License, Version 2.0 (the
-@REM "License"); you may not use this file except in compliance
-@REM with the License.  You may obtain a copy of the License at
-@REM
-@REM    https://www.apache.org/licenses/LICENSE-2.0
-@REM
-@REM Unless required by applicable law or agreed to in writing,
-@REM software distributed under the License is distributed on an
-@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-@REM KIND, either express or implied.  See the License for the
-@REM specific language governing permissions and limitations
-@REM under the License.
-@REM ----------------------------------------------------------------------------
-
-@REM ----------------------------------------------------------------------------
-@REM Maven2 Start Up Batch script
-@REM
-@REM Required ENV vars:
-@REM JAVA_HOME - location of a JDK home dir
-@REM
-@REM Optional ENV vars
-@REM M2_HOME - location of maven2's installed home dir
-@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
-@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending
-@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
-@REM     e.g. to debug Maven itself, use
-@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
-@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
-@REM ----------------------------------------------------------------------------
-
-@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
-@echo off
-@REM set title of command window
-title %0
-@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on'
-@if "%MAVEN_BATCH_ECHO%" == "on"  echo %MAVEN_BATCH_ECHO%
-
-@REM set %HOME% to equivalent of $HOME
-if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
-
-@REM Execute a user defined script before this one
-if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
-@REM check for pre script, once with legacy .bat ending and once with .cmd ending
-if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
-if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
-:skipRcPre
-
-@setlocal
-
-set ERROR_CODE=0
-
-@REM To isolate internal variables from possible post scripts, we use another setlocal
-@setlocal
-
-@REM ==== START VALIDATION ====
-if not "%JAVA_HOME%" == "" goto OkJHome
-
-echo.
-echo Error: JAVA_HOME not found in your environment. >&2
-echo Please set the JAVA_HOME variable in your environment to match the >&2
-echo location of your Java installation. >&2
-echo.
-goto error
-
-:OkJHome
-if exist "%JAVA_HOME%\bin\java.exe" goto init
-
-echo.
-echo Error: JAVA_HOME is set to an invalid directory. >&2
-echo JAVA_HOME = "%JAVA_HOME%" >&2
-echo Please set the JAVA_HOME variable in your environment to match the >&2
-echo location of your Java installation. >&2
-echo.
-goto error
-
-@REM ==== END VALIDATION ====
-
-:init
-
-@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
-@REM Fallback to current working directory if not found.
-
-set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
-IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
-
-set EXEC_DIR=%CD%
-set WDIR=%EXEC_DIR%
-:findBaseDir
-IF EXIST "%WDIR%"\.mvn goto baseDirFound
-cd ..
-IF "%WDIR%"=="%CD%" goto baseDirNotFound
-set WDIR=%CD%
-goto findBaseDir
-
-:baseDirFound
-set MAVEN_PROJECTBASEDIR=%WDIR%
-cd "%EXEC_DIR%"
-goto endDetectBaseDir
-
-:baseDirNotFound
-set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
-cd "%EXEC_DIR%"
-
-:endDetectBaseDir
-
-IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
-
-@setlocal EnableExtensions EnableDelayedExpansion
-for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
-@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
-
-:endReadAdditionalConfig
-
-SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
-set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
-set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
-
-set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.4.2/maven-wrapper-0.4.2.jar"
-FOR /F "tokens=1,2 delims==" %%A IN (%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties) DO (
-	IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B
-)
-
-@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
-@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
-if exist %WRAPPER_JAR% (
-	echo Found %WRAPPER_JAR%
-) else (
-	echo Couldn't find %WRAPPER_JAR%, downloading it ...
-	echo Downloading from: %DOWNLOAD_URL%
-	powershell -Command "(New-Object Net.WebClient).DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"
-	echo Finished downloading %WRAPPER_JAR%
-)
-@REM End of extension
-
-%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
-if ERRORLEVEL 1 goto error
-goto end
-
-:error
-set ERROR_CODE=1
-
-:end
-@endlocal & set ERROR_CODE=%ERROR_CODE%
-
-if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
-@REM check for post script, once with legacy .bat ending and once with .cmd ending
-if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
-if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
-:skipRcPost
-
-@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
-if "%MAVEN_BATCH_PAUSE%" == "on" pause
-
-if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
-
-exit /B %ERROR_CODE%
+@REM ----------------------------------------------------------------------------
+@REM Licensed to the Apache Software Foundation (ASF) under one
+@REM or more contributor license agreements.  See the NOTICE file
+@REM distributed with this work for additional information
+@REM regarding copyright ownership.  The ASF licenses this file
+@REM to you under the Apache License, Version 2.0 (the
+@REM "License"); you may not use this file except in compliance
+@REM with the License.  You may obtain a copy of the License at
+@REM
+@REM    http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing,
+@REM software distributed under the License is distributed on an
+@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+@REM KIND, either express or implied.  See the License for the
+@REM specific language governing permissions and limitations
+@REM under the License.
+@REM ----------------------------------------------------------------------------
+
+@REM ----------------------------------------------------------------------------
+@REM Maven Start Up Batch script
+@REM
+@REM Required ENV vars:
+@REM JAVA_HOME - location of a JDK home dir
+@REM
+@REM Optional ENV vars
+@REM M2_HOME - location of maven2's installed home dir
+@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
+@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending
+@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
+@REM     e.g. to debug Maven itself, use
+@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+@REM ----------------------------------------------------------------------------
+
+@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
+@echo off
+@REM set title of command window
+title %0
+@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on'
+@if "%MAVEN_BATCH_ECHO%" == "on"  echo %MAVEN_BATCH_ECHO%
+
+@REM set %HOME% to equivalent of $HOME
+if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
+
+@REM Execute a user defined script before this one
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
+@REM check for pre script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
+if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
+:skipRcPre
+
+@setlocal
+
+set ERROR_CODE=0
+
+@REM To isolate internal variables from possible post scripts, we use another setlocal
+@setlocal
+
+@REM ==== START VALIDATION ====
+if not "%JAVA_HOME%" == "" goto OkJHome
+
+echo.
+echo Error: JAVA_HOME not found in your environment. >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+:OkJHome
+if exist "%JAVA_HOME%\bin\java.exe" goto init
+
+echo.
+echo Error: JAVA_HOME is set to an invalid directory. >&2
+echo JAVA_HOME = "%JAVA_HOME%" >&2
+echo Please set the JAVA_HOME variable in your environment to match the >&2
+echo location of your Java installation. >&2
+echo.
+goto error
+
+@REM ==== END VALIDATION ====
+
+:init
+
+@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
+@REM Fallback to current working directory if not found.
+
+set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
+IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
+
+set EXEC_DIR=%CD%
+set WDIR=%EXEC_DIR%
+:findBaseDir
+IF EXIST "%WDIR%"\.mvn goto baseDirFound
+cd ..
+IF "%WDIR%"=="%CD%" goto baseDirNotFound
+set WDIR=%CD%
+goto findBaseDir
+
+:baseDirFound
+set MAVEN_PROJECTBASEDIR=%WDIR%
+cd "%EXEC_DIR%"
+goto endDetectBaseDir
+
+:baseDirNotFound
+set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
+cd "%EXEC_DIR%"
+
+:endDetectBaseDir
+
+IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
+
+@setlocal EnableExtensions EnableDelayedExpansion
+for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
+@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
+
+:endReadAdditionalConfig
+
+SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
+set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
+set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
+
+set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
+
+FOR /F "tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO (
+	IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B
+)
+
+@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
+@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
+if exist %WRAPPER_JAR% (
+	if "%MVNW_VERBOSE%" == "true" (
+		echo Found %WRAPPER_JAR%
+	)
+) else (
+	if not "%MVNW_REPOURL%" == "" (
+		SET DOWNLOAD_URL="%MVNW_REPOURL%/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
+	)
+	if "%MVNW_VERBOSE%" == "true" (
+		echo Couldn't find %WRAPPER_JAR%, downloading it ...
+		echo Downloading from: %DOWNLOAD_URL%
+	)
+
+	powershell -Command "&{"^
+		"$webclient = new-object System.Net.WebClient;"^
+		"if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^
+		"$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^
+		"}"^
+		"[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^
+		"}"
+	if "%MVNW_VERBOSE%" == "true" (
+		echo Finished downloading %WRAPPER_JAR%
+	)
+)
+@REM End of extension
+
+@REM Provide a "standardized" way to retrieve the CLI args that will
+@REM work with both Windows and non-Windows executions.
+set MAVEN_CMD_LINE_ARGS=%*
+
+%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
+if ERRORLEVEL 1 goto error
+goto end
+
+:error
+set ERROR_CODE=1
+
+:end
+@endlocal & set ERROR_CODE=%ERROR_CODE%
+
+if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
+@REM check for post script, once with legacy .bat ending and once with .cmd ending
+if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
+if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
+:skipRcPost
+
+@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
+if "%MAVEN_BATCH_PAUSE%" == "on" pause
+
+if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
+
+exit /B %ERROR_CODE%
```
**pom.xml** (12 lines changed)
```diff
@@ -2,21 +2,21 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-	<version>3.0.3.BUILD-SNAPSHOT</version>
+	<version>3.1.0.M1</version>
 	<packaging>pom</packaging>
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-build</artifactId>
-		<version>2.2.2.RELEASE</version>
+		<version>3.0.0.M1</version>
 		<relativePath />
 	</parent>
 	<properties>
 		<java.version>1.8</java.version>
-		<spring-kafka.version>2.3.5.RELEASE</spring-kafka.version>
+		<spring-kafka.version>2.4.4.RELEASE</spring-kafka.version>
 		<spring-integration-kafka.version>3.2.1.RELEASE</spring-integration-kafka.version>
-		<kafka.version>2.3.1</kafka.version>
-		<spring-cloud-schema-registry.version>1.0.3.BUILD-SNAPSHOT</spring-cloud-schema-registry.version>
-		<spring-cloud-stream.version>3.0.3.BUILD-SNAPSHOT</spring-cloud-stream.version>
+		<kafka.version>2.4.0</kafka.version>
+		<spring-cloud-schema-registry.version>1.1.0.M1</spring-cloud-schema-registry.version>
+		<spring-cloud-stream.version>3.1.0.M1</spring-cloud-stream.version>
 		<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
 		<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
 		<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>
```
```diff
@@ -4,7 +4,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>3.0.3.BUILD-SNAPSHOT</version>
+		<version>3.1.0.M1</version>
 	</parent>
 	<artifactId>spring-cloud-starter-stream-kafka</artifactId>
 	<description>Spring Cloud Starter Stream Kafka</description>
```
```diff
@@ -5,7 +5,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>3.0.3.BUILD-SNAPSHOT</version>
+		<version>3.1.0.M1</version>
 	</parent>
 	<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
 	<description>Spring Cloud Stream Kafka Binder Core</description>
```
```diff
@@ -150,6 +150,7 @@ public class KafkaConsumerProperties {
 	/**
 	 * @deprecated No longer used by the binder.
 	 */
+	@Deprecated
 	private int recoveryInterval = 5000;

 	/**
```
```diff
@@ -194,6 +195,11 @@ public class KafkaConsumerProperties {
 	 */
 	private long pollTimeout = org.springframework.kafka.listener.ConsumerProperties.DEFAULT_POLL_TIMEOUT;

+	/**
+	 * Transaction manager bean name - overrides the binder's transaction configuration.
+	 */
+	private String transactionManager;
+
 	/**
 	 * @return if each record needs to be acknowledged.
 	 *
```
```diff
@@ -462,4 +468,18 @@ public class KafkaConsumerProperties {
 	public void setPollTimeout(long pollTimeout) {
 		this.pollTimeout = pollTimeout;
 	}

+	/**
+	 * @return the transaction manager bean name.
+	 *
+	 * Transaction manager bean name (must be {@code KafkaAwareTransactionManager}.
+	 */
+	public String getTransactionManager() {
+		return this.transactionManager;
+	}
+
+	public void setTransactionManager(String transactionManager) {
+		this.transactionManager = transactionManager;
+	}
+
 }
```
```diff
@@ -94,6 +94,11 @@ public class KafkaProducerProperties {
 	 */
 	private String recordMetadataChannel;

+	/**
+	 * Transaction manager bean name - overrides the binder's transaction configuration.
+	 */
+	private String transactionManager;
+
 	/**
 	 * @return buffer size
 	 *
```
```diff
@@ -244,6 +249,19 @@ public class KafkaProducerProperties {
 		this.recordMetadataChannel = recordMetadataChannel;
 	}

+	/**
+	 * @return the transaction manager bean name.
+	 *
+	 * Transaction manager bean name (must be {@code KafkaAwareTransactionManager}.
+	 */
+	public String getTransactionManager() {
+		return this.transactionManager;
+	}
+
+	public void setTransactionManager(String transactionManager) {
+		this.transactionManager = transactionManager;
+	}
+
 	/**
 	 * Enumeration for compression types.
 	 */
```
```diff
@@ -10,7 +10,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>3.0.3.BUILD-SNAPSHOT</version>
+		<version>3.1.0.M1</version>
 	</parent>

 	<properties>
```
```diff
@@ -17,9 +17,12 @@
 package org.springframework.cloud.stream.binder.kafka.streams;

 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
```
```diff
@@ -45,6 +48,7 @@ import org.springframework.util.StringUtils;
  *
  * @author Soby Chacko
  * @author Renwei Han
+ * @author Serhii Siryi
  * @since 2.1.0
  */
 public class InteractiveQueryService {
```
```diff
@@ -153,4 +157,25 @@ public class InteractiveQueryService {
 		return streamsMetadata != null ? streamsMetadata.hostInfo() : null;
 	}

+	/**
+	 * Gets the list of {@link HostInfo} where the provided store is hosted on.
+	 * It also can include current host info.
+	 * Kafka Streams will look through all the consumer instances under the same application id
+	 * and retrieves all hosts info.
+	 *
+	 * Note that the end-user applications must provide `application.server` as a configuration property
+	 * for all the application instances when calling this method. If this is not available, then an empty list will be returned.
+	 *
+	 * @param store store name
+	 * @return the list of {@link HostInfo} where provided store is hosted on
+	 */
+	public List<HostInfo> getAllHostsInfo(String store) {
+		return kafkaStreamsRegistry.getKafkaStreams()
+				.stream()
+				.flatMap(k -> k.allMetadataForStore(store).stream())
+				.filter(Objects::nonNull)
+				.map(StreamsMetadata::hostInfo)
+				.collect(Collectors.toList());
+	}
+
 }
```
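A short sketch of consuming the new `getAllHostsInfo` method; the store name is borrowed from the integration test further down, and the surrounding class and injection style are illustrative.

```java
import java.util.List;
import java.util.stream.Collectors;

import org.apache.kafka.streams.state.HostInfo;

import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;

public class StoreHostLister {

	private final InteractiveQueryService interactiveQueryService;

	public StoreHostLister(InteractiveQueryService interactiveQueryService) {
		this.interactiveQueryService = interactiveQueryService;
	}

	// Returns host:port for every instance hosting the store. Per the Javadoc
	// above, this is empty unless every instance sets application.server.
	public List<String> storeEndpoints() {
		return interactiveQueryService.getAllHostsInfo("prod-id-count-store")
				.stream()
				.map(host -> host.host() + ":" + host.port())
				.collect(Collectors.toList());
	}
}
```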
```diff
@@ -16,12 +16,16 @@

 package org.springframework.cloud.stream.binder.kafka.streams;

+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;

 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;

 import org.springframework.kafka.config.StreamsBuilderFactoryBean;
```
```diff
@@ -31,7 +35,7 @@ import org.springframework.kafka.config.StreamsBuilderFactoryBean;
  *
  * @author Soby Chacko
  */
-class KafkaStreamsRegistry {
+public class KafkaStreamsRegistry {

 	private Map<KafkaStreams, StreamsBuilderFactoryBean> streamsBuilderFactoryBeanMap = new HashMap<>();
```
```diff
@@ -60,4 +64,18 @@ class KafkaStreamsRegistry {
 		return this.streamsBuilderFactoryBeanMap.get(kafkaStreams);
 	}

+	public StreamsBuilderFactoryBean streamsBuilderFactoryBean(String applicationId) {
+		final Optional<StreamsBuilderFactoryBean> first = this.streamsBuilderFactoryBeanMap.values()
+				.stream()
+				.filter(streamsBuilderFactoryBean -> streamsBuilderFactoryBean
+						.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
+						.equals(applicationId))
+				.findFirst();
+		return first.orElse(null);
+	}
+
+	public List<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans() {
+		return new ArrayList<>(this.streamsBuilderFactoryBeanMap.values());
+	}
+
 }
```
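The two methods added above are what the new topology actuator endpoint (introduced in the next file) builds on. A fragment showing the lookup they enable; the application id is illustrative and `kafkaStreamsRegistry` is assumed to be an injected `KafkaStreamsRegistry` bean.

```java
// Resolve the factory bean whose streams configuration carries this application.id,
// then print its topology, mirroring what TopologyEndpoint does below.
StreamsBuilderFactoryBean factoryBean = kafkaStreamsRegistry.streamsBuilderFactoryBean("my-app-id");
if (factoryBean != null) {
	System.out.println(factoryBean.getTopology().describe());
}
```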
```diff
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.binder.kafka.streams.endpoint;
+
+import java.util.List;
+
+import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
+import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
+import org.springframework.boot.actuate.endpoint.annotation.Selector;
+import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
+import org.springframework.kafka.config.StreamsBuilderFactoryBean;
+import org.springframework.util.StringUtils;
+
+/**
+ * Actuator endpoint for topology description.
+ *
+ * @author Soby Chacko
+ * @since 3.0.4
+ */
+@Endpoint(id = "topology")
+public class TopologyEndpoint {
+
+	/**
+	 * Topology not found message.
+	 */
+	public static final String NO_TOPOLOGY_FOUND_MSG = "No topology found for the given application ID";
+
+	private final KafkaStreamsRegistry kafkaStreamsRegistry;
+
+	public TopologyEndpoint(KafkaStreamsRegistry kafkaStreamsRegistry) {
+		this.kafkaStreamsRegistry = kafkaStreamsRegistry;
+	}
+
+	@ReadOperation
+	public String topology() {
+		final List<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans = this.kafkaStreamsRegistry.streamsBuilderFactoryBeans();
+		final StringBuilder topologyDescription = new StringBuilder();
+		streamsBuilderFactoryBeans.stream()
+				.forEach(streamsBuilderFactoryBean ->
+						topologyDescription.append(streamsBuilderFactoryBean.getTopology().describe().toString()));
+		return topologyDescription.toString();
+	}
+
+	@ReadOperation
+	public String topology(@Selector String applicationId) {
+		if (!StringUtils.isEmpty(applicationId)) {
+			final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsRegistry.streamsBuilderFactoryBean(applicationId);
+			if (streamsBuilderFactoryBean != null) {
+				return streamsBuilderFactoryBean.getTopology().describe().toString();
+			}
+			else {
+				return NO_TOPOLOGY_FOUND_MSG;
+			}
+		}
+		return NO_TOPOLOGY_FOUND_MSG;
+	}
+}
```
```diff
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.binder.kafka.streams.endpoint;
+
+import org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration;
+import org.springframework.boot.actuate.autoconfigure.endpoint.condition.ConditionalOnAvailableEndpoint;
+import org.springframework.boot.autoconfigure.AutoConfigureAfter;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderSupportAutoConfiguration;
+import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @author Soby Chacko
+ * @since 3.0.4
+ */
+@Configuration
+@ConditionalOnClass(name = {
+		"org.springframework.boot.actuate.endpoint.annotation.Endpoint" })
+@AutoConfigureAfter({EndpointAutoConfiguration.class, KafkaStreamsBinderSupportAutoConfiguration.class})
+public class TopologyEndpointAutoConfiguration {
+
+	@Bean
+	@ConditionalOnAvailableEndpoint
+	public TopologyEndpoint topologyEndpoint(KafkaStreamsRegistry kafkaStreamsRegistry) {
+		return new TopologyEndpoint(kafkaStreamsRegistry);
+	}
+}
```
```diff
@@ -1,4 +1,4 @@
 org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
 org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderSupportAutoConfiguration,\
-org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsFunctionAutoConfiguration
+org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsFunctionAutoConfiguration,\
+org.springframework.cloud.stream.binder.kafka.streams.endpoint.TopologyEndpointAutoConfiguration
```
```diff
@@ -16,6 +16,7 @@

 package org.springframework.cloud.stream.binder.kafka.streams;

+import java.util.List;
 import java.util.Map;

 import org.apache.kafka.clients.consumer.Consumer;
```
```diff
@@ -162,6 +163,7 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
 		InteractiveQueryService interactiveQueryService = context
 				.getBean(InteractiveQueryService.class);
 		HostInfo currentHostInfo = interactiveQueryService.getCurrentHostInfo();
+
 		assertThat(currentHostInfo.host() + ":" + currentHostInfo.port())
 				.isEqualTo(embeddedKafka.getBrokersAsString());
```
```diff
@@ -173,6 +175,13 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
 		HostInfo hostInfoFoo = interactiveQueryService
 				.getHostInfo("prod-id-count-store-foo", 123, new IntegerSerializer());
 		assertThat(hostInfoFoo).isNull();
+
+		final List<HostInfo> hostInfos = interactiveQueryService.getAllHostsInfo("prod-id-count-store");
+		assertThat(hostInfos.size()).isEqualTo(1);
+		final HostInfo hostInfo1 = hostInfos.get(0);
+		assertThat(hostInfo1.host() + ":" + hostInfo1.port())
+				.isEqualTo(embeddedKafka.getBrokersAsString());
+
 	}

 	@EnableBinding(KafkaStreamsProcessor.class)
```
```diff
@@ -214,7 +223,6 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
 		}

 	}
-
 }

 	static class Product {
```
```diff
@@ -44,6 +44,8 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.WebApplicationType;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
+import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
+import org.springframework.cloud.stream.binder.kafka.streams.endpoint.TopologyEndpoint;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
```
```diff
@@ -108,6 +110,13 @@ public class KafkaStreamsBinderWordCountFunctionTests {
 		assertThat(meterRegistry.get("stream.metrics.commit.total").gauge().value()).isEqualTo(1.0);
 		assertThat(meterRegistry.get("app.info.start.time.ms").gauge().value()).isNotNaN();
 		Assert.isTrue(LATCH.await(5, TimeUnit.SECONDS), "Failed to call customizers");
+
+		//Testing topology endpoint
+		final KafkaStreamsRegistry kafkaStreamsRegistry = context.getBean(KafkaStreamsRegistry.class);
+		final TopologyEndpoint topologyEndpoint = new TopologyEndpoint(kafkaStreamsRegistry);
+		final String topology1 = topologyEndpoint.topology();
+		final String topology2 = topologyEndpoint.topology("testKstreamWordCountFunction");
+		assertThat(topology1).isNotEmpty();
+		assertThat(topology1).isEqualTo(topology2);
 	}
 }
```
```diff
@@ -131,6 +131,7 @@ public class MultipleFunctionsInSameAppTests {
 				"--spring.jmx.enabled=false",
 				"--spring.cloud.stream.function.definition=process;analyze",
 				"--spring.cloud.stream.bindings.process-in-0.destination=purchases",
+				"--spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.startOffset=latest",
 				"--spring.cloud.stream.bindings.process-in-0.binder=kafka1",
 				"--spring.cloud.stream.bindings.process-out-0.destination=coffee",
 				"--spring.cloud.stream.bindings.process-out-0.binder=kafka1",
```
@@ -148,6 +149,8 @@ public class MultipleFunctionsInSameAppTests {
|
||||
"--spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
|
||||
"--spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.applicationId=my-app-2",
|
||||
"--spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.client.id=analyze-client")) {
|
||||
|
||||
Thread.sleep(1000);
|
||||
receiveAndValidate("purchases", "coffee", "electronics");
|
||||
|
||||
StreamsBuilderFactoryBean processStreamsBuilderFactoryBean = context
|
||||
|
||||
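For context, spring.cloud.stream.function.definition=process;analyze above activates two separately named functions in a single application, and each one can then be pinned to its own binder (kafka1/kafka2) and application id. A minimal sketch of what such a pair of function beans might look like; the KStream signatures here are illustrative, not taken from the test source:

// Hypothetical sketch of two functions in one app, addressable as "process" and
// "analyze" for per-binding configuration such as process-in-0.binder=kafka1.
import java.util.function.Consumer;
import java.util.function.Function;

import org.apache.kafka.streams.kstream.KStream;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TwoFunctionsConfig {

	@Bean
	public Function<KStream<String, String>, KStream<String, String>> process() {
		return input -> input.filter((key, value) -> value != null);
	}

	@Bean
	public Consumer<KStream<String, String>> analyze() {
		return input -> input.foreach((key, value) -> System.out.println(key + "=" + value));
	}
}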
@@ -10,7 +10,7 @@
	<parent>
		<groupId>org.springframework.cloud</groupId>
		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
		<version>3.0.3.BUILD-SNAPSHOT</version>
		<version>3.1.0.M1</version>
	</parent>

	<dependencies>

@@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

@@ -37,6 +38,8 @@ import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;

import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.kafka.support.AbstractKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonUtils;
import org.springframework.lang.Nullable;
@@ -49,7 +52,7 @@ import org.springframework.util.MimeType;
 * Custom header mapper for Apache Kafka. This is identical to the {@link org.springframework.kafka.support.DefaultKafkaHeaderMapper}
 * from Spring Kafka. It is provided to address some interoperability issues between Spring Cloud Stream 3.0.x
 * and 2.x apps, where mime types passed as regular {@link MimeType} values in the header are not deserialized properly.
 * Once those concerns are addressed in Spring Kafka, we will deprecate this class and remove it in a future binder release.
 * It also suppresses certain internal headers that should never be propagated on output.
 *
 * Most headers in {@link org.springframework.kafka.support.KafkaHeaders} are not mapped onto outbound messages.
 * The exceptions are correlation and reply headers for request/reply
@@ -65,6 +68,16 @@ import org.springframework.util.MimeType;
 */
public class BinderHeaderMapper extends AbstractKafkaHeaderMapper {

	private static final String NEGATE = "!";

	private static final String NEVER_ID = NEGATE + MessageHeaders.ID;

	private static final String NEVER_TIMESTAMP = NEGATE + MessageHeaders.TIMESTAMP;

	private static final String NEVER_DELIVERY_ATTEMPT = NEGATE + IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT;

	private static final String NEVER_NATIVE_HEADERS_PRESENT = NEGATE + BinderHeaders.NATIVE_HEADERS_PRESENT;

	private static final String JAVA_LANG_STRING = "java.lang.String";

	private static final List<String> DEFAULT_TRUSTED_PACKAGES =

@@ -119,8 +132,10 @@ public class BinderHeaderMapper extends AbstractKafkaHeaderMapper {
	 */
	public BinderHeaderMapper(ObjectMapper objectMapper) {
		this(objectMapper,
				"!" + MessageHeaders.ID,
				"!" + MessageHeaders.TIMESTAMP,
				NEVER_ID,
				NEVER_TIMESTAMP,
				NEVER_DELIVERY_ATTEMPT,
				NEVER_NATIVE_HEADERS_PRESENT,
				"*");
	}

@@ -384,6 +399,32 @@ public class BinderHeaderMapper extends AbstractKafkaHeaderMapper {
		return true;
	}

	/**
	 * Add patterns for headers that should never be mapped.
	 * @param patterns the patterns.
	 * @return the modified patterns.
	 * @since 3.0.2
	 */
	public static String[] addNeverHeaderPatterns(List<String> patterns) {
		List<String> patternsToUse = new LinkedList<>(patterns);
		patternsToUse.add(0, NEVER_NATIVE_HEADERS_PRESENT);
		patternsToUse.add(0, NEVER_DELIVERY_ATTEMPT);
		patternsToUse.add(0, NEVER_TIMESTAMP);
		patternsToUse.add(0, NEVER_ID);
		return patternsToUse.toArray(new String[0]);
	}

	/**
	 * Remove never headers.
	 * @param headers the headers from which to remove the never headers.
	 * @since 3.0.2
	 */
	public static void removeNeverHeaders(Headers headers) {
		headers.remove(MessageHeaders.ID);
		headers.remove(MessageHeaders.TIMESTAMP);
		headers.remove(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT);
		headers.remove(BinderHeaders.NATIVE_HEADERS_PRESENT);
	}

	/**
	 * The {@link StdNodeBasedDeserializer} extension for {@link MimeType} deserialization.
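A minimal sketch of how the two helpers added above compose, assuming a caller that starts from user-supplied header patterns (the pattern values and wrapper class are illustrative):

// Hypothetical usage of the helpers above: prepend the "never map" patterns
// (!id, !timestamp, !deliveryAttempt, !nativeHeadersPresent) to user-supplied
// ones, then build a mapper that maps everything else.
import java.util.Arrays;
import java.util.List;

import org.springframework.cloud.stream.binder.kafka.BinderHeaderMapper;

public class NeverHeaderExample {

	public static BinderHeaderMapper mapperFor(String... userPatterns) {
		List<String> patterns = Arrays.asList(userPatterns.length > 0 ? userPatterns : new String[] { "*" });
		// Patterns are matched in order in AbstractKafkaHeaderMapper, so the leading
		// negations exclude the internal headers regardless of what the user allows.
		return new BinderHeaderMapper(BinderHeaderMapper.addNeverHeaderPatterns(patterns));
	}
}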
@@ -25,7 +25,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

@@ -103,6 +102,7 @@ import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.ProducerListener;

@@ -110,8 +110,10 @@ import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.TopicPartitionOffset.SeekPosition;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;

@@ -214,6 +216,8 @@ public class KafkaMessageChannelBinder extends

	private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();

	private Map<ConsumerDestination, ContainerProperties.AckMode> ackModeInfo = new ConcurrentHashMap<>();

	public KafkaMessageChannelBinder(
			KafkaBinderConfigurationProperties configurationProperties,
			KafkaTopicProvisioner provisioningProvider) {
@@ -343,9 +347,12 @@ public class KafkaMessageChannelBinder extends
	 * (spring.cloud.stream.kafka.binder.transaction.producer.*) properties are used
	 * instead, for all producers. A binder is transactional when
	 * 'spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix' has text.
	 * Individual bindings can override the binder's transaction manager.
	 */
	final ProducerFactory<byte[], byte[]> producerFB = this.transactionManager != null
			? this.transactionManager.getProducerFactory()
	KafkaAwareTransactionManager<byte[], byte[]> transMan = transactionManager(
			producerProperties.getExtension().getTransactionManager());
	final ProducerFactory<byte[], byte[]> producerFB = transMan != null
			? transMan.getProducerFactory()
			: getProducerFactory(null, producerProperties);
	Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(
			producerProperties.getPartitionCount(), false, () -> {
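The comment above ties binder-level transactions to the transaction-id-prefix property. A minimal sketch of the properties involved, in the args style the tests in this changeset use (the acks value is illustrative):

// Hypothetical property set making the whole binder transactional; every producer
// then shares the transactional producer factory, per the comment above.
String[] transactionalBinderArgs = new String[] {
		"--spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx.",
		// Binder-level producer settings used for ALL producers once transactions are on:
		"--spring.cloud.stream.kafka.binder.transaction.producer.configuration.retries=99",
		"--spring.cloud.stream.kafka.binder.transaction.producer.configuration.acks=all"
};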
@@ -353,7 +360,7 @@ public class KafkaMessageChannelBinder extends
				List<PartitionInfo> partitionsFor = producer
						.partitionsFor(destination.getName());
				producer.close();
				if (this.transactionManager == null) {
				if (transMan == null) {
					((DisposableBean) producerFB).destroy();
				}
				return partitionsFor;

@@ -384,7 +391,7 @@ public class KafkaMessageChannelBinder extends
		if (this.producerListener != null) {
			kafkaTemplate.setProducerListener(this.producerListener);
		}
		if (this.transactionManager != null) {
		if (transMan != null) {
			kafkaTemplate.setTransactionIdPrefix(configurationProperties.getTransaction().getTransactionIdPrefix());
		}
		ProducerConfigurationMessageHandler handler = new ProducerConfigurationMessageHandler(

@@ -422,23 +429,32 @@ public class KafkaMessageChannelBinder extends
			mapper = null;
		}
		else if (mapper == null) {
			String[] headerPatterns = producerProperties.getExtension()
					.getHeaderPatterns();
			String[] headerPatterns = producerProperties.getExtension().getHeaderPatterns();
			if (headerPatterns != null && headerPatterns.length > 0) {
				List<String> patterns = new LinkedList<>(Arrays.asList(headerPatterns));
				if (!patterns.contains("!" + MessageHeaders.TIMESTAMP)) {
					patterns.add(0, "!" + MessageHeaders.TIMESTAMP);
				}
				if (!patterns.contains("!" + MessageHeaders.ID)) {
					patterns.add(0, "!" + MessageHeaders.ID);
				}
				mapper = new BinderHeaderMapper(
						patterns.toArray(new String[patterns.size()]));
						BinderHeaderMapper.addNeverHeaderPatterns(Arrays.asList(headerPatterns)));
			}
			else {
				mapper = new BinderHeaderMapper();
			}
		}
		else {
			KafkaHeaderMapper userHeaderMapper = mapper;
			mapper = new KafkaHeaderMapper() {

				@Override
				public void toHeaders(Headers source, Map<String, Object> target) {
					userHeaderMapper.toHeaders(source, target);
				}

				@Override
				public void fromHeaders(MessageHeaders headers, Headers target) {
					userHeaderMapper.fromHeaders(headers, target);
					BinderHeaderMapper.removeNeverHeaders(target);
				}
			};

		}
		handler.setHeaderMapper(mapper);
		return handler;
	}
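When the application supplies its own KafkaHeaderMapper (via the binder's headerMapperBeanName setting, as the tests later in this changeset do), the binder wraps it as shown above so the internal headers are still stripped on outbound. A minimal sketch of registering such a mapper (bean name and patterns are illustrative):

// Hypothetical custom mapper bean; point the binder at it with
// spring.cloud.stream.kafka.binder.headerMapperBeanName=customHeaderMapper.
// The binder still removes the never-headers from whatever this mapper writes.
import org.springframework.cloud.stream.binder.kafka.BinderHeaderMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.KafkaHeaderMapper;

@Configuration
public class HeaderMapperConfig {

	@Bean
	public KafkaHeaderMapper customHeaderMapper() {
		return new BinderHeaderMapper("myHeader*", "!secret*", "*");
	}
}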
@@ -512,7 +528,7 @@ public class KafkaMessageChannelBinder extends
	@Override
	protected boolean useNativeEncoding(
			ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
		if (this.transactionManager != null) {
		if (transactionManager(producerProperties.getExtension().getTransactionManager()) != null) {
			return this.configurationProperties.getTransaction().getProducer()
					.isUseNativeEncoding();
		}

@@ -578,8 +594,10 @@ public class KafkaMessageChannelBinder extends
				? new ContainerProperties(Pattern.compile(topics[0]))
				: new ContainerProperties(topics)
				: new ContainerProperties(topicPartitionOffsets);
		if (this.transactionManager != null) {
			containerProperties.setTransactionManager(this.transactionManager);
		KafkaAwareTransactionManager<byte[], byte[]> transMan = transactionManager(
				extendedConsumerProperties.getExtension().getTransactionManager());
		if (transMan != null) {
			containerProperties.setTransactionManager(transMan);
		}
		if (this.rebalanceListener != null) {
			setupRebalanceListener(extendedConsumerProperties, containerProperties);

@@ -645,14 +663,14 @@ public class KafkaMessageChannelBinder extends
				consumerGroup, extendedConsumerProperties);
		if (!extendedConsumerProperties.isBatchMode()
				&& extendedConsumerProperties.getMaxAttempts() > 1
				&& this.transactionManager == null) {
				&& transMan == null) {

			kafkaMessageDrivenChannelAdapter
					.setRetryTemplate(buildRetryTemplate(extendedConsumerProperties));
			kafkaMessageDrivenChannelAdapter
					.setRecoveryCallback(errorInfrastructure.getRecoverer());
		}
		else if (!extendedConsumerProperties.isBatchMode() && this.transactionManager != null) {
		else if (!extendedConsumerProperties.isBatchMode() && transMan != null) {
			messageListenerContainer.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(
					(record, exception) -> {
						MessagingException payload =

@@ -681,6 +699,7 @@ public class KafkaMessageChannelBinder extends
			kafkaMessageDrivenChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
		}
		this.getContainerCustomizer().configure(messageListenerContainer, destination.getName(), group);
		this.ackModeInfo.put(destination, messageListenerContainer.getContainerProperties().getAckMode());
		return kafkaMessageDrivenChannelAdapter;
	}

@@ -1042,8 +1061,10 @@ public class KafkaMessageChannelBinder extends
		if (kafkaConsumerProperties.isEnableDlq()) {
			KafkaProducerProperties dlqProducerProperties = kafkaConsumerProperties
					.getDlqProducerProperties();
			ProducerFactory<?, ?> producerFactory = this.transactionManager != null
					? this.transactionManager.getProducerFactory()
			KafkaAwareTransactionManager<byte[], byte[]> transMan = transactionManager(
					properties.getExtension().getTransactionManager());
			ProducerFactory<?, ?> producerFactory = transMan != null
					? transMan.getProducerFactory()
					: getProducerFactory(null,
							new ExtendedProducerProperties<>(dlqProducerProperties));
			final KafkaTemplate<?, ?> kafkaTemplate = new KafkaTemplate<>(

@@ -1058,7 +1079,7 @@ public class KafkaMessageChannelBinder extends

		if (properties.isUseNativeDecoding()) {
			if (record != null) {
				Map<String, String> configuration = this.transactionManager == null
				Map<String, String> configuration = transMan == null
						? dlqProducerProperties.getConfiguration()
						: this.configurationProperties.getTransaction()
								.getProducer().getConfiguration();

@@ -1156,22 +1177,46 @@ public class KafkaMessageChannelBinder extends
			String dlqName = StringUtils.hasText(kafkaConsumerProperties.getDlqName())
					? kafkaConsumerProperties.getDlqName()
					: "error." + record.topic() + "." + group;
			MessageHeaders headers;
			if (message instanceof ErrorMessage) {
				final ErrorMessage errorMessage = (ErrorMessage) message;
				final Message<?> originalMessage = errorMessage.getOriginalMessage();
				if (originalMessage != null) {
					headers = originalMessage.getHeaders();
				}
				else {
					headers = message.getHeaders();
				}
			}
			else {
				headers = message.getHeaders();
			}
			if (this.transactionTemplate != null) {
				Throwable throwable2 = throwable;
				this.transactionTemplate.executeWithoutResult(status -> {
					dlqSender.sendToDlq(recordToSend.get(), kafkaHeaders, dlqName, group, throwable2,
							determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()));
							determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()),
							headers, this.ackModeInfo.get(destination));
				});
			}
			else {
				dlqSender.sendToDlq(recordToSend.get(), kafkaHeaders, dlqName, group, throwable,
						determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()));
						determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()), headers, this.ackModeInfo.get(destination));
			}
		};
	}
	return null;
}

	@SuppressWarnings("unchecked")
	@Nullable
	private KafkaAwareTransactionManager<byte[], byte[]> transactionManager(@Nullable String beanName) {
		if (StringUtils.hasText(beanName)) {
			return getApplicationContext().getBean(beanName, KafkaAwareTransactionManager.class);
		}
		return this.transactionManager;
	}

	private DlqPartitionFunction determinDlqPartitionFunction(Integer dlqPartitions) {
		if (this.dlqPartitionFunction != null) {
			return this.dlqPartitionFunction;
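The new transactionManager(beanName) helper above resolves a binding-specific transaction manager by bean name, falling back to the binder-wide one when the binding does not set a name. A minimal sketch of what that looks like from the application side (bean and binding names are illustrative):

// Hypothetical bean consulted when a binding sets
// spring.cloud.stream.kafka.bindings.output.producer.transaction-manager=tm
// (without that property, the binder falls back to its own transaction manager).
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;

@Configuration
public class TransactionManagerConfig {

	@Bean
	public KafkaTransactionManager<byte[], byte[]> tm(ProducerFactory<byte[], byte[]> pf) {
		// KafkaTransactionManager implements KafkaAwareTransactionManager,
		// so transactionManager("tm") can return it.
		return new KafkaTransactionManager<>(pf);
	}
}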
@@ -1428,7 +1473,8 @@ public class KafkaMessageChannelBinder extends

		@SuppressWarnings("unchecked")
		void sendToDlq(ConsumerRecord<?, ?> consumerRecord, Headers headers,
				String dlqName, String group, Throwable throwable, DlqPartitionFunction partitionFunction) {
				String dlqName, String group, Throwable throwable, DlqPartitionFunction partitionFunction,
				MessageHeaders messageHeaders, ContainerProperties.AckMode ackMode) {
			K key = (K) consumerRecord.key();
			V value = (V) consumerRecord.value();
			ProducerRecord<K, V> producerRecord = new ProducerRecord<>(dlqName,

@@ -1458,6 +1504,9 @@ public class KafkaMessageChannelBinder extends
					KafkaMessageChannelBinder.this.logger
							.debug("Sent to DLQ " + sb.toString());
				}
				if (ackMode == ContainerProperties.AckMode.MANUAL || ackMode == ContainerProperties.AckMode.MANUAL_IMMEDIATE) {
					messageHeaders.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class).acknowledge();
				}
			}
		});
	}
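The change above acknowledges the failed record once its DLQ publication succeeds, but only when the container runs in a MANUAL ack mode. A minimal sketch of the consumer configuration that puts a binding in that situation, in the args style used by the tests (binding and group names are illustrative):

// Hypothetical binding configuration: DLQ enabled with auto-commit off, which yields
// a MANUAL ack mode container, so the DLQ sender above performs the acknowledgment.
String[] dlqWithManualAckArgs = new String[] {
		"--spring.cloud.stream.bindings.input.group=myGroup",
		"--spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true",
		"--spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false"
};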
@@ -19,6 +19,7 @@ package org.springframework.cloud.stream.binder.kafka;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;

@@ -51,7 +52,9 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.record.TimestampType;

@@ -1105,75 +1108,6 @@ public class KafkaBinderTests extends
		producerBinding.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	public void testDefaultAutoCommitOnErrorWithoutDlq() throws Exception {
		Binder binder = getBinder();

		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
		BindingProperties producerBindingProperties = createProducerBindingProperties(
				producerProperties);

		DirectChannel moduleOutputChannel = createBindableChannel("output",
				producerBindingProperties);

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		consumerProperties.setMaxAttempts(1);
		consumerProperties.setBackOffInitialInterval(100);
		consumerProperties.setBackOffMaxInterval(150);
		consumerProperties.getExtension().setAutoRebalanceEnabled(false);

		DirectChannel moduleInputChannel = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
		moduleInputChannel.subscribe(handler);

		long uniqueBindingId = System.currentTimeMillis();
		Binding<MessageChannel> producerBinding = binder.bindProducer(
				"retryTest." + uniqueBindingId + ".0", moduleOutputChannel,
				producerProperties);
		Binding<MessageChannel> consumerBinding = binder.bindConsumer(
				"retryTest." + uniqueBindingId + ".0", "testGroup", moduleInputChannel,
				consumerProperties);

		String testMessagePayload = "test." + UUID.randomUUID().toString();
		Message<byte[]> testMessage = MessageBuilder
				.withPayload(testMessagePayload.getBytes()).build();
		moduleOutputChannel.send(testMessage);

		assertThat(handler.getLatch().await((int) (timeoutMultiplier * 1000),
				TimeUnit.MILLISECONDS));
		// first attempt fails
		assertThat(handler.getReceivedMessages().entrySet()).hasSize(1);
		Message<?> receivedMessage = handler.getReceivedMessages().entrySet().iterator()
				.next().getValue();
		assertThat(receivedMessage).isNotNull();
		assertThat(
				new String((byte[]) receivedMessage.getPayload(), StandardCharsets.UTF_8))
						.isEqualTo(testMessagePayload);
		assertThat(handler.getInvocationCount())
				.isEqualTo(consumerProperties.getMaxAttempts());
		consumerBinding.unbind();

		// on the second attempt the message is redelivered
		QueueChannel successfulInputChannel = new QueueChannel();
		consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0",
				"testGroup", successfulInputChannel, consumerProperties);
		binderBindUnbindLatency();
		String testMessage2Payload = "test." + UUID.randomUUID().toString();
		Message<byte[]> testMessage2 = MessageBuilder
				.withPayload(testMessage2Payload.getBytes()).build();
		moduleOutputChannel.send(testMessage2);

		Message<?> firstReceived = receive(successfulInputChannel);
		assertThat(firstReceived.getPayload()).isEqualTo(testMessagePayload.getBytes());
		Message<?> secondReceived = receive(successfulInputChannel);
		assertThat(secondReceived.getPayload()).isEqualTo(testMessage2Payload.getBytes());
		consumerBinding.unbind();
		producerBinding.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	public void testDefaultAutoCommitOnErrorWithDlq() throws Exception {

@@ -1190,7 +1124,6 @@ public class KafkaBinderTests extends
		consumerProperties.setBackOffInitialInterval(100);
		consumerProperties.setBackOffMaxInterval(150);
		consumerProperties.getExtension().setEnableDlq(true);
		consumerProperties.getExtension().setAutoRebalanceEnabled(false);

		DirectChannel moduleInputChannel = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

@@ -1252,6 +1185,87 @@ public class KafkaBinderTests extends
		producerBinding.unbind();
	}

	// See https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/870 for the motivation for this test.
	@Test
	@SuppressWarnings("unchecked")
	public void testAutoCommitOnErrorWhenManualAcknowledgement() throws Exception {
		Binder binder = getBinder();
		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
		BindingProperties producerBindingProperties = createProducerBindingProperties(
				producerProperties);

		DirectChannel moduleOutputChannel = createBindableChannel("output",
				producerBindingProperties);

		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		consumerProperties.setMaxAttempts(3);
		consumerProperties.setBackOffInitialInterval(100);
		consumerProperties.setBackOffMaxInterval(150);
		// When auto commit is disabled, the record is committed after publishing to the DLQ
		// using the manual acknowledgement (if the DLQ is enabled, which it is in this case).
		consumerProperties.getExtension().setAutoCommitOffset(false);
		consumerProperties.getExtension().setEnableDlq(true);

		DirectChannel moduleInputChannel = createBindableChannel("input",
				createConsumerBindingProperties(consumerProperties));

		FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
		moduleInputChannel.subscribe(handler);
		long uniqueBindingId = System.currentTimeMillis();
		Binding<MessageChannel> producerBinding = binder.bindProducer(
				"retryTest." + uniqueBindingId + ".0", moduleOutputChannel,
				producerProperties);
		Binding<MessageChannel> consumerBinding = binder.bindConsumer(
				"retryTest." + uniqueBindingId + ".0", "testGroup", moduleInputChannel,
				consumerProperties);
		ExtendedConsumerProperties<KafkaConsumerProperties> dlqConsumerProperties = createConsumerProperties();
		dlqConsumerProperties.setMaxAttempts(1);
		QueueChannel dlqChannel = new QueueChannel();
		Binding<MessageChannel> dlqConsumerBinding = binder.bindConsumer(
				"error.retryTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel,
				dlqConsumerProperties);

		String testMessagePayload = "test." + UUID.randomUUID().toString();
		Message<byte[]> testMessage = MessageBuilder
				.withPayload(testMessagePayload.getBytes()).build();
		moduleOutputChannel.send(testMessage);

		Message<?> dlqMessage = receive(dlqChannel, 3);
		assertThat(dlqMessage).isNotNull();
		assertThat(dlqMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());

		// first attempt fails
		assertThat(handler.getReceivedMessages().entrySet()).hasSize(1);
		Message<?> handledMessage = handler.getReceivedMessages().entrySet().iterator()
				.next().getValue();
		assertThat(handledMessage).isNotNull();
		assertThat(
				new String((byte[]) handledMessage.getPayload(), StandardCharsets.UTF_8))
						.isEqualTo(testMessagePayload);
		assertThat(handler.getInvocationCount())
				.isEqualTo(consumerProperties.getMaxAttempts());
		binderBindUnbindLatency();
		dlqConsumerBinding.unbind();
		consumerBinding.unbind();

		// on the second attempt the message is not redelivered, because the DLQ is set
		// and the record in error is already committed.
		QueueChannel successfulInputChannel = new QueueChannel();
		consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0",
				"testGroup", successfulInputChannel, consumerProperties);
		String testMessage2Payload = "test1." + UUID.randomUUID().toString();
		Message<byte[]> testMessage2 = MessageBuilder
				.withPayload(testMessage2Payload.getBytes()).build();
		moduleOutputChannel.send(testMessage2);

		Message<?> receivedMessage = receive(successfulInputChannel);
		assertThat(receivedMessage.getPayload())
				.isEqualTo(testMessage2Payload.getBytes());

		binderBindUnbindLatency();
		consumerBinding.unbind();
		producerBinding.unbind();
	}

	@Test
	@SuppressWarnings("unchecked")
	public void testConfigurableDlqName() throws Exception {

@@ -1727,7 +1741,7 @@ public class KafkaBinderTests extends
		}
		catch (UnsupportedOperationException ignored) {
		}
		List<ChannelInterceptor> interceptors = output.getChannelInterceptors();
		List<ChannelInterceptor> interceptors = output.getInterceptors();
		AtomicInteger count = new AtomicInteger();
		interceptors.forEach(interceptor -> {
			if (interceptor instanceof PartitioningInterceptor) {

@@ -3508,6 +3522,98 @@ public class KafkaBinderTests extends
		}
	}

	@Test
	public void testInternalHeadersNotPropagated() throws Exception {
		testInternalHeadersNotPropagatedGuts("propagate.1", null, null);
	}

	@Test
	public void testInternalHeadersNotPropagatedCustomHeader() throws Exception {
		testInternalHeadersNotPropagatedGuts("propagate.2", new String[] { "foo", "*" }, null);
	}

	@Test
	public void testInternalHeadersNotPropagatedCustomMapper() throws Exception {
		testInternalHeadersNotPropagatedGuts("propagate.3", null, new BinderHeaderMapper("*"));
	}

	public void testInternalHeadersNotPropagatedGuts(String name, String[] headerPatterns,
			KafkaHeaderMapper mapper) throws Exception {

		KafkaTestBinder binder;
		if (mapper == null) {
			binder = getBinder();
		}
		else {
			KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
			binderConfiguration.setHeaderMapperBeanName("headerMapper");

			KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(
					binderConfiguration, new TestKafkaProperties());
			try {
				kafkaTopicProvisioner.afterPropertiesSet();
			}
			catch (Exception e) {
				throw new RuntimeException(e);
			}
			binder = new KafkaTestBinder(binderConfiguration, kafkaTopicProvisioner);
			((GenericApplicationContext) binder.getApplicationContext()).registerBean("headerMapper",
					KafkaHeaderMapper.class, () -> mapper);
		}
		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
		producerProperties.getExtension().setHeaderPatterns(headerPatterns);

		DirectChannel output = createBindableChannel("output", createProducerBindingProperties(producerProperties));
		output.setBeanName(name + ".out");
		Binding<MessageChannel> producerBinding = binder.bindProducer(name + ".1", output, producerProperties);

		QueueChannel input = new QueueChannel();
		input.setBeanName(name + ".in");
		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
		Binding<MessageChannel> consumerBinding = binder.bindConsumer(name + ".0", name, input, consumerProperties);
		Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
		KafkaTemplate template = new KafkaTemplate(new DefaultKafkaProducerFactory<>(producerProps));
		template.send(MessageBuilder.withPayload("internalHeaderPropagation")
				.setHeader(KafkaHeaders.TOPIC, name + ".0")
				.setHeader("someHeader", "someValue")
				.build());

		Message<?> consumed = input.receive(10_000);
		if (headerPatterns != null) {
			consumed = MessageBuilder.fromMessage(consumed).setHeader(headerPatterns[0], "bar").build();
		}
		output.send(consumed);

		Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(name, "false",
				embeddedKafka.getEmbeddedKafka());
		consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
		consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
		DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps);
		Consumer consumer = cf.createConsumer();
		consumer.assign(Collections.singletonList(new TopicPartition(name + ".1", 0)));
		ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(10));
		assertThat(records.count()).isEqualTo(1);
		ConsumerRecord<?, ?> received = records.iterator().next();
		assertThat(received.value()).isEqualTo("internalHeaderPropagation".getBytes());
		Header header = received.headers().lastHeader(BinderHeaders.NATIVE_HEADERS_PRESENT);
		assertThat(header).isNull();
		header = received.headers().lastHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT);
		assertThat(header).isNull();
		header = received.headers().lastHeader(MessageHeaders.ID);
		assertThat(header).isNull();
		header = received.headers().lastHeader(MessageHeaders.TIMESTAMP);
		assertThat(header).isNull();
		assertThat(received.headers().lastHeader("someHeader")).isNotNull();
		if (headerPatterns != null) {
			assertThat(received.headers().lastHeader(headerPatterns[0])).isNotNull();
		}

		producerBinding.unbind();
		consumerBinding.unbind();
		consumer.close();
	}

	private final class FailingInvocationCountingMessageHandler
			implements MessageHandler {

@@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka.integration2;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@@ -33,22 +34,30 @@ import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.backoff.FixedBackOff;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

/**
 * @author Gary Russell

@@ -63,6 +72,8 @@ import static org.assertj.core.api.Assertions.assertThat;
	"spring.cloud.stream.bindings.input.destination=consumer.producer.txIn",
	"spring.cloud.stream.bindings.input.group=consumer.producer.tx",
	"spring.cloud.stream.bindings.input.consumer.max-attempts=1",
	"spring.cloud.stream.kafka.bindings.input2.consumer.transaction-manager=tm",
	"spring.cloud.stream.kafka.bindings.output2.producer.transaction-manager=tm",
	"spring.cloud.stream.bindings.output.destination=consumer.producer.txOut",
	"spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx.",
	"spring.cloud.stream.kafka.binder.transaction.producer.configuration.retries=99",
@@ -100,7 +111,17 @@ public class ConsumerProducerTransactionTests {
		assertThat(this.config.outs).containsExactlyInAnyOrder("ONE", "THREE");
	}

	@EnableBinding(Processor.class)
	@Test
	public void externalTM() {
		assertThat(this.config.input2Container.getContainerProperties().getTransactionManager())
				.isSameAs(this.config.tm);
		Object handler = KafkaTestUtils.getPropertyValue(this.config.output2, "dispatcher.handlers", Set.class)
				.iterator().next();
		assertThat(KafkaTestUtils.getPropertyValue(handler, "delegate.kafkaTemplate.producerFactory"))
				.isSameAs(this.config.pf);
	}

	@EnableBinding(TwoProcessors.class)
	@EnableAutoConfiguration
	public static class Config {

@@ -111,6 +132,15 @@ public class ConsumerProducerTransactionTests {
		@Autowired
		private MessageChannel output;

		@Autowired
		MessageChannel output2;

		AbstractMessageListenerContainer<?, ?> input2Container;

		ProducerFactory pf;

		KafkaAwareTransactionManager<byte[], byte[]> tm;

		@KafkaListener(id = "test.cons.prod", topics = "consumer.producer.txOut")
		public void listenOut(String in) {
			this.outs.add(in);

@@ -125,6 +155,10 @@ public class ConsumerProducerTransactionTests {
			}
		}

		@StreamListener("input2")
		public void listenIn2(String in) {
		}

		@Bean
		public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
			return args -> {

@@ -136,10 +170,34 @@ public class ConsumerProducerTransactionTests {

		@Bean
		public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
			return (container, dest, group) -> container
					.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(new FixedBackOff(0L, 1L)));
			return (container, dest, group) -> {
				container.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(new FixedBackOff(0L, 1L)));
				if ("input2".equals(dest)) {
					this.input2Container = container;
				}
			};
		}

		@SuppressWarnings({ "rawtypes", "unchecked" })
		@Bean
		public KafkaAwareTransactionManager<byte[], byte[]> tm(ProducerFactory pf) {
			KafkaAwareTransactionManager mock = mock(KafkaAwareTransactionManager.class);
			this.pf = pf;
			given(mock.getProducerFactory()).willReturn(pf);
			this.tm = mock;
			return mock;
		}

	}

	public interface TwoProcessors extends Processor {

		@Input
		SubscribableChannel input2();

		@Output
		MessageChannel output2();

	}

}