Compare commits
12 Commits
v3.0.3.REL
...
v3.1.0.M1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e7487b9ada | ||
|
|
064f5b6611 | ||
|
|
ff859c5859 | ||
|
|
445eabc59a | ||
|
|
cc5d1b1aa6 | ||
|
|
db896532e6 | ||
|
|
07a740a5b5 | ||
|
|
65f8cc5660 | ||
|
|
90a47a675f | ||
|
|
9d212024f8 | ||
|
|
d594bab4cf | ||
|
|
e46bd1f844 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -23,3 +23,4 @@ _site/
|
||||
dump.rdb
|
||||
.apt_generated
|
||||
artifacts
|
||||
.sts4-cache
|
||||
|
||||
51
.mvn/wrapper/MavenWrapperDownloader.java
vendored
Executable file → Normal file
51
.mvn/wrapper/MavenWrapperDownloader.java
vendored
Executable file → Normal file
@@ -1,22 +1,18 @@
|
||||
/*
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
https://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
*/
|
||||
|
||||
* Copyright 2007-present the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
import java.net.*;
|
||||
import java.io.*;
|
||||
import java.nio.channels.*;
|
||||
@@ -24,11 +20,12 @@ import java.util.Properties;
|
||||
|
||||
public class MavenWrapperDownloader {
|
||||
|
||||
private static final String WRAPPER_VERSION = "0.5.6";
|
||||
/**
|
||||
* Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided.
|
||||
*/
|
||||
private static final String DEFAULT_DOWNLOAD_URL =
|
||||
"https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.4.2/maven-wrapper-0.4.2.jar";
|
||||
private static final String DEFAULT_DOWNLOAD_URL = "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/"
|
||||
+ WRAPPER_VERSION + "/maven-wrapper-" + WRAPPER_VERSION + ".jar";
|
||||
|
||||
/**
|
||||
* Path to the maven-wrapper.properties file, which might contain a downloadUrl property to
|
||||
@@ -76,13 +73,13 @@ public class MavenWrapperDownloader {
|
||||
}
|
||||
}
|
||||
}
|
||||
System.out.println("- Downloading from: : " + url);
|
||||
System.out.println("- Downloading from: " + url);
|
||||
|
||||
File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH);
|
||||
if(!outputFile.getParentFile().exists()) {
|
||||
if(!outputFile.getParentFile().mkdirs()) {
|
||||
System.out.println(
|
||||
"- ERROR creating output direcrory '" + outputFile.getParentFile().getAbsolutePath() + "'");
|
||||
"- ERROR creating output directory '" + outputFile.getParentFile().getAbsolutePath() + "'");
|
||||
}
|
||||
}
|
||||
System.out.println("- Downloading to: " + outputFile.getAbsolutePath());
|
||||
@@ -98,6 +95,16 @@ public class MavenWrapperDownloader {
|
||||
}
|
||||
|
||||
private static void downloadFileFromURL(String urlString, File destination) throws Exception {
|
||||
if (System.getenv("MVNW_USERNAME") != null && System.getenv("MVNW_PASSWORD") != null) {
|
||||
String username = System.getenv("MVNW_USERNAME");
|
||||
char[] password = System.getenv("MVNW_PASSWORD").toCharArray();
|
||||
Authenticator.setDefault(new Authenticator() {
|
||||
@Override
|
||||
protected PasswordAuthentication getPasswordAuthentication() {
|
||||
return new PasswordAuthentication(username, password);
|
||||
}
|
||||
});
|
||||
}
|
||||
URL website = new URL(urlString);
|
||||
ReadableByteChannel rbc;
|
||||
rbc = Channels.newChannel(website.openStream());
|
||||
|
||||
BIN
.mvn/wrapper/maven-wrapper.jar
vendored
Executable file → Normal file
BIN
.mvn/wrapper/maven-wrapper.jar
vendored
Executable file → Normal file
Binary file not shown.
3
.mvn/wrapper/maven-wrapper.properties
vendored
Executable file → Normal file
3
.mvn/wrapper/maven-wrapper.properties
vendored
Executable file → Normal file
@@ -1 +1,2 @@
|
||||
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.5.4/apache-maven-3.5.4-bin.zip
|
||||
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.6.3/apache-maven-3.6.3-bin.zip
|
||||
wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar
|
||||
|
||||
22
README.adoc
22
README.adoc
@@ -301,6 +301,12 @@ pollTimeout::
|
||||
Timeout used for polling in pollable consumers.
|
||||
+
|
||||
Default: 5 seconds.
|
||||
transactionManager::
|
||||
Bean name of a `KafkaAwareTransactionManager` used to override the binder's transaction manager for this binding.
|
||||
Usually needed if you want to synchronize another transaction with the Kafka transaction, using the `ChainedKafkaTransactionManaager`.
|
||||
To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.
|
||||
+
|
||||
Default: none.
|
||||
|
||||
==== Consuming Batches
|
||||
|
||||
@@ -412,6 +418,12 @@ Supported values are `none`, `gzip`, `snappy` and `lz4`.
|
||||
If you override the `kafka-clients` jar to 2.1.0 (or later), as discussed in the https://docs.spring.io/spring-kafka/docs/2.2.x/reference/html/deps-for-21x.html[Spring for Apache Kafka documentation], and wish to use `zstd` compression, use `spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd`.
|
||||
+
|
||||
Default: `none`.
|
||||
transactionManager::
|
||||
Bean name of a `KafkaAwareTransactionManager` used to override the binder's transaction manager for this binding.
|
||||
Usually needed if you want to synchronize another transaction with the Kafka transaction, using the `ChainedKafkaTransactionManaager`.
|
||||
To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.
|
||||
+
|
||||
Default: none.
|
||||
|
||||
==== Usage examples
|
||||
|
||||
@@ -591,10 +603,14 @@ If you wish to use transactions in a source application, or from some arbitrary
|
||||
[source, java]
|
||||
----
|
||||
@Bean
|
||||
public PlatformTransactionManager transactionManager(BinderFactory binders) {
|
||||
public PlatformTransactionManager transactionManager(BinderFactory binders,
|
||||
@Value("${unique.tx.id.per.instance}") String txId) {
|
||||
|
||||
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
|
||||
MessageChannel.class)).getTransactionalProducerFactory();
|
||||
return new KafkaTransactionManager<>(pf);
|
||||
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
|
||||
tm.setTransactionId(txId)
|
||||
return tm;
|
||||
}
|
||||
----
|
||||
====
|
||||
@@ -621,6 +637,8 @@ public static class Sender {
|
||||
|
||||
If you wish to synchronize producer-only transactions with those from some other transaction manager, use a `ChainedTransactionManager`.
|
||||
|
||||
IMPORTANT: If you deploy multiple instances of your application, each instance needs a unique `transactionIdPrefix`.
|
||||
|
||||
[[kafka-error-channels]]
|
||||
=== Error Channels
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.0.3.RELEASE</version>
|
||||
<version>3.1.0.M1</version>
|
||||
</parent>
|
||||
<packaging>pom</packaging>
|
||||
<name>spring-cloud-stream-binder-kafka-docs</name>
|
||||
|
||||
@@ -589,11 +589,11 @@ public KStream<String, String> anotherProcess(@Input("anotherInput") <KStream<Ob
|
||||
|
||||
Then you must set the application id for this using the following binding property.
|
||||
|
||||
`spring.cloud.stream.kafka.streams.bindings.input.applicationId`
|
||||
`spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId`
|
||||
|
||||
and
|
||||
|
||||
`spring.cloud.stream.kafka.streams.bindings.anotherInput.applicationId`
|
||||
`spring.cloud.stream.kafka.streams.bindings.anotherInput.consumer.applicationId`
|
||||
|
||||
|
||||
For function based model also, this approach of setting application id at the binding level will work.
|
||||
@@ -1448,6 +1448,19 @@ By default, the `Kafkastreams.cleanup()` method is called when the binding is st
|
||||
See https://docs.spring.io/spring-kafka/reference/html/_reference.html#_configuration[the Spring Kafka documentation].
|
||||
To modify this behavior simply add a single `CleanupConfig` `@Bean` (configured to clean up on start, stop, or neither) to the application context; the bean will be detected and wired into the factory bean.
|
||||
|
||||
|
||||
=== Kafka Streams topology visualization
|
||||
|
||||
Kafka Streams binder provides the following actuator endpoints for retrieving the topology description using which you can visualize the topology using external tools.
|
||||
|
||||
`/actuator/topology`
|
||||
|
||||
`/actuator/topology/<applicaiton-id of the processor>`
|
||||
|
||||
You need to include the actuator and web dependencies from Spring Boot to access these endpoints.
|
||||
Further, you also need to add `topology` to `management.endpoints.web.exposure.include` property.
|
||||
By default, the `topology` endpoint is disabled.
|
||||
|
||||
=== Configuration Options
|
||||
|
||||
This section contains the configuration options used by the Kafka Streams binder.
|
||||
|
||||
@@ -281,6 +281,12 @@ pollTimeout::
|
||||
Timeout used for polling in pollable consumers.
|
||||
+
|
||||
Default: 5 seconds.
|
||||
transactionManager::
|
||||
Bean name of a `KafkaAwareTransactionManager` used to override the binder's transaction manager for this binding.
|
||||
Usually needed if you want to synchronize another transaction with the Kafka transaction, using the `ChainedKafkaTransactionManaager`.
|
||||
To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.
|
||||
+
|
||||
Default: none.
|
||||
|
||||
==== Consuming Batches
|
||||
|
||||
@@ -392,6 +398,12 @@ Supported values are `none`, `gzip`, `snappy` and `lz4`.
|
||||
If you override the `kafka-clients` jar to 2.1.0 (or later), as discussed in the https://docs.spring.io/spring-kafka/docs/2.2.x/reference/html/deps-for-21x.html[Spring for Apache Kafka documentation], and wish to use `zstd` compression, use `spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd`.
|
||||
+
|
||||
Default: `none`.
|
||||
transactionManager::
|
||||
Bean name of a `KafkaAwareTransactionManager` used to override the binder's transaction manager for this binding.
|
||||
Usually needed if you want to synchronize another transaction with the Kafka transaction, using the `ChainedKafkaTransactionManaager`.
|
||||
To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.
|
||||
+
|
||||
Default: none.
|
||||
|
||||
==== Usage examples
|
||||
|
||||
@@ -571,10 +583,14 @@ If you wish to use transactions in a source application, or from some arbitrary
|
||||
[source, java]
|
||||
----
|
||||
@Bean
|
||||
public PlatformTransactionManager transactionManager(BinderFactory binders) {
|
||||
public PlatformTransactionManager transactionManager(BinderFactory binders,
|
||||
@Value("${unique.tx.id.per.instance}") String txId) {
|
||||
|
||||
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
|
||||
MessageChannel.class)).getTransactionalProducerFactory();
|
||||
return new KafkaTransactionManager<>(pf);
|
||||
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
|
||||
tm.setTransactionId(txId)
|
||||
return tm;
|
||||
}
|
||||
----
|
||||
====
|
||||
@@ -601,6 +617,8 @@ public static class Sender {
|
||||
|
||||
If you wish to synchronize producer-only transactions with those from some other transaction manager, use a `ChainedTransactionManager`.
|
||||
|
||||
IMPORTANT: If you deploy multiple instances of your application, each instance needs a unique `transactionIdPrefix`.
|
||||
|
||||
[[kafka-error-channels]]
|
||||
=== Error Channels
|
||||
|
||||
|
||||
36
mvnw
vendored
36
mvnw
vendored
@@ -8,7 +8,7 @@
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
@@ -19,7 +19,7 @@
|
||||
# ----------------------------------------------------------------------------
|
||||
|
||||
# ----------------------------------------------------------------------------
|
||||
# Maven2 Start Up Batch script
|
||||
# Maven Start Up Batch script
|
||||
#
|
||||
# Required ENV vars:
|
||||
# ------------------
|
||||
@@ -114,7 +114,6 @@ if $mingw ; then
|
||||
M2_HOME="`(cd "$M2_HOME"; pwd)`"
|
||||
[ -n "$JAVA_HOME" ] &&
|
||||
JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
|
||||
# TODO classpath?
|
||||
fi
|
||||
|
||||
if [ -z "$JAVA_HOME" ]; then
|
||||
@@ -212,7 +211,11 @@ else
|
||||
if [ "$MVNW_VERBOSE" = true ]; then
|
||||
echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..."
|
||||
fi
|
||||
jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.4.2/maven-wrapper-0.4.2.jar"
|
||||
if [ -n "$MVNW_REPOURL" ]; then
|
||||
jarUrl="$MVNW_REPOURL/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
|
||||
else
|
||||
jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
|
||||
fi
|
||||
while IFS="=" read key value; do
|
||||
case "$key" in (wrapperUrl) jarUrl="$value"; break ;;
|
||||
esac
|
||||
@@ -221,22 +224,38 @@ else
|
||||
echo "Downloading from: $jarUrl"
|
||||
fi
|
||||
wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar"
|
||||
if $cygwin; then
|
||||
wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"`
|
||||
fi
|
||||
|
||||
if command -v wget > /dev/null; then
|
||||
if [ "$MVNW_VERBOSE" = true ]; then
|
||||
echo "Found wget ... using wget"
|
||||
fi
|
||||
wget "$jarUrl" -O "$wrapperJarPath"
|
||||
if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
|
||||
wget "$jarUrl" -O "$wrapperJarPath"
|
||||
else
|
||||
wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath"
|
||||
fi
|
||||
elif command -v curl > /dev/null; then
|
||||
if [ "$MVNW_VERBOSE" = true ]; then
|
||||
echo "Found curl ... using curl"
|
||||
fi
|
||||
curl -o "$wrapperJarPath" "$jarUrl"
|
||||
if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
|
||||
curl -o "$wrapperJarPath" "$jarUrl" -f
|
||||
else
|
||||
curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f
|
||||
fi
|
||||
|
||||
else
|
||||
if [ "$MVNW_VERBOSE" = true ]; then
|
||||
echo "Falling back to using Java to download"
|
||||
fi
|
||||
javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java"
|
||||
# For Cygwin, switch paths to Windows format before running javac
|
||||
if $cygwin; then
|
||||
javaClass=`cygpath --path --windows "$javaClass"`
|
||||
fi
|
||||
if [ -e "$javaClass" ]; then
|
||||
if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
|
||||
if [ "$MVNW_VERBOSE" = true ]; then
|
||||
@@ -277,6 +296,11 @@ if $cygwin; then
|
||||
MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
|
||||
fi
|
||||
|
||||
# Provide a "standardized" way to retrieve the CLI args that will
|
||||
# work with both Windows and non-Windows executions.
|
||||
MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@"
|
||||
export MAVEN_CMD_LINE_ARGS
|
||||
|
||||
WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
|
||||
|
||||
exec "$JAVACMD" \
|
||||
|
||||
343
mvnw.cmd
vendored
Executable file → Normal file
343
mvnw.cmd
vendored
Executable file → Normal file
@@ -1,161 +1,182 @@
|
||||
@REM ----------------------------------------------------------------------------
|
||||
@REM Licensed to the Apache Software Foundation (ASF) under one
|
||||
@REM or more contributor license agreements. See the NOTICE file
|
||||
@REM distributed with this work for additional information
|
||||
@REM regarding copyright ownership. The ASF licenses this file
|
||||
@REM to you under the Apache License, Version 2.0 (the
|
||||
@REM "License"); you may not use this file except in compliance
|
||||
@REM with the License. You may obtain a copy of the License at
|
||||
@REM
|
||||
@REM https://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM
|
||||
@REM Unless required by applicable law or agreed to in writing,
|
||||
@REM software distributed under the License is distributed on an
|
||||
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
@REM KIND, either express or implied. See the License for the
|
||||
@REM specific language governing permissions and limitations
|
||||
@REM under the License.
|
||||
@REM ----------------------------------------------------------------------------
|
||||
|
||||
@REM ----------------------------------------------------------------------------
|
||||
@REM Maven2 Start Up Batch script
|
||||
@REM
|
||||
@REM Required ENV vars:
|
||||
@REM JAVA_HOME - location of a JDK home dir
|
||||
@REM
|
||||
@REM Optional ENV vars
|
||||
@REM M2_HOME - location of maven2's installed home dir
|
||||
@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
|
||||
@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending
|
||||
@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
|
||||
@REM e.g. to debug Maven itself, use
|
||||
@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
|
||||
@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
|
||||
@REM ----------------------------------------------------------------------------
|
||||
|
||||
@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
|
||||
@echo off
|
||||
@REM set title of command window
|
||||
title %0
|
||||
@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on'
|
||||
@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
|
||||
|
||||
@REM set %HOME% to equivalent of $HOME
|
||||
if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
|
||||
|
||||
@REM Execute a user defined script before this one
|
||||
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
|
||||
@REM check for pre script, once with legacy .bat ending and once with .cmd ending
|
||||
if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
|
||||
if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
|
||||
:skipRcPre
|
||||
|
||||
@setlocal
|
||||
|
||||
set ERROR_CODE=0
|
||||
|
||||
@REM To isolate internal variables from possible post scripts, we use another setlocal
|
||||
@setlocal
|
||||
|
||||
@REM ==== START VALIDATION ====
|
||||
if not "%JAVA_HOME%" == "" goto OkJHome
|
||||
|
||||
echo.
|
||||
echo Error: JAVA_HOME not found in your environment. >&2
|
||||
echo Please set the JAVA_HOME variable in your environment to match the >&2
|
||||
echo location of your Java installation. >&2
|
||||
echo.
|
||||
goto error
|
||||
|
||||
:OkJHome
|
||||
if exist "%JAVA_HOME%\bin\java.exe" goto init
|
||||
|
||||
echo.
|
||||
echo Error: JAVA_HOME is set to an invalid directory. >&2
|
||||
echo JAVA_HOME = "%JAVA_HOME%" >&2
|
||||
echo Please set the JAVA_HOME variable in your environment to match the >&2
|
||||
echo location of your Java installation. >&2
|
||||
echo.
|
||||
goto error
|
||||
|
||||
@REM ==== END VALIDATION ====
|
||||
|
||||
:init
|
||||
|
||||
@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
|
||||
@REM Fallback to current working directory if not found.
|
||||
|
||||
set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
|
||||
IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
|
||||
|
||||
set EXEC_DIR=%CD%
|
||||
set WDIR=%EXEC_DIR%
|
||||
:findBaseDir
|
||||
IF EXIST "%WDIR%"\.mvn goto baseDirFound
|
||||
cd ..
|
||||
IF "%WDIR%"=="%CD%" goto baseDirNotFound
|
||||
set WDIR=%CD%
|
||||
goto findBaseDir
|
||||
|
||||
:baseDirFound
|
||||
set MAVEN_PROJECTBASEDIR=%WDIR%
|
||||
cd "%EXEC_DIR%"
|
||||
goto endDetectBaseDir
|
||||
|
||||
:baseDirNotFound
|
||||
set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
|
||||
cd "%EXEC_DIR%"
|
||||
|
||||
:endDetectBaseDir
|
||||
|
||||
IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
|
||||
|
||||
@setlocal EnableExtensions EnableDelayedExpansion
|
||||
for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
|
||||
@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
|
||||
|
||||
:endReadAdditionalConfig
|
||||
|
||||
SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
|
||||
set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
|
||||
set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
|
||||
|
||||
set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.4.2/maven-wrapper-0.4.2.jar"
|
||||
FOR /F "tokens=1,2 delims==" %%A IN (%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties) DO (
|
||||
IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B
|
||||
)
|
||||
|
||||
@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
|
||||
@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
|
||||
if exist %WRAPPER_JAR% (
|
||||
echo Found %WRAPPER_JAR%
|
||||
) else (
|
||||
echo Couldn't find %WRAPPER_JAR%, downloading it ...
|
||||
echo Downloading from: %DOWNLOAD_URL%
|
||||
powershell -Command "(New-Object Net.WebClient).DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"
|
||||
echo Finished downloading %WRAPPER_JAR%
|
||||
)
|
||||
@REM End of extension
|
||||
|
||||
%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
|
||||
if ERRORLEVEL 1 goto error
|
||||
goto end
|
||||
|
||||
:error
|
||||
set ERROR_CODE=1
|
||||
|
||||
:end
|
||||
@endlocal & set ERROR_CODE=%ERROR_CODE%
|
||||
|
||||
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
|
||||
@REM check for post script, once with legacy .bat ending and once with .cmd ending
|
||||
if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
|
||||
if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
|
||||
:skipRcPost
|
||||
|
||||
@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
|
||||
if "%MAVEN_BATCH_PAUSE%" == "on" pause
|
||||
|
||||
if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
|
||||
|
||||
exit /B %ERROR_CODE%
|
||||
@REM ----------------------------------------------------------------------------
|
||||
@REM Licensed to the Apache Software Foundation (ASF) under one
|
||||
@REM or more contributor license agreements. See the NOTICE file
|
||||
@REM distributed with this work for additional information
|
||||
@REM regarding copyright ownership. The ASF licenses this file
|
||||
@REM to you under the Apache License, Version 2.0 (the
|
||||
@REM "License"); you may not use this file except in compliance
|
||||
@REM with the License. You may obtain a copy of the License at
|
||||
@REM
|
||||
@REM http://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM
|
||||
@REM Unless required by applicable law or agreed to in writing,
|
||||
@REM software distributed under the License is distributed on an
|
||||
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
@REM KIND, either express or implied. See the License for the
|
||||
@REM specific language governing permissions and limitations
|
||||
@REM under the License.
|
||||
@REM ----------------------------------------------------------------------------
|
||||
|
||||
@REM ----------------------------------------------------------------------------
|
||||
@REM Maven Start Up Batch script
|
||||
@REM
|
||||
@REM Required ENV vars:
|
||||
@REM JAVA_HOME - location of a JDK home dir
|
||||
@REM
|
||||
@REM Optional ENV vars
|
||||
@REM M2_HOME - location of maven2's installed home dir
|
||||
@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
|
||||
@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending
|
||||
@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
|
||||
@REM e.g. to debug Maven itself, use
|
||||
@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
|
||||
@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
|
||||
@REM ----------------------------------------------------------------------------
|
||||
|
||||
@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
|
||||
@echo off
|
||||
@REM set title of command window
|
||||
title %0
|
||||
@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on'
|
||||
@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
|
||||
|
||||
@REM set %HOME% to equivalent of $HOME
|
||||
if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
|
||||
|
||||
@REM Execute a user defined script before this one
|
||||
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
|
||||
@REM check for pre script, once with legacy .bat ending and once with .cmd ending
|
||||
if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
|
||||
if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
|
||||
:skipRcPre
|
||||
|
||||
@setlocal
|
||||
|
||||
set ERROR_CODE=0
|
||||
|
||||
@REM To isolate internal variables from possible post scripts, we use another setlocal
|
||||
@setlocal
|
||||
|
||||
@REM ==== START VALIDATION ====
|
||||
if not "%JAVA_HOME%" == "" goto OkJHome
|
||||
|
||||
echo.
|
||||
echo Error: JAVA_HOME not found in your environment. >&2
|
||||
echo Please set the JAVA_HOME variable in your environment to match the >&2
|
||||
echo location of your Java installation. >&2
|
||||
echo.
|
||||
goto error
|
||||
|
||||
:OkJHome
|
||||
if exist "%JAVA_HOME%\bin\java.exe" goto init
|
||||
|
||||
echo.
|
||||
echo Error: JAVA_HOME is set to an invalid directory. >&2
|
||||
echo JAVA_HOME = "%JAVA_HOME%" >&2
|
||||
echo Please set the JAVA_HOME variable in your environment to match the >&2
|
||||
echo location of your Java installation. >&2
|
||||
echo.
|
||||
goto error
|
||||
|
||||
@REM ==== END VALIDATION ====
|
||||
|
||||
:init
|
||||
|
||||
@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
|
||||
@REM Fallback to current working directory if not found.
|
||||
|
||||
set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
|
||||
IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
|
||||
|
||||
set EXEC_DIR=%CD%
|
||||
set WDIR=%EXEC_DIR%
|
||||
:findBaseDir
|
||||
IF EXIST "%WDIR%"\.mvn goto baseDirFound
|
||||
cd ..
|
||||
IF "%WDIR%"=="%CD%" goto baseDirNotFound
|
||||
set WDIR=%CD%
|
||||
goto findBaseDir
|
||||
|
||||
:baseDirFound
|
||||
set MAVEN_PROJECTBASEDIR=%WDIR%
|
||||
cd "%EXEC_DIR%"
|
||||
goto endDetectBaseDir
|
||||
|
||||
:baseDirNotFound
|
||||
set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
|
||||
cd "%EXEC_DIR%"
|
||||
|
||||
:endDetectBaseDir
|
||||
|
||||
IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
|
||||
|
||||
@setlocal EnableExtensions EnableDelayedExpansion
|
||||
for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
|
||||
@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
|
||||
|
||||
:endReadAdditionalConfig
|
||||
|
||||
SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
|
||||
set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
|
||||
set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
|
||||
|
||||
set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
|
||||
|
||||
FOR /F "tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO (
|
||||
IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B
|
||||
)
|
||||
|
||||
@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
|
||||
@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
|
||||
if exist %WRAPPER_JAR% (
|
||||
if "%MVNW_VERBOSE%" == "true" (
|
||||
echo Found %WRAPPER_JAR%
|
||||
)
|
||||
) else (
|
||||
if not "%MVNW_REPOURL%" == "" (
|
||||
SET DOWNLOAD_URL="%MVNW_REPOURL%/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
|
||||
)
|
||||
if "%MVNW_VERBOSE%" == "true" (
|
||||
echo Couldn't find %WRAPPER_JAR%, downloading it ...
|
||||
echo Downloading from: %DOWNLOAD_URL%
|
||||
)
|
||||
|
||||
powershell -Command "&{"^
|
||||
"$webclient = new-object System.Net.WebClient;"^
|
||||
"if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^
|
||||
"$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^
|
||||
"}"^
|
||||
"[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^
|
||||
"}"
|
||||
if "%MVNW_VERBOSE%" == "true" (
|
||||
echo Finished downloading %WRAPPER_JAR%
|
||||
)
|
||||
)
|
||||
@REM End of extension
|
||||
|
||||
@REM Provide a "standardized" way to retrieve the CLI args that will
|
||||
@REM work with both Windows and non-Windows executions.
|
||||
set MAVEN_CMD_LINE_ARGS=%*
|
||||
|
||||
%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
|
||||
if ERRORLEVEL 1 goto error
|
||||
goto end
|
||||
|
||||
:error
|
||||
set ERROR_CODE=1
|
||||
|
||||
:end
|
||||
@endlocal & set ERROR_CODE=%ERROR_CODE%
|
||||
|
||||
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
|
||||
@REM check for post script, once with legacy .bat ending and once with .cmd ending
|
||||
if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
|
||||
if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
|
||||
:skipRcPost
|
||||
|
||||
@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
|
||||
if "%MAVEN_BATCH_PAUSE%" == "on" pause
|
||||
|
||||
if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
|
||||
|
||||
exit /B %ERROR_CODE%
|
||||
|
||||
12
pom.xml
12
pom.xml
@@ -2,21 +2,21 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.0.3.RELEASE</version>
|
||||
<version>3.1.0.M1</version>
|
||||
<packaging>pom</packaging>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-build</artifactId>
|
||||
<version>2.2.3.RELEASE</version>
|
||||
<version>3.0.0.M1</version>
|
||||
<relativePath />
|
||||
</parent>
|
||||
<properties>
|
||||
<java.version>1.8</java.version>
|
||||
<spring-kafka.version>2.3.5.RELEASE</spring-kafka.version>
|
||||
<spring-kafka.version>2.4.4.RELEASE</spring-kafka.version>
|
||||
<spring-integration-kafka.version>3.2.1.RELEASE</spring-integration-kafka.version>
|
||||
<kafka.version>2.3.1</kafka.version>
|
||||
<spring-cloud-schema-registry.version>1.0.3.RELEASE</spring-cloud-schema-registry.version>
|
||||
<spring-cloud-stream.version>3.0.3.RELEASE</spring-cloud-stream.version>
|
||||
<kafka.version>2.4.0</kafka.version>
|
||||
<spring-cloud-schema-registry.version>1.1.0.M1</spring-cloud-schema-registry.version>
|
||||
<spring-cloud-stream.version>3.1.0.M1</spring-cloud-stream.version>
|
||||
<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
|
||||
<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
|
||||
<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.0.3.RELEASE</version>
|
||||
<version>3.1.0.M1</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
<description>Spring Cloud Starter Stream Kafka</description>
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.0.3.RELEASE</version>
|
||||
<version>3.1.0.M1</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder Core</description>
|
||||
|
||||
@@ -150,6 +150,7 @@ public class KafkaConsumerProperties {
|
||||
/**
|
||||
* @deprecated No longer used by the binder.
|
||||
*/
|
||||
@Deprecated
|
||||
private int recoveryInterval = 5000;
|
||||
|
||||
/**
|
||||
@@ -194,6 +195,11 @@ public class KafkaConsumerProperties {
|
||||
*/
|
||||
private long pollTimeout = org.springframework.kafka.listener.ConsumerProperties.DEFAULT_POLL_TIMEOUT;
|
||||
|
||||
/**
|
||||
* Transaction manager bean name - overrides the binder's transaction configuration.
|
||||
*/
|
||||
private String transactionManager;
|
||||
|
||||
/**
|
||||
* @return if each record needs to be acknowledged.
|
||||
*
|
||||
@@ -462,4 +468,18 @@ public class KafkaConsumerProperties {
|
||||
public void setPollTimeout(long pollTimeout) {
|
||||
this.pollTimeout = pollTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the transaction manager bean name.
|
||||
*
|
||||
* Transaction manager bean name (must be {@code KafkaAwareTransactionManager}.
|
||||
*/
|
||||
public String getTransactionManager() {
|
||||
return this.transactionManager;
|
||||
}
|
||||
|
||||
public void setTransactionManager(String transactionManager) {
|
||||
this.transactionManager = transactionManager;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -94,6 +94,11 @@ public class KafkaProducerProperties {
|
||||
*/
|
||||
private String recordMetadataChannel;
|
||||
|
||||
/**
|
||||
* Transaction manager bean name - overrides the binder's transaction configuration.
|
||||
*/
|
||||
private String transactionManager;
|
||||
|
||||
/**
|
||||
* @return buffer size
|
||||
*
|
||||
@@ -244,6 +249,19 @@ public class KafkaProducerProperties {
|
||||
this.recordMetadataChannel = recordMetadataChannel;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the transaction manager bean name.
|
||||
*
|
||||
* Transaction manager bean name (must be {@code KafkaAwareTransactionManager}.
|
||||
*/
|
||||
public String getTransactionManager() {
|
||||
return this.transactionManager;
|
||||
}
|
||||
|
||||
public void setTransactionManager(String transactionManager) {
|
||||
this.transactionManager = transactionManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Enumeration for compression types.
|
||||
*/
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.0.3.RELEASE</version>
|
||||
<version>3.1.0.M1</version>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
|
||||
@@ -17,9 +17,12 @@
|
||||
package org.springframework.cloud.stream.binder.kafka.streams;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@@ -45,6 +48,7 @@ import org.springframework.util.StringUtils;
|
||||
*
|
||||
* @author Soby Chacko
|
||||
* @author Renwei Han
|
||||
* @author Serhii Siryi
|
||||
* @since 2.1.0
|
||||
*/
|
||||
public class InteractiveQueryService {
|
||||
@@ -153,4 +157,25 @@ public class InteractiveQueryService {
|
||||
return streamsMetadata != null ? streamsMetadata.hostInfo() : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the list of {@link HostInfo} where the provided store is hosted on.
|
||||
* It also can include current host info.
|
||||
* Kafka Streams will look through all the consumer instances under the same application id
|
||||
* and retrieves all hosts info.
|
||||
*
|
||||
* Note that the end-user applications must provide `application.server` as a configuration property
|
||||
* for all the application instances when calling this method. If this is not available, then an empty list will be returned.
|
||||
*
|
||||
* @param store store name
|
||||
* @return the list of {@link HostInfo} where provided store is hosted on
|
||||
*/
|
||||
public List<HostInfo> getAllHostsInfo(String store) {
|
||||
return kafkaStreamsRegistry.getKafkaStreams()
|
||||
.stream()
|
||||
.flatMap(k -> k.allMetadataForStore(store).stream())
|
||||
.filter(Objects::nonNull)
|
||||
.map(StreamsMetadata::hostInfo)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -16,12 +16,16 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
|
||||
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
|
||||
|
||||
@@ -31,7 +35,7 @@ import org.springframework.kafka.config.StreamsBuilderFactoryBean;
|
||||
*
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
class KafkaStreamsRegistry {
|
||||
public class KafkaStreamsRegistry {
|
||||
|
||||
private Map<KafkaStreams, StreamsBuilderFactoryBean> streamsBuilderFactoryBeanMap = new HashMap<>();
|
||||
|
||||
@@ -60,4 +64,18 @@ class KafkaStreamsRegistry {
|
||||
return this.streamsBuilderFactoryBeanMap.get(kafkaStreams);
|
||||
}
|
||||
|
||||
public StreamsBuilderFactoryBean streamsBuilderFactoryBean(String applicationId) {
|
||||
final Optional<StreamsBuilderFactoryBean> first = this.streamsBuilderFactoryBeanMap.values()
|
||||
.stream()
|
||||
.filter(streamsBuilderFactoryBean -> streamsBuilderFactoryBean
|
||||
.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
|
||||
.equals(applicationId))
|
||||
.findFirst();
|
||||
return first.orElse(null);
|
||||
}
|
||||
|
||||
public List<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans() {
|
||||
return new ArrayList<>(this.streamsBuilderFactoryBeanMap.values());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,71 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams.endpoint;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
|
||||
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
|
||||
import org.springframework.boot.actuate.endpoint.annotation.Selector;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
|
||||
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Actuator endpoint for topology description.
|
||||
*
|
||||
* @author Soby Chacko
|
||||
* @since 3.0.4
|
||||
*/
|
||||
@Endpoint(id = "topology")
|
||||
public class TopologyEndpoint {
|
||||
|
||||
/**
|
||||
* Topology not found message.
|
||||
*/
|
||||
public static final String NO_TOPOLOGY_FOUND_MSG = "No topology found for the given application ID";
|
||||
|
||||
private final KafkaStreamsRegistry kafkaStreamsRegistry;
|
||||
|
||||
public TopologyEndpoint(KafkaStreamsRegistry kafkaStreamsRegistry) {
|
||||
this.kafkaStreamsRegistry = kafkaStreamsRegistry;
|
||||
}
|
||||
|
||||
@ReadOperation
|
||||
public String topology() {
|
||||
final List<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans = this.kafkaStreamsRegistry.streamsBuilderFactoryBeans();
|
||||
final StringBuilder topologyDescription = new StringBuilder();
|
||||
streamsBuilderFactoryBeans.stream()
|
||||
.forEach(streamsBuilderFactoryBean ->
|
||||
topologyDescription.append(streamsBuilderFactoryBean.getTopology().describe().toString()));
|
||||
return topologyDescription.toString();
|
||||
}
|
||||
|
||||
@ReadOperation
|
||||
public String topology(@Selector String applicationId) {
|
||||
if (!StringUtils.isEmpty(applicationId)) {
|
||||
final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsRegistry.streamsBuilderFactoryBean(applicationId);
|
||||
if (streamsBuilderFactoryBean != null) {
|
||||
return streamsBuilderFactoryBean.getTopology().describe().toString();
|
||||
}
|
||||
else {
|
||||
return NO_TOPOLOGY_FOUND_MSG;
|
||||
}
|
||||
}
|
||||
return NO_TOPOLOGY_FOUND_MSG;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams.endpoint;
|
||||
|
||||
import org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration;
|
||||
import org.springframework.boot.actuate.autoconfigure.endpoint.condition.ConditionalOnAvailableEndpoint;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderSupportAutoConfiguration;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
* @since 3.0.4
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnClass(name = {
|
||||
"org.springframework.boot.actuate.endpoint.annotation.Endpoint" })
|
||||
@AutoConfigureAfter({EndpointAutoConfiguration.class, KafkaStreamsBinderSupportAutoConfiguration.class})
|
||||
public class TopologyEndpointAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
@ConditionalOnAvailableEndpoint
|
||||
public TopologyEndpoint topologyEndpoint(KafkaStreamsRegistry kafkaStreamsRegistry) {
|
||||
return new TopologyEndpoint(kafkaStreamsRegistry);
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||
org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderSupportAutoConfiguration,\
|
||||
org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsFunctionAutoConfiguration
|
||||
|
||||
org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsFunctionAutoConfiguration,\
|
||||
org.springframework.cloud.stream.binder.kafka.streams.endpoint.TopologyEndpointAutoConfiguration
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
@@ -162,6 +163,7 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
|
||||
InteractiveQueryService interactiveQueryService = context
|
||||
.getBean(InteractiveQueryService.class);
|
||||
HostInfo currentHostInfo = interactiveQueryService.getCurrentHostInfo();
|
||||
|
||||
assertThat(currentHostInfo.host() + ":" + currentHostInfo.port())
|
||||
.isEqualTo(embeddedKafka.getBrokersAsString());
|
||||
|
||||
@@ -173,6 +175,13 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
|
||||
HostInfo hostInfoFoo = interactiveQueryService
|
||||
.getHostInfo("prod-id-count-store-foo", 123, new IntegerSerializer());
|
||||
assertThat(hostInfoFoo).isNull();
|
||||
|
||||
final List<HostInfo> hostInfos = interactiveQueryService.getAllHostsInfo("prod-id-count-store");
|
||||
assertThat(hostInfos.size()).isEqualTo(1);
|
||||
final HostInfo hostInfo1 = hostInfos.get(0);
|
||||
assertThat(hostInfo1.host() + ":" + hostInfo1.port())
|
||||
.isEqualTo(embeddedKafka.getBrokersAsString());
|
||||
|
||||
}
|
||||
|
||||
@EnableBinding(KafkaStreamsProcessor.class)
|
||||
@@ -214,7 +223,6 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static class Product {
|
||||
|
||||
@@ -44,6 +44,8 @@ import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.WebApplicationType;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.endpoint.TopologyEndpoint;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
|
||||
@@ -108,6 +110,13 @@ public class KafkaStreamsBinderWordCountFunctionTests {
|
||||
assertThat(meterRegistry.get("stream.metrics.commit.total").gauge().value()).isEqualTo(1.0);
|
||||
assertThat(meterRegistry.get("app.info.start.time.ms").gauge().value()).isNotNaN();
|
||||
Assert.isTrue(LATCH.await(5, TimeUnit.SECONDS), "Failed to call customizers");
|
||||
//Testing topology endpoint
|
||||
final KafkaStreamsRegistry kafkaStreamsRegistry = context.getBean(KafkaStreamsRegistry.class);
|
||||
final TopologyEndpoint topologyEndpoint = new TopologyEndpoint(kafkaStreamsRegistry);
|
||||
final String topology1 = topologyEndpoint.topology();
|
||||
final String topology2 = topologyEndpoint.topology("testKstreamWordCountFunction");
|
||||
assertThat(topology1).isNotEmpty();
|
||||
assertThat(topology1).isEqualTo(topology2);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -131,6 +131,7 @@ public class MultipleFunctionsInSameAppTests {
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.function.definition=process;analyze",
|
||||
"--spring.cloud.stream.bindings.process-in-0.destination=purchases",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.startOffset=latest",
|
||||
"--spring.cloud.stream.bindings.process-in-0.binder=kafka1",
|
||||
"--spring.cloud.stream.bindings.process-out-0.destination=coffee",
|
||||
"--spring.cloud.stream.bindings.process-out-0.binder=kafka1",
|
||||
@@ -148,6 +149,8 @@ public class MultipleFunctionsInSameAppTests {
|
||||
"--spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
|
||||
"--spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.applicationId=my-app-2",
|
||||
"--spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.client.id=analyze-client")) {
|
||||
|
||||
Thread.sleep(1000);
|
||||
receiveAndValidate("purchases", "coffee", "electronics");
|
||||
|
||||
StreamsBuilderFactoryBean processStreamsBuilderFactoryBean = context
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.0.3.RELEASE</version>
|
||||
<version>3.1.0.M1</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
||||
@@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@@ -37,6 +38,8 @@ import org.apache.kafka.common.header.Header;
|
||||
import org.apache.kafka.common.header.Headers;
|
||||
import org.apache.kafka.common.header.internals.RecordHeader;
|
||||
|
||||
import org.springframework.cloud.stream.binder.BinderHeaders;
|
||||
import org.springframework.integration.IntegrationMessageHeaderAccessor;
|
||||
import org.springframework.kafka.support.AbstractKafkaHeaderMapper;
|
||||
import org.springframework.kafka.support.JacksonUtils;
|
||||
import org.springframework.lang.Nullable;
|
||||
@@ -49,7 +52,7 @@ import org.springframework.util.MimeType;
|
||||
* Custom header mapper for Apache Kafka. This is identical to the {@link org.springframework.kafka.support.DefaultKafkaHeaderMapper}
|
||||
* from spring Kafka. This is provided for addressing some interoperability issues between Spring Cloud Stream 3.0.x
|
||||
* and 2.x apps, where mime types passed as regular {@link MimeType} in the header are not de-serialized properly.
|
||||
* Once those concerns are addressed in Spring Kafka, we will deprecate this class and remove it in a future binder release.
|
||||
* It also suppresses certain internal headers that should never be propagated on output.
|
||||
*
|
||||
* Most headers in {@link org.springframework.kafka.support.KafkaHeaders} are not mapped onto outbound messages.
|
||||
* The exceptions are correlation and reply headers for request/reply
|
||||
@@ -65,6 +68,16 @@ import org.springframework.util.MimeType;
|
||||
*/
|
||||
public class BinderHeaderMapper extends AbstractKafkaHeaderMapper {
|
||||
|
||||
private static final String NEGATE = "!";
|
||||
|
||||
private static final String NEVER_ID = NEGATE + MessageHeaders.ID;
|
||||
|
||||
private static final String NEVER_TIMESTAMP = NEGATE + MessageHeaders.TIMESTAMP;
|
||||
|
||||
private static final String NEVER_DELIVERY_ATTEMPT = NEGATE + IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT;
|
||||
|
||||
private static final String NEVER_NATIVE_HEADERS_PRESENT = NEGATE + BinderHeaders.NATIVE_HEADERS_PRESENT;
|
||||
|
||||
private static final String JAVA_LANG_STRING = "java.lang.String";
|
||||
|
||||
private static final List<String> DEFAULT_TRUSTED_PACKAGES =
|
||||
@@ -119,8 +132,10 @@ public class BinderHeaderMapper extends AbstractKafkaHeaderMapper {
|
||||
*/
|
||||
public BinderHeaderMapper(ObjectMapper objectMapper) {
|
||||
this(objectMapper,
|
||||
"!" + MessageHeaders.ID,
|
||||
"!" + MessageHeaders.TIMESTAMP,
|
||||
NEVER_ID,
|
||||
NEVER_TIMESTAMP,
|
||||
NEVER_DELIVERY_ATTEMPT,
|
||||
NEVER_NATIVE_HEADERS_PRESENT,
|
||||
"*");
|
||||
}
|
||||
|
||||
@@ -384,6 +399,32 @@ public class BinderHeaderMapper extends AbstractKafkaHeaderMapper {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add patterns for headers that should never be mapped.
|
||||
* @param patterns the patterns.
|
||||
* @return the modified patterns.
|
||||
* @since 3.0.2
|
||||
*/
|
||||
public static String[] addNeverHeaderPatterns(List<String> patterns) {
|
||||
List<String> patternsToUse = new LinkedList<>(patterns);
|
||||
patternsToUse.add(0, NEVER_NATIVE_HEADERS_PRESENT);
|
||||
patternsToUse.add(0, NEVER_DELIVERY_ATTEMPT);
|
||||
patternsToUse.add(0, NEVER_TIMESTAMP);
|
||||
patternsToUse.add(0, NEVER_ID);
|
||||
return patternsToUse.toArray(new String[0]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove never headers.
|
||||
* @param headers the headers from which to remove the never headers.
|
||||
* @since 3.0.2
|
||||
*/
|
||||
public static void removeNeverHeaders(Headers headers) {
|
||||
headers.remove(MessageHeaders.ID);
|
||||
headers.remove(MessageHeaders.TIMESTAMP);
|
||||
headers.remove(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT);
|
||||
headers.remove(BinderHeaders.NATIVE_HEADERS_PRESENT);
|
||||
}
|
||||
|
||||
/**
|
||||
* The {@link StdNodeBasedDeserializer} extension for {@link MimeType} deserialization.
|
||||
|
||||
@@ -25,7 +25,6 @@ import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@@ -103,6 +102,7 @@ import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
|
||||
import org.springframework.kafka.listener.ConsumerProperties;
|
||||
import org.springframework.kafka.listener.ContainerProperties;
|
||||
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
|
||||
import org.springframework.kafka.support.Acknowledgment;
|
||||
import org.springframework.kafka.support.KafkaHeaderMapper;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.kafka.support.ProducerListener;
|
||||
@@ -110,8 +110,10 @@ import org.springframework.kafka.support.SendResult;
|
||||
import org.springframework.kafka.support.TopicPartitionOffset;
|
||||
import org.springframework.kafka.support.TopicPartitionOffset.SeekPosition;
|
||||
import org.springframework.kafka.support.converter.MessagingMessageConverter;
|
||||
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
|
||||
import org.springframework.kafka.transaction.KafkaTransactionManager;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
@@ -214,6 +216,8 @@ public class KafkaMessageChannelBinder extends
|
||||
|
||||
private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
|
||||
|
||||
private Map<ConsumerDestination, ContainerProperties.AckMode> ackModeInfo = new ConcurrentHashMap<>();
|
||||
|
||||
public KafkaMessageChannelBinder(
|
||||
KafkaBinderConfigurationProperties configurationProperties,
|
||||
KafkaTopicProvisioner provisioningProvider) {
|
||||
@@ -343,9 +347,12 @@ public class KafkaMessageChannelBinder extends
|
||||
* (spring.cloud.stream.kafka.binder.transaction.producer.*) properties are used
|
||||
* instead, for all producers. A binder is transactional when
|
||||
* 'spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix' has text.
|
||||
* Individual bindings can override the binder's transaction manager.
|
||||
*/
|
||||
final ProducerFactory<byte[], byte[]> producerFB = this.transactionManager != null
|
||||
? this.transactionManager.getProducerFactory()
|
||||
KafkaAwareTransactionManager<byte[], byte[]> transMan = transactionManager(
|
||||
producerProperties.getExtension().getTransactionManager());
|
||||
final ProducerFactory<byte[], byte[]> producerFB = transMan != null
|
||||
? transMan.getProducerFactory()
|
||||
: getProducerFactory(null, producerProperties);
|
||||
Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(
|
||||
producerProperties.getPartitionCount(), false, () -> {
|
||||
@@ -353,7 +360,7 @@ public class KafkaMessageChannelBinder extends
|
||||
List<PartitionInfo> partitionsFor = producer
|
||||
.partitionsFor(destination.getName());
|
||||
producer.close();
|
||||
if (this.transactionManager == null) {
|
||||
if (transMan == null) {
|
||||
((DisposableBean) producerFB).destroy();
|
||||
}
|
||||
return partitionsFor;
|
||||
@@ -384,7 +391,7 @@ public class KafkaMessageChannelBinder extends
|
||||
if (this.producerListener != null) {
|
||||
kafkaTemplate.setProducerListener(this.producerListener);
|
||||
}
|
||||
if (this.transactionManager != null) {
|
||||
if (transMan != null) {
|
||||
kafkaTemplate.setTransactionIdPrefix(configurationProperties.getTransaction().getTransactionIdPrefix());
|
||||
}
|
||||
ProducerConfigurationMessageHandler handler = new ProducerConfigurationMessageHandler(
|
||||
@@ -422,23 +429,32 @@ public class KafkaMessageChannelBinder extends
|
||||
mapper = null;
|
||||
}
|
||||
else if (mapper == null) {
|
||||
String[] headerPatterns = producerProperties.getExtension()
|
||||
.getHeaderPatterns();
|
||||
String[] headerPatterns = producerProperties.getExtension().getHeaderPatterns();
|
||||
if (headerPatterns != null && headerPatterns.length > 0) {
|
||||
List<String> patterns = new LinkedList<>(Arrays.asList(headerPatterns));
|
||||
if (!patterns.contains("!" + MessageHeaders.TIMESTAMP)) {
|
||||
patterns.add(0, "!" + MessageHeaders.TIMESTAMP);
|
||||
}
|
||||
if (!patterns.contains("!" + MessageHeaders.ID)) {
|
||||
patterns.add(0, "!" + MessageHeaders.ID);
|
||||
}
|
||||
mapper = new BinderHeaderMapper(
|
||||
patterns.toArray(new String[patterns.size()]));
|
||||
BinderHeaderMapper.addNeverHeaderPatterns(Arrays.asList(headerPatterns)));
|
||||
}
|
||||
else {
|
||||
mapper = new BinderHeaderMapper();
|
||||
}
|
||||
}
|
||||
else {
|
||||
KafkaHeaderMapper userHeaderMapper = mapper;
|
||||
mapper = new KafkaHeaderMapper() {
|
||||
|
||||
@Override
|
||||
public void toHeaders(Headers source, Map<String, Object> target) {
|
||||
userHeaderMapper.toHeaders(source, target);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fromHeaders(MessageHeaders headers, Headers target) {
|
||||
userHeaderMapper.fromHeaders(headers, target);
|
||||
BinderHeaderMapper.removeNeverHeaders(target);
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
handler.setHeaderMapper(mapper);
|
||||
return handler;
|
||||
}
|
||||
@@ -512,7 +528,7 @@ public class KafkaMessageChannelBinder extends
|
||||
@Override
|
||||
protected boolean useNativeEncoding(
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
|
||||
if (this.transactionManager != null) {
|
||||
if (transactionManager(producerProperties.getExtension().getTransactionManager()) != null) {
|
||||
return this.configurationProperties.getTransaction().getProducer()
|
||||
.isUseNativeEncoding();
|
||||
}
|
||||
@@ -578,8 +594,10 @@ public class KafkaMessageChannelBinder extends
|
||||
? new ContainerProperties(Pattern.compile(topics[0]))
|
||||
: new ContainerProperties(topics)
|
||||
: new ContainerProperties(topicPartitionOffsets);
|
||||
if (this.transactionManager != null) {
|
||||
containerProperties.setTransactionManager(this.transactionManager);
|
||||
KafkaAwareTransactionManager<byte[], byte[]> transMan = transactionManager(
|
||||
extendedConsumerProperties.getExtension().getTransactionManager());
|
||||
if (transMan != null) {
|
||||
containerProperties.setTransactionManager(transMan);
|
||||
}
|
||||
if (this.rebalanceListener != null) {
|
||||
setupRebalanceListener(extendedConsumerProperties, containerProperties);
|
||||
@@ -645,14 +663,14 @@ public class KafkaMessageChannelBinder extends
|
||||
consumerGroup, extendedConsumerProperties);
|
||||
if (!extendedConsumerProperties.isBatchMode()
|
||||
&& extendedConsumerProperties.getMaxAttempts() > 1
|
||||
&& this.transactionManager == null) {
|
||||
&& transMan == null) {
|
||||
|
||||
kafkaMessageDrivenChannelAdapter
|
||||
.setRetryTemplate(buildRetryTemplate(extendedConsumerProperties));
|
||||
kafkaMessageDrivenChannelAdapter
|
||||
.setRecoveryCallback(errorInfrastructure.getRecoverer());
|
||||
}
|
||||
else if (!extendedConsumerProperties.isBatchMode() && this.transactionManager != null) {
|
||||
else if (!extendedConsumerProperties.isBatchMode() && transMan != null) {
|
||||
messageListenerContainer.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(
|
||||
(record, exception) -> {
|
||||
MessagingException payload =
|
||||
@@ -681,6 +699,7 @@ public class KafkaMessageChannelBinder extends
|
||||
kafkaMessageDrivenChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
|
||||
}
|
||||
this.getContainerCustomizer().configure(messageListenerContainer, destination.getName(), group);
|
||||
this.ackModeInfo.put(destination, messageListenerContainer.getContainerProperties().getAckMode());
|
||||
return kafkaMessageDrivenChannelAdapter;
|
||||
}
|
||||
|
||||
@@ -1042,8 +1061,10 @@ public class KafkaMessageChannelBinder extends
|
||||
if (kafkaConsumerProperties.isEnableDlq()) {
|
||||
KafkaProducerProperties dlqProducerProperties = kafkaConsumerProperties
|
||||
.getDlqProducerProperties();
|
||||
ProducerFactory<?, ?> producerFactory = this.transactionManager != null
|
||||
? this.transactionManager.getProducerFactory()
|
||||
KafkaAwareTransactionManager<byte[], byte[]> transMan = transactionManager(
|
||||
properties.getExtension().getTransactionManager());
|
||||
ProducerFactory<?, ?> producerFactory = transMan != null
|
||||
? transMan.getProducerFactory()
|
||||
: getProducerFactory(null,
|
||||
new ExtendedProducerProperties<>(dlqProducerProperties));
|
||||
final KafkaTemplate<?, ?> kafkaTemplate = new KafkaTemplate<>(
|
||||
@@ -1058,7 +1079,7 @@ public class KafkaMessageChannelBinder extends
|
||||
|
||||
if (properties.isUseNativeDecoding()) {
|
||||
if (record != null) {
|
||||
Map<String, String> configuration = this.transactionManager == null
|
||||
Map<String, String> configuration = transMan == null
|
||||
? dlqProducerProperties.getConfiguration()
|
||||
: this.configurationProperties.getTransaction()
|
||||
.getProducer().getConfiguration();
|
||||
@@ -1156,22 +1177,46 @@ public class KafkaMessageChannelBinder extends
|
||||
String dlqName = StringUtils.hasText(kafkaConsumerProperties.getDlqName())
|
||||
? kafkaConsumerProperties.getDlqName()
|
||||
: "error." + record.topic() + "." + group;
|
||||
MessageHeaders headers;
|
||||
if (message instanceof ErrorMessage) {
|
||||
final ErrorMessage errorMessage = (ErrorMessage) message;
|
||||
final Message<?> originalMessage = errorMessage.getOriginalMessage();
|
||||
if (originalMessage != null) {
|
||||
headers = originalMessage.getHeaders();
|
||||
}
|
||||
else {
|
||||
headers = message.getHeaders();
|
||||
}
|
||||
}
|
||||
else {
|
||||
headers = message.getHeaders();
|
||||
}
|
||||
if (this.transactionTemplate != null) {
|
||||
Throwable throwable2 = throwable;
|
||||
this.transactionTemplate.executeWithoutResult(status -> {
|
||||
dlqSender.sendToDlq(recordToSend.get(), kafkaHeaders, dlqName, group, throwable2,
|
||||
determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()));
|
||||
determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()),
|
||||
headers, this.ackModeInfo.get(destination));
|
||||
});
|
||||
}
|
||||
else {
|
||||
dlqSender.sendToDlq(recordToSend.get(), kafkaHeaders, dlqName, group, throwable,
|
||||
determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()));
|
||||
determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()), headers, this.ackModeInfo.get(destination));
|
||||
}
|
||||
};
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Nullable
|
||||
private KafkaAwareTransactionManager<byte[], byte[]> transactionManager(@Nullable String beanName) {
|
||||
if (StringUtils.hasText(beanName)) {
|
||||
return getApplicationContext().getBean(beanName, KafkaAwareTransactionManager.class);
|
||||
}
|
||||
return this.transactionManager;
|
||||
}
|
||||
|
||||
private DlqPartitionFunction determinDlqPartitionFunction(Integer dlqPartitions) {
|
||||
if (this.dlqPartitionFunction != null) {
|
||||
return this.dlqPartitionFunction;
|
||||
@@ -1428,7 +1473,8 @@ public class KafkaMessageChannelBinder extends
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
void sendToDlq(ConsumerRecord<?, ?> consumerRecord, Headers headers,
|
||||
String dlqName, String group, Throwable throwable, DlqPartitionFunction partitionFunction) {
|
||||
String dlqName, String group, Throwable throwable, DlqPartitionFunction partitionFunction,
|
||||
MessageHeaders messageHeaders, ContainerProperties.AckMode ackMode) {
|
||||
K key = (K) consumerRecord.key();
|
||||
V value = (V) consumerRecord.value();
|
||||
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(dlqName,
|
||||
@@ -1458,6 +1504,9 @@ public class KafkaMessageChannelBinder extends
|
||||
KafkaMessageChannelBinder.this.logger
|
||||
.debug("Sent to DLQ " + sb.toString());
|
||||
}
|
||||
if (ackMode == ContainerProperties.AckMode.MANUAL || ackMode == ContainerProperties.AckMode.MANUAL_IMMEDIATE) {
|
||||
messageHeaders.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class).acknowledge();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ package org.springframework.cloud.stream.binder.kafka;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
@@ -51,7 +52,9 @@ import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||
import org.apache.kafka.common.KafkaFuture;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.errors.TopicExistsException;
|
||||
import org.apache.kafka.common.header.Header;
|
||||
import org.apache.kafka.common.header.Headers;
|
||||
import org.apache.kafka.common.header.internals.RecordHeader;
|
||||
import org.apache.kafka.common.record.TimestampType;
|
||||
@@ -1105,75 +1108,6 @@ public class KafkaBinderTests extends
|
||||
producerBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testDefaultAutoCommitOnErrorWithoutDlq() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
BindingProperties producerBindingProperties = createProducerBindingProperties(
|
||||
producerProperties);
|
||||
|
||||
DirectChannel moduleOutputChannel = createBindableChannel("output",
|
||||
producerBindingProperties);
|
||||
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.setMaxAttempts(1);
|
||||
consumerProperties.setBackOffInitialInterval(100);
|
||||
consumerProperties.setBackOffMaxInterval(150);
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
|
||||
DirectChannel moduleInputChannel = createBindableChannel("input",
|
||||
createConsumerBindingProperties(consumerProperties));
|
||||
|
||||
FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
|
||||
moduleInputChannel.subscribe(handler);
|
||||
|
||||
long uniqueBindingId = System.currentTimeMillis();
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer(
|
||||
"retryTest." + uniqueBindingId + ".0", moduleOutputChannel,
|
||||
producerProperties);
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer(
|
||||
"retryTest." + uniqueBindingId + ".0", "testGroup", moduleInputChannel,
|
||||
consumerProperties);
|
||||
|
||||
String testMessagePayload = "test." + UUID.randomUUID().toString();
|
||||
Message<byte[]> testMessage = MessageBuilder
|
||||
.withPayload(testMessagePayload.getBytes()).build();
|
||||
moduleOutputChannel.send(testMessage);
|
||||
|
||||
assertThat(handler.getLatch().await((int) (timeoutMultiplier * 1000),
|
||||
TimeUnit.MILLISECONDS));
|
||||
// first attempt fails
|
||||
assertThat(handler.getReceivedMessages().entrySet()).hasSize(1);
|
||||
Message<?> receivedMessage = handler.getReceivedMessages().entrySet().iterator()
|
||||
.next().getValue();
|
||||
assertThat(receivedMessage).isNotNull();
|
||||
assertThat(
|
||||
new String((byte[]) receivedMessage.getPayload(), StandardCharsets.UTF_8))
|
||||
.isEqualTo(testMessagePayload);
|
||||
assertThat(handler.getInvocationCount())
|
||||
.isEqualTo(consumerProperties.getMaxAttempts());
|
||||
consumerBinding.unbind();
|
||||
|
||||
// on the second attempt the message is redelivered
|
||||
QueueChannel successfulInputChannel = new QueueChannel();
|
||||
consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0",
|
||||
"testGroup", successfulInputChannel, consumerProperties);
|
||||
binderBindUnbindLatency();
|
||||
String testMessage2Payload = "test." + UUID.randomUUID().toString();
|
||||
Message<byte[]> testMessage2 = MessageBuilder
|
||||
.withPayload(testMessage2Payload.getBytes()).build();
|
||||
moduleOutputChannel.send(testMessage2);
|
||||
|
||||
Message<?> firstReceived = receive(successfulInputChannel);
|
||||
assertThat(firstReceived.getPayload()).isEqualTo(testMessagePayload.getBytes());
|
||||
Message<?> secondReceived = receive(successfulInputChannel);
|
||||
assertThat(secondReceived.getPayload()).isEqualTo(testMessage2Payload.getBytes());
|
||||
consumerBinding.unbind();
|
||||
producerBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testDefaultAutoCommitOnErrorWithDlq() throws Exception {
|
||||
@@ -1190,7 +1124,6 @@ public class KafkaBinderTests extends
|
||||
consumerProperties.setBackOffInitialInterval(100);
|
||||
consumerProperties.setBackOffMaxInterval(150);
|
||||
consumerProperties.getExtension().setEnableDlq(true);
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
|
||||
DirectChannel moduleInputChannel = createBindableChannel("input",
|
||||
createConsumerBindingProperties(consumerProperties));
|
||||
@@ -1252,6 +1185,87 @@ public class KafkaBinderTests extends
|
||||
producerBinding.unbind();
|
||||
}
|
||||
|
||||
//See https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/870 for motivation for this test.
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testAutoCommitOnErrorWhenManualAcknowledgement() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
BindingProperties producerBindingProperties = createProducerBindingProperties(
|
||||
producerProperties);
|
||||
|
||||
DirectChannel moduleOutputChannel = createBindableChannel("output",
|
||||
producerBindingProperties);
|
||||
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.setMaxAttempts(3);
|
||||
consumerProperties.setBackOffInitialInterval(100);
|
||||
consumerProperties.setBackOffMaxInterval(150);
|
||||
//When auto commit is disabled, then the record is committed after publishing to DLQ using the manual acknowledgement.
|
||||
// (if DLQ is enabled, which is, in this case).
|
||||
consumerProperties.getExtension().setAutoCommitOffset(false);
|
||||
consumerProperties.getExtension().setEnableDlq(true);
|
||||
|
||||
DirectChannel moduleInputChannel = createBindableChannel("input",
|
||||
createConsumerBindingProperties(consumerProperties));
|
||||
|
||||
FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
|
||||
moduleInputChannel.subscribe(handler);
|
||||
long uniqueBindingId = System.currentTimeMillis();
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer(
|
||||
"retryTest." + uniqueBindingId + ".0", moduleOutputChannel,
|
||||
producerProperties);
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer(
|
||||
"retryTest." + uniqueBindingId + ".0", "testGroup", moduleInputChannel,
|
||||
consumerProperties);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> dlqConsumerProperties = createConsumerProperties();
|
||||
dlqConsumerProperties.setMaxAttempts(1);
|
||||
QueueChannel dlqChannel = new QueueChannel();
|
||||
Binding<MessageChannel> dlqConsumerBinding = binder.bindConsumer(
|
||||
"error.retryTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel,
|
||||
dlqConsumerProperties);
|
||||
|
||||
String testMessagePayload = "test." + UUID.randomUUID().toString();
|
||||
Message<byte[]> testMessage = MessageBuilder
|
||||
.withPayload(testMessagePayload.getBytes()).build();
|
||||
moduleOutputChannel.send(testMessage);
|
||||
|
||||
Message<?> dlqMessage = receive(dlqChannel, 3);
|
||||
assertThat(dlqMessage).isNotNull();
|
||||
assertThat(dlqMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());
|
||||
|
||||
// first attempt fails
|
||||
assertThat(handler.getReceivedMessages().entrySet()).hasSize(1);
|
||||
Message<?> handledMessage = handler.getReceivedMessages().entrySet().iterator()
|
||||
.next().getValue();
|
||||
assertThat(handledMessage).isNotNull();
|
||||
assertThat(
|
||||
new String((byte[]) handledMessage.getPayload(), StandardCharsets.UTF_8))
|
||||
.isEqualTo(testMessagePayload);
|
||||
assertThat(handler.getInvocationCount())
|
||||
.isEqualTo(consumerProperties.getMaxAttempts());
|
||||
binderBindUnbindLatency();
|
||||
dlqConsumerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
|
||||
// on the second attempt the message is not redelivered because the DLQ is set and the record in error is already committed.
|
||||
QueueChannel successfulInputChannel = new QueueChannel();
|
||||
consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0",
|
||||
"testGroup", successfulInputChannel, consumerProperties);
|
||||
String testMessage2Payload = "test1." + UUID.randomUUID().toString();
|
||||
Message<byte[]> testMessage2 = MessageBuilder
|
||||
.withPayload(testMessage2Payload.getBytes()).build();
|
||||
moduleOutputChannel.send(testMessage2);
|
||||
|
||||
Message<?> receivedMessage = receive(successfulInputChannel);
|
||||
assertThat(receivedMessage.getPayload())
|
||||
.isEqualTo(testMessage2Payload.getBytes());
|
||||
|
||||
binderBindUnbindLatency();
|
||||
consumerBinding.unbind();
|
||||
producerBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testConfigurableDlqName() throws Exception {
|
||||
@@ -1727,7 +1741,7 @@ public class KafkaBinderTests extends
|
||||
}
|
||||
catch (UnsupportedOperationException ignored) {
|
||||
}
|
||||
List<ChannelInterceptor> interceptors = output.getChannelInterceptors();
|
||||
List<ChannelInterceptor> interceptors = output.getInterceptors();
|
||||
AtomicInteger count = new AtomicInteger();
|
||||
interceptors.forEach(interceptor -> {
|
||||
if (interceptor instanceof PartitioningInterceptor) {
|
||||
@@ -3508,6 +3522,98 @@ public class KafkaBinderTests extends
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInternalHeadersNotPropagated() throws Exception {
|
||||
testInternalHeadersNotPropagatedGuts("propagate.1", null, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInternalHeadersNotPropagatedCustomHeader() throws Exception {
|
||||
testInternalHeadersNotPropagatedGuts("propagate.2", new String[] { "foo", "*" }, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInternalHeadersNotPropagatedCustomMapper() throws Exception {
|
||||
testInternalHeadersNotPropagatedGuts("propagate.3", null, new BinderHeaderMapper("*"));
|
||||
}
|
||||
|
||||
public void testInternalHeadersNotPropagatedGuts(String name, String[] headerPatterns,
|
||||
KafkaHeaderMapper mapper) throws Exception {
|
||||
|
||||
KafkaTestBinder binder;
|
||||
if (mapper == null) {
|
||||
binder = getBinder();
|
||||
}
|
||||
else {
|
||||
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
|
||||
binderConfiguration.setHeaderMapperBeanName("headerMapper");
|
||||
|
||||
KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(
|
||||
binderConfiguration, new TestKafkaProperties());
|
||||
try {
|
||||
kafkaTopicProvisioner.afterPropertiesSet();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
binder = new KafkaTestBinder(binderConfiguration, kafkaTopicProvisioner);
|
||||
((GenericApplicationContext) binder.getApplicationContext()).registerBean("headerMapper",
|
||||
KafkaHeaderMapper.class, () -> mapper);
|
||||
}
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
producerProperties.getExtension().setHeaderPatterns(headerPatterns);
|
||||
|
||||
DirectChannel output = createBindableChannel("output", createProducerBindingProperties(producerProperties));
|
||||
output.setBeanName(name + ".out");
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer(name + ".1", output, producerProperties);
|
||||
|
||||
QueueChannel input = new QueueChannel();
|
||||
input.setBeanName(name + ".in");
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer(name + ".0", name, input, consumerProperties);
|
||||
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
|
||||
KafkaTemplate template = new KafkaTemplate(new DefaultKafkaProducerFactory<>(producerProps));
|
||||
template.send(MessageBuilder.withPayload("internalHeaderPropagation")
|
||||
.setHeader(KafkaHeaders.TOPIC, name + ".0")
|
||||
.setHeader("someHeader", "someValue")
|
||||
.build());
|
||||
|
||||
Message<?> consumed = input.receive(10_000);
|
||||
if (headerPatterns != null) {
|
||||
consumed = MessageBuilder.fromMessage(consumed).setHeader(headerPatterns[0], "bar").build();
|
||||
}
|
||||
output.send(consumed);
|
||||
|
||||
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(name, "false",
|
||||
embeddedKafka.getEmbeddedKafka());
|
||||
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps);
|
||||
Consumer consumer = cf.createConsumer();
|
||||
consumer.assign(Collections.singletonList(new TopicPartition(name + ".1", 0)));
|
||||
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(10));
|
||||
assertThat(records.count()).isEqualTo(1);
|
||||
ConsumerRecord<?, ?> received = records.iterator().next();
|
||||
assertThat(received.value()).isEqualTo("internalHeaderPropagation".getBytes());
|
||||
Header header = received.headers().lastHeader(BinderHeaders.NATIVE_HEADERS_PRESENT);
|
||||
assertThat(header).isNull();
|
||||
header = received.headers().lastHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT);
|
||||
assertThat(header).isNull();
|
||||
header = received.headers().lastHeader(MessageHeaders.ID);
|
||||
assertThat(header).isNull();
|
||||
header = received.headers().lastHeader(MessageHeaders.TIMESTAMP);
|
||||
assertThat(header).isNull();
|
||||
assertThat(received.headers().lastHeader("someHeader")).isNotNull();
|
||||
if (headerPatterns != null) {
|
||||
assertThat(received.headers().lastHeader(headerPatterns[0])).isNotNull();
|
||||
}
|
||||
|
||||
producerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
consumer.close();
|
||||
}
|
||||
|
||||
private final class FailingInvocationCountingMessageHandler
|
||||
implements MessageHandler {
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka.integration2;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@@ -33,22 +34,30 @@ import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.annotation.Input;
|
||||
import org.springframework.cloud.stream.annotation.Output;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
|
||||
import org.springframework.cloud.stream.messaging.Processor;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
|
||||
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
|
||||
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
|
||||
import org.springframework.kafka.test.utils.KafkaTestUtils;
|
||||
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
import org.springframework.messaging.support.GenericMessage;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
import org.springframework.util.backoff.FixedBackOff;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.BDDMockito.given;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
/**
|
||||
* @author Gary Russell
|
||||
@@ -63,6 +72,8 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
"spring.cloud.stream.bindings.input.destination=consumer.producer.txIn",
|
||||
"spring.cloud.stream.bindings.input.group=consumer.producer.tx",
|
||||
"spring.cloud.stream.bindings.input.consumer.max-attempts=1",
|
||||
"spring.cloud.stream.kafka.bindings.input2.consumer.transaction-manager=tm",
|
||||
"spring.cloud.stream.kafka.bindings.output2.producer.transaction-manager=tm",
|
||||
"spring.cloud.stream.bindings.output.destination=consumer.producer.txOut",
|
||||
"spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx.",
|
||||
"spring.cloud.stream.kafka.binder.transaction.producer.configuration.retries=99",
|
||||
@@ -100,7 +111,17 @@ public class ConsumerProducerTransactionTests {
|
||||
assertThat(this.config.outs).containsExactlyInAnyOrder("ONE", "THREE");
|
||||
}
|
||||
|
||||
@EnableBinding(Processor.class)
|
||||
@Test
|
||||
public void externalTM() {
|
||||
assertThat(this.config.input2Container.getContainerProperties().getTransactionManager())
|
||||
.isSameAs(this.config.tm);
|
||||
Object handler = KafkaTestUtils.getPropertyValue(this.config.output2, "dispatcher.handlers", Set.class)
|
||||
.iterator().next();
|
||||
assertThat(KafkaTestUtils.getPropertyValue(handler, "delegate.kafkaTemplate.producerFactory"))
|
||||
.isSameAs(this.config.pf);
|
||||
}
|
||||
|
||||
@EnableBinding(TwoProcessors.class)
|
||||
@EnableAutoConfiguration
|
||||
public static class Config {
|
||||
|
||||
@@ -111,6 +132,15 @@ public class ConsumerProducerTransactionTests {
|
||||
@Autowired
|
||||
private MessageChannel output;
|
||||
|
||||
@Autowired
|
||||
MessageChannel output2;
|
||||
|
||||
AbstractMessageListenerContainer<?, ?> input2Container;
|
||||
|
||||
ProducerFactory pf;
|
||||
|
||||
KafkaAwareTransactionManager<byte[], byte[]> tm;
|
||||
|
||||
@KafkaListener(id = "test.cons.prod", topics = "consumer.producer.txOut")
|
||||
public void listenOut(String in) {
|
||||
this.outs.add(in);
|
||||
@@ -125,6 +155,10 @@ public class ConsumerProducerTransactionTests {
|
||||
}
|
||||
}
|
||||
|
||||
@StreamListener("input2")
|
||||
public void listenIn2(String in) {
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
|
||||
return args -> {
|
||||
@@ -136,10 +170,34 @@ public class ConsumerProducerTransactionTests {
|
||||
|
||||
@Bean
|
||||
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
|
||||
return (container, dest, group) -> container
|
||||
.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(new FixedBackOff(0L, 1L)));
|
||||
return (container, dest, group) -> {
|
||||
container.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(new FixedBackOff(0L, 1L)));
|
||||
if ("input2".equals(dest)) {
|
||||
this.input2Container = container;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
@Bean
|
||||
public KafkaAwareTransactionManager<byte[], byte[]> tm(ProducerFactory pf) {
|
||||
KafkaAwareTransactionManager mock = mock(KafkaAwareTransactionManager.class);
|
||||
this.pf = pf;
|
||||
given(mock.getProducerFactory()).willReturn(pf);
|
||||
this.tm = mock;
|
||||
return mock;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public interface TwoProcessors extends Processor {
|
||||
|
||||
@Input
|
||||
SubscribableChannel input2();
|
||||
|
||||
@Output
|
||||
MessageChannel output2();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user