Compare commits

...

12 Commits

Author SHA1 Message Date
buildmaster
e7487b9ada Update SNAPSHOT to 3.1.0.M1 2020-04-07 17:18:40 +00:00
Oleg Zhurakousky
064f5b6611 Updated spring-kafka version to 2.4.4 2020-04-07 19:05:46 +02:00
Oleg Zhurakousky
ff859c5859 update maven wrapper 2020-04-07 18:45:35 +02:00
Gary Russell
445eabc59a Transactional Producer Doc Polishing
- explain txId requirements for producer-initiated transactions
2020-04-07 11:16:14 -04:00
Soby Chacko
cc5d1b1aa6 GH:851 Kafka Streams topology visualization
Add Boot actuator endpoints for topology visualization.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/851
2020-03-25 13:02:57 +01:00
Soby Chacko
db896532e6 Offset commit when DLQ is enabled and manual ack (#871)
* Offset commit when DLQ is enabled and manual ack

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/870

When an error occurs, if the application uses manual acknowldegment
(i.e. autoCommitOffset is false) and DLQ is enabled, then after
publishing to DLQ, the offset is not committed currently.
Addressing this issue by manually commiting after publishing to DLQ.

* Address PR review comments

* Addressing PR review comments - #2
2020-03-24 16:28:39 -04:00
Gary Russell
07a740a5b5 GH-861: Add transaction manager bean override
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/861

Add docs
2020-03-19 17:03:36 -04:00
Serhii Siryi
65f8cc5660 InteractiveQueryService API changes
* Add InteractiveQueryService.getAllHostsInfo to fetch metadata
   of all instances that handles specific store.
 * Test to verify this new API (in the existing InteractiveQueryService tests).
2020-03-05 13:24:32 -05:00
Soby Chacko
90a47a675f Update Kafka Streams docs
Fix missing property prefix in StreamListener based applicationId settings.
2020-03-05 11:40:10 -05:00
Soby Chacko
9d212024f8 Updates for 3.1.0
spring-cloud-build to 3.0.0 snapshot
spring kafka to 2.4.x snapshot
SIK 3.2.1

Remove a test that has behavior inconsistent with new changes in Spring Kafka 2.4
where all error handlers have isAckAfterHandle() true by default. The test for
auto commit offset on error without dlq was expecting this acknowledgement to not
to occur. If applications need to have the ack turned off on error, they should provide a
container customizer where it sets the ack to false. Since this is not a binder
concern, we are removing the test testDefaultAutoCommitOnErrorWithoutDlq.

Cleaning up in Kafka Streams binder tests.
2020-03-04 18:37:08 -05:00
Gary Russell
d594bab4cf GH-853: Don't propagate out "internal" headers
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/853
2020-02-27 13:37:58 -05:00
Oleg Zhurakousky
e46bd1f844 Update POMs for Ivyland 2020-02-14 11:28:28 +01:00
29 changed files with 907 additions and 326 deletions

1
.gitignore vendored
View File

@@ -23,3 +23,4 @@ _site/
dump.rdb
.apt_generated
artifacts
.sts4-cache

51
.mvn/wrapper/MavenWrapperDownloader.java vendored Executable file → Normal file
View File

@@ -1,22 +1,18 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
* Copyright 2007-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.net.*;
import java.io.*;
import java.nio.channels.*;
@@ -24,11 +20,12 @@ import java.util.Properties;
public class MavenWrapperDownloader {
private static final String WRAPPER_VERSION = "0.5.6";
/**
* Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided.
*/
private static final String DEFAULT_DOWNLOAD_URL =
"https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.4.2/maven-wrapper-0.4.2.jar";
private static final String DEFAULT_DOWNLOAD_URL = "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/"
+ WRAPPER_VERSION + "/maven-wrapper-" + WRAPPER_VERSION + ".jar";
/**
* Path to the maven-wrapper.properties file, which might contain a downloadUrl property to
@@ -76,13 +73,13 @@ public class MavenWrapperDownloader {
}
}
}
System.out.println("- Downloading from: : " + url);
System.out.println("- Downloading from: " + url);
File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH);
if(!outputFile.getParentFile().exists()) {
if(!outputFile.getParentFile().mkdirs()) {
System.out.println(
"- ERROR creating output direcrory '" + outputFile.getParentFile().getAbsolutePath() + "'");
"- ERROR creating output directory '" + outputFile.getParentFile().getAbsolutePath() + "'");
}
}
System.out.println("- Downloading to: " + outputFile.getAbsolutePath());
@@ -98,6 +95,16 @@ public class MavenWrapperDownloader {
}
private static void downloadFileFromURL(String urlString, File destination) throws Exception {
if (System.getenv("MVNW_USERNAME") != null && System.getenv("MVNW_PASSWORD") != null) {
String username = System.getenv("MVNW_USERNAME");
char[] password = System.getenv("MVNW_PASSWORD").toCharArray();
Authenticator.setDefault(new Authenticator() {
@Override
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication(username, password);
}
});
}
URL website = new URL(urlString);
ReadableByteChannel rbc;
rbc = Channels.newChannel(website.openStream());

BIN
.mvn/wrapper/maven-wrapper.jar vendored Executable file → Normal file

Binary file not shown.

3
.mvn/wrapper/maven-wrapper.properties vendored Executable file → Normal file
View File

@@ -1 +1,2 @@
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.5.4/apache-maven-3.5.4-bin.zip
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.6.3/apache-maven-3.6.3-bin.zip
wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar

View File

@@ -39,7 +39,7 @@ To use Apache Kafka binder, you need to add `spring-cloud-stream-binder-kafka` a
</dependency>
----
Alternatively, you can also use the Spring Cloud Stream Kafka Starter, as shown inn the following example for Maven:
Alternatively, you can also use the Spring Cloud Stream Kafka Starter, as shown in the following example for Maven:
[source,xml]
----
@@ -60,7 +60,7 @@ The Apache Kafka Binder implementation maps each destination to an Apache Kafka
The consumer group maps directly to the same Apache Kafka concept.
Partitioning also maps directly to Apache Kafka partitions as well.
The binder currently uses the Apache Kafka `kafka-clients` 1.0.0 jar and is designed to be used with a broker of at least that version.
The binder currently uses the Apache Kafka `kafka-clients` version `2.3.1`.
This client can communicate with older brokers (see the Kafka documentation), but certain features may not be available.
For example, with versions earlier than 0.11.x.x, native headers are not supported.
Also, 0.11.x.x does not support the `autoAddPartitions` property.
@@ -155,21 +155,15 @@ Default: See individual producer properties.
spring.cloud.stream.kafka.binder.headerMapperBeanName::
The bean name of a `KafkaHeaderMapper` used for mapping `spring-messaging` headers to and from Kafka headers.
Use this, for example, if you wish to customize the trusted packages in a `DefaultKafkaHeaderMapper` that uses JSON deserialization for the headers.
Use this, for example, if you wish to customize the trusted packages in a `BinderHeaderMapper` bean that uses JSON deserialization for the headers.
If this custom `BinderHeaderMapper` bean is not made available to the binder using this property, then the binder will look for a header mapper bean with the name `kafkaBinderHeaderMapper` that is of type `BinderHeaderMapper` before falling back to a default `BinderHeaderMapper` created by the binder.
+
Default: none.
spring.cloud.stream.kafka.binder.authorizationExceptionRetryInterval::
Enables retrying in case of authorization exceptions.
Defines interval between each retry.
Accepts `Duration`, e.g. `30s`, `2m`, etc.
+
Default: `null` (retries disabled, fail fast)
[[kafka-consumer-properties]]
==== Kafka Consumer Properties
NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.
NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.kafka.default.consumer.<property>=<value>`.
The following properties are available for Kafka consumers only and
@@ -234,9 +228,20 @@ The DLQ topic name can be configurable by setting the `dlqName` property.
This provides an alternative option to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome.
See <<kafka-dlq-processing>> processing for more information.
Starting with version 2.0, messages sent to the DLQ topic are enhanced with the following headers: `x-original-topic`, `x-exception-message`, and `x-exception-stacktrace` as `byte[]`.
By default, a failed record is sent to the same partition number in the DLQ topic as the original record.
See <<dlq-partition-selection>> for how to change that behavior.
**Not allowed when `destinationIsPattern` is `true`.**
+
Default: `false`.
dlqPartitions::
When `enableDlq` is true, and this property is not set, a dead letter topic with the same number of partitions as the primary topic(s) is created.
Usually, dead-letter records are sent to the same partition in the dead-letter topic as the original record.
This behavior can be changed; see <<dlq-partition-selection>>.
If this property is set to `1` and there is no `DqlPartitionFunction` bean, all dead-letter records will be written to partition `0`.
If this property is greater than `1`, you **MUST** provide a `DlqPartitionFunction` bean.
Note that the actual partition count is affected by the binder's `minPartitionCount` property.
+
Default: `none`
configuration::
Map with a key/value pair containing generic Kafka consumer properties.
In addition to having Kafka consumer properties, other configuration properties can be passed here.
@@ -296,6 +301,12 @@ pollTimeout::
Timeout used for polling in pollable consumers.
+
Default: 5 seconds.
transactionManager::
Bean name of a `KafkaAwareTransactionManager` used to override the binder's transaction manager for this binding.
Usually needed if you want to synchronize another transaction with the Kafka transaction, using the `ChainedKafkaTransactionManaager`.
To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.
+
Default: none.
==== Consuming Batches
@@ -311,7 +322,7 @@ Refer to the https://docs.spring.io/spring-kafka/docs/2.3.0.BUILD-SNAPSHOT/refer
[[kafka-producer-properties]]
==== Kafka Producer Properties
NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.
NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.kafka.default.producer.<property>=<value>`.
The following properties are available for Kafka producers only and
@@ -407,6 +418,12 @@ Supported values are `none`, `gzip`, `snappy` and `lz4`.
If you override the `kafka-clients` jar to 2.1.0 (or later), as discussed in the https://docs.spring.io/spring-kafka/docs/2.2.x/reference/html/deps-for-21x.html[Spring for Apache Kafka documentation], and wish to use `zstd` compression, use `spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd`.
+
Default: `none`.
transactionManager::
Bean name of a `KafkaAwareTransactionManager` used to override the binder's transaction manager for this binding.
Usually needed if you want to synchronize another transaction with the Kafka transaction, using the `ChainedKafkaTransactionManaager`.
To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.
+
Default: none.
==== Usage examples
@@ -576,16 +593,24 @@ When used in a processor application, the consumer starts the transaction; any r
When the listener exits normally, the listener container will send the offset to the transaction and commit it.
A common producer factory is used for all producer bindings configured using `spring.cloud.stream.kafka.binder.transaction.producer.*` properties; individual binding Kafka producer properties are ignored.
IMPORTANT: Normal binder retries (and dead lettering) are not supported with transactions because the retries will run in the original transaction, which may be rolled back and any published records will be rolled back too.
When retries are enabled (the common property `maxAttempts` is greater than zero) the retry properties are used to configure a `DefaultAfterRollbackProcessor` to enable retries at the container level.
Similarly, instead of publishing dead-letter records within the transaction, this functionality is moved to the listener container, again via the `DefaultAfterRollbackProcessor` which runs after the main transaction has rolled back.
If you wish to use transactions in a source application, or from some arbitrary thread for producer-only transaction (e.g. `@Scheduled` method), you must get a reference to the transactional producer factory and define a `KafkaTransactionManager` bean using it.
====
[source, java]
----
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
public PlatformTransactionManager transactionManager(BinderFactory binders,
@Value("${unique.tx.id.per.instance}") String txId) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
return new KafkaTransactionManager<>(pf);
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
tm.setTransactionId(txId)
return tm;
}
----
====
@@ -612,6 +637,8 @@ public static class Sender {
If you wish to synchronize producer-only transactions with those from some other transaction manager, use a `ChainedTransactionManager`.
IMPORTANT: If you deploy multiple instances of your application, each instance needs a unique `transactionIdPrefix`.
[[kafka-error-channels]]
=== Error Channels

View File

@@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.0.3.BUILD-SNAPSHOT</version>
<version>3.1.0.M1</version>
</parent>
<packaging>pom</packaging>
<name>spring-cloud-stream-binder-kafka-docs</name>

View File

@@ -589,11 +589,11 @@ public KStream<String, String> anotherProcess(@Input("anotherInput") <KStream<Ob
Then you must set the application id for this using the following binding property.
`spring.cloud.stream.kafka.streams.bindings.input.applicationId`
`spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId`
and
`spring.cloud.stream.kafka.streams.bindings.anotherInput.applicationId`
`spring.cloud.stream.kafka.streams.bindings.anotherInput.consumer.applicationId`
For function based model also, this approach of setting application id at the binding level will work.
@@ -1448,6 +1448,19 @@ By default, the `Kafkastreams.cleanup()` method is called when the binding is st
See https://docs.spring.io/spring-kafka/reference/html/_reference.html#_configuration[the Spring Kafka documentation].
To modify this behavior simply add a single `CleanupConfig` `@Bean` (configured to clean up on start, stop, or neither) to the application context; the bean will be detected and wired into the factory bean.
=== Kafka Streams topology visualization
Kafka Streams binder provides the following actuator endpoints for retrieving the topology description using which you can visualize the topology using external tools.
`/actuator/topology`
`/actuator/topology/<applicaiton-id of the processor>`
You need to include the actuator and web dependencies from Spring Boot to access these endpoints.
Further, you also need to add `topology` to `management.endpoints.web.exposure.include` property.
By default, the `topology` endpoint is disabled.
=== Configuration Options
This section contains the configuration options used by the Kafka Streams binder.

View File

@@ -281,6 +281,12 @@ pollTimeout::
Timeout used for polling in pollable consumers.
+
Default: 5 seconds.
transactionManager::
Bean name of a `KafkaAwareTransactionManager` used to override the binder's transaction manager for this binding.
Usually needed if you want to synchronize another transaction with the Kafka transaction, using the `ChainedKafkaTransactionManaager`.
To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.
+
Default: none.
==== Consuming Batches
@@ -392,6 +398,12 @@ Supported values are `none`, `gzip`, `snappy` and `lz4`.
If you override the `kafka-clients` jar to 2.1.0 (or later), as discussed in the https://docs.spring.io/spring-kafka/docs/2.2.x/reference/html/deps-for-21x.html[Spring for Apache Kafka documentation], and wish to use `zstd` compression, use `spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd`.
+
Default: `none`.
transactionManager::
Bean name of a `KafkaAwareTransactionManager` used to override the binder's transaction manager for this binding.
Usually needed if you want to synchronize another transaction with the Kafka transaction, using the `ChainedKafkaTransactionManaager`.
To achieve exactly once consumption and production of records, the consumer and producer bindings must all be configured with the same transaction manager.
+
Default: none.
==== Usage examples
@@ -571,10 +583,14 @@ If you wish to use transactions in a source application, or from some arbitrary
[source, java]
----
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
public PlatformTransactionManager transactionManager(BinderFactory binders,
@Value("${unique.tx.id.per.instance}") String txId) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
return new KafkaTransactionManager<>(pf);
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
tm.setTransactionId(txId)
return tm;
}
----
====
@@ -601,6 +617,8 @@ public static class Sender {
If you wish to synchronize producer-only transactions with those from some other transaction manager, use a `ChainedTransactionManager`.
IMPORTANT: If you deploy multiple instances of your application, each instance needs a unique `transactionIdPrefix`.
[[kafka-error-channels]]
=== Error Channels

36
mvnw vendored
View File

@@ -8,7 +8,7 @@
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
@@ -19,7 +19,7 @@
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# Maven2 Start Up Batch script
# Maven Start Up Batch script
#
# Required ENV vars:
# ------------------
@@ -114,7 +114,6 @@ if $mingw ; then
M2_HOME="`(cd "$M2_HOME"; pwd)`"
[ -n "$JAVA_HOME" ] &&
JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
# TODO classpath?
fi
if [ -z "$JAVA_HOME" ]; then
@@ -212,7 +211,11 @@ else
if [ "$MVNW_VERBOSE" = true ]; then
echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..."
fi
jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.4.2/maven-wrapper-0.4.2.jar"
if [ -n "$MVNW_REPOURL" ]; then
jarUrl="$MVNW_REPOURL/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
else
jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
fi
while IFS="=" read key value; do
case "$key" in (wrapperUrl) jarUrl="$value"; break ;;
esac
@@ -221,22 +224,38 @@ else
echo "Downloading from: $jarUrl"
fi
wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar"
if $cygwin; then
wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"`
fi
if command -v wget > /dev/null; then
if [ "$MVNW_VERBOSE" = true ]; then
echo "Found wget ... using wget"
fi
wget "$jarUrl" -O "$wrapperJarPath"
if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
wget "$jarUrl" -O "$wrapperJarPath"
else
wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath"
fi
elif command -v curl > /dev/null; then
if [ "$MVNW_VERBOSE" = true ]; then
echo "Found curl ... using curl"
fi
curl -o "$wrapperJarPath" "$jarUrl"
if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then
curl -o "$wrapperJarPath" "$jarUrl" -f
else
curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f
fi
else
if [ "$MVNW_VERBOSE" = true ]; then
echo "Falling back to using Java to download"
fi
javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java"
# For Cygwin, switch paths to Windows format before running javac
if $cygwin; then
javaClass=`cygpath --path --windows "$javaClass"`
fi
if [ -e "$javaClass" ]; then
if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then
if [ "$MVNW_VERBOSE" = true ]; then
@@ -277,6 +296,11 @@ if $cygwin; then
MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"`
fi
# Provide a "standardized" way to retrieve the CLI args that will
# work with both Windows and non-Windows executions.
MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@"
export MAVEN_CMD_LINE_ARGS
WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
exec "$JAVACMD" \

343
mvnw.cmd vendored Executable file → Normal file
View File

@@ -1,161 +1,182 @@
@REM ----------------------------------------------------------------------------
@REM Licensed to the Apache Software Foundation (ASF) under one
@REM or more contributor license agreements. See the NOTICE file
@REM distributed with this work for additional information
@REM regarding copyright ownership. The ASF licenses this file
@REM to you under the Apache License, Version 2.0 (the
@REM "License"); you may not use this file except in compliance
@REM with the License. You may obtain a copy of the License at
@REM
@REM https://www.apache.org/licenses/LICENSE-2.0
@REM
@REM Unless required by applicable law or agreed to in writing,
@REM software distributed under the License is distributed on an
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@REM KIND, either express or implied. See the License for the
@REM specific language governing permissions and limitations
@REM under the License.
@REM ----------------------------------------------------------------------------
@REM ----------------------------------------------------------------------------
@REM Maven2 Start Up Batch script
@REM
@REM Required ENV vars:
@REM JAVA_HOME - location of a JDK home dir
@REM
@REM Optional ENV vars
@REM M2_HOME - location of maven2's installed home dir
@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending
@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
@REM e.g. to debug Maven itself, use
@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
@REM ----------------------------------------------------------------------------
@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
@echo off
@REM set title of command window
title %0
@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on'
@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
@REM set %HOME% to equivalent of $HOME
if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
@REM Execute a user defined script before this one
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
@REM check for pre script, once with legacy .bat ending and once with .cmd ending
if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
:skipRcPre
@setlocal
set ERROR_CODE=0
@REM To isolate internal variables from possible post scripts, we use another setlocal
@setlocal
@REM ==== START VALIDATION ====
if not "%JAVA_HOME%" == "" goto OkJHome
echo.
echo Error: JAVA_HOME not found in your environment. >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
:OkJHome
if exist "%JAVA_HOME%\bin\java.exe" goto init
echo.
echo Error: JAVA_HOME is set to an invalid directory. >&2
echo JAVA_HOME = "%JAVA_HOME%" >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
@REM ==== END VALIDATION ====
:init
@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
@REM Fallback to current working directory if not found.
set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
set EXEC_DIR=%CD%
set WDIR=%EXEC_DIR%
:findBaseDir
IF EXIST "%WDIR%"\.mvn goto baseDirFound
cd ..
IF "%WDIR%"=="%CD%" goto baseDirNotFound
set WDIR=%CD%
goto findBaseDir
:baseDirFound
set MAVEN_PROJECTBASEDIR=%WDIR%
cd "%EXEC_DIR%"
goto endDetectBaseDir
:baseDirNotFound
set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
cd "%EXEC_DIR%"
:endDetectBaseDir
IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
@setlocal EnableExtensions EnableDelayedExpansion
for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
:endReadAdditionalConfig
SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.4.2/maven-wrapper-0.4.2.jar"
FOR /F "tokens=1,2 delims==" %%A IN (%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties) DO (
IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B
)
@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
if exist %WRAPPER_JAR% (
echo Found %WRAPPER_JAR%
) else (
echo Couldn't find %WRAPPER_JAR%, downloading it ...
echo Downloading from: %DOWNLOAD_URL%
powershell -Command "(New-Object Net.WebClient).DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"
echo Finished downloading %WRAPPER_JAR%
)
@REM End of extension
%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
if ERRORLEVEL 1 goto error
goto end
:error
set ERROR_CODE=1
:end
@endlocal & set ERROR_CODE=%ERROR_CODE%
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
@REM check for post script, once with legacy .bat ending and once with .cmd ending
if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
:skipRcPost
@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
if "%MAVEN_BATCH_PAUSE%" == "on" pause
if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
exit /B %ERROR_CODE%
@REM ----------------------------------------------------------------------------
@REM Licensed to the Apache Software Foundation (ASF) under one
@REM or more contributor license agreements. See the NOTICE file
@REM distributed with this work for additional information
@REM regarding copyright ownership. The ASF licenses this file
@REM to you under the Apache License, Version 2.0 (the
@REM "License"); you may not use this file except in compliance
@REM with the License. You may obtain a copy of the License at
@REM
@REM http://www.apache.org/licenses/LICENSE-2.0
@REM
@REM Unless required by applicable law or agreed to in writing,
@REM software distributed under the License is distributed on an
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@REM KIND, either express or implied. See the License for the
@REM specific language governing permissions and limitations
@REM under the License.
@REM ----------------------------------------------------------------------------
@REM ----------------------------------------------------------------------------
@REM Maven Start Up Batch script
@REM
@REM Required ENV vars:
@REM JAVA_HOME - location of a JDK home dir
@REM
@REM Optional ENV vars
@REM M2_HOME - location of maven2's installed home dir
@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending
@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
@REM e.g. to debug Maven itself, use
@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
@REM ----------------------------------------------------------------------------
@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
@echo off
@REM set title of command window
title %0
@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on'
@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
@REM set %HOME% to equivalent of $HOME
if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
@REM Execute a user defined script before this one
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
@REM check for pre script, once with legacy .bat ending and once with .cmd ending
if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
:skipRcPre
@setlocal
set ERROR_CODE=0
@REM To isolate internal variables from possible post scripts, we use another setlocal
@setlocal
@REM ==== START VALIDATION ====
if not "%JAVA_HOME%" == "" goto OkJHome
echo.
echo Error: JAVA_HOME not found in your environment. >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
:OkJHome
if exist "%JAVA_HOME%\bin\java.exe" goto init
echo.
echo Error: JAVA_HOME is set to an invalid directory. >&2
echo JAVA_HOME = "%JAVA_HOME%" >&2
echo Please set the JAVA_HOME variable in your environment to match the >&2
echo location of your Java installation. >&2
echo.
goto error
@REM ==== END VALIDATION ====
:init
@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
@REM Fallback to current working directory if not found.
set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
set EXEC_DIR=%CD%
set WDIR=%EXEC_DIR%
:findBaseDir
IF EXIST "%WDIR%"\.mvn goto baseDirFound
cd ..
IF "%WDIR%"=="%CD%" goto baseDirNotFound
set WDIR=%CD%
goto findBaseDir
:baseDirFound
set MAVEN_PROJECTBASEDIR=%WDIR%
cd "%EXEC_DIR%"
goto endDetectBaseDir
:baseDirNotFound
set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
cd "%EXEC_DIR%"
:endDetectBaseDir
IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
@setlocal EnableExtensions EnableDelayedExpansion
for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
:endReadAdditionalConfig
SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar"
set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
FOR /F "tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO (
IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B
)
@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central
@REM This allows using the maven wrapper in projects that prohibit checking in binary data.
if exist %WRAPPER_JAR% (
if "%MVNW_VERBOSE%" == "true" (
echo Found %WRAPPER_JAR%
)
) else (
if not "%MVNW_REPOURL%" == "" (
SET DOWNLOAD_URL="%MVNW_REPOURL%/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar"
)
if "%MVNW_VERBOSE%" == "true" (
echo Couldn't find %WRAPPER_JAR%, downloading it ...
echo Downloading from: %DOWNLOAD_URL%
)
powershell -Command "&{"^
"$webclient = new-object System.Net.WebClient;"^
"if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^
"$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^
"}"^
"[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^
"}"
if "%MVNW_VERBOSE%" == "true" (
echo Finished downloading %WRAPPER_JAR%
)
)
@REM End of extension
@REM Provide a "standardized" way to retrieve the CLI args that will
@REM work with both Windows and non-Windows executions.
set MAVEN_CMD_LINE_ARGS=%*
%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %*
if ERRORLEVEL 1 goto error
goto end
:error
set ERROR_CODE=1
:end
@endlocal & set ERROR_CODE=%ERROR_CODE%
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
@REM check for post script, once with legacy .bat ending and once with .cmd ending
if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
:skipRcPost
@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
if "%MAVEN_BATCH_PAUSE%" == "on" pause
if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
exit /B %ERROR_CODE%

12
pom.xml
View File

@@ -2,21 +2,21 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.0.3.BUILD-SNAPSHOT</version>
<version>3.1.0.M1</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>2.2.2.RELEASE</version>
<version>3.0.0.M1</version>
<relativePath />
</parent>
<properties>
<java.version>1.8</java.version>
<spring-kafka.version>2.3.5.RELEASE</spring-kafka.version>
<spring-kafka.version>2.4.4.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>3.2.1.RELEASE</spring-integration-kafka.version>
<kafka.version>2.3.1</kafka.version>
<spring-cloud-schema-registry.version>1.0.3.BUILD-SNAPSHOT</spring-cloud-schema-registry.version>
<spring-cloud-stream.version>3.0.3.BUILD-SNAPSHOT</spring-cloud-stream.version>
<kafka.version>2.4.0</kafka.version>
<spring-cloud-schema-registry.version>1.1.0.M1</spring-cloud-schema-registry.version>
<spring-cloud-stream.version>3.1.0.M1</spring-cloud-stream.version>
<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.0.3.BUILD-SNAPSHOT</version>
<version>3.1.0.M1</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.0.3.BUILD-SNAPSHOT</version>
<version>3.1.0.M1</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>

View File

@@ -150,6 +150,7 @@ public class KafkaConsumerProperties {
/**
* @deprecated No longer used by the binder.
*/
@Deprecated
private int recoveryInterval = 5000;
/**
@@ -194,6 +195,11 @@ public class KafkaConsumerProperties {
*/
private long pollTimeout = org.springframework.kafka.listener.ConsumerProperties.DEFAULT_POLL_TIMEOUT;
/**
* Transaction manager bean name - overrides the binder's transaction configuration.
*/
private String transactionManager;
/**
* @return if each record needs to be acknowledged.
*
@@ -462,4 +468,18 @@ public class KafkaConsumerProperties {
public void setPollTimeout(long pollTimeout) {
this.pollTimeout = pollTimeout;
}
/**
* @return the transaction manager bean name.
*
* Transaction manager bean name (must be {@code KafkaAwareTransactionManager}.
*/
public String getTransactionManager() {
return this.transactionManager;
}
public void setTransactionManager(String transactionManager) {
this.transactionManager = transactionManager;
}
}

View File

@@ -94,6 +94,11 @@ public class KafkaProducerProperties {
*/
private String recordMetadataChannel;
/**
* Transaction manager bean name - overrides the binder's transaction configuration.
*/
private String transactionManager;
/**
* @return buffer size
*
@@ -244,6 +249,19 @@ public class KafkaProducerProperties {
this.recordMetadataChannel = recordMetadataChannel;
}
/**
* @return the transaction manager bean name.
*
* Transaction manager bean name (must be {@code KafkaAwareTransactionManager}.
*/
public String getTransactionManager() {
return this.transactionManager;
}
public void setTransactionManager(String transactionManager) {
this.transactionManager = transactionManager;
}
/**
* Enumeration for compression types.
*/

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.0.3.BUILD-SNAPSHOT</version>
<version>3.1.0.M1</version>
</parent>
<properties>

View File

@@ -17,9 +17,12 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,6 +48,7 @@ import org.springframework.util.StringUtils;
*
* @author Soby Chacko
* @author Renwei Han
* @author Serhii Siryi
* @since 2.1.0
*/
public class InteractiveQueryService {
@@ -153,4 +157,25 @@ public class InteractiveQueryService {
return streamsMetadata != null ? streamsMetadata.hostInfo() : null;
}
/**
* Gets the list of {@link HostInfo} where the provided store is hosted on.
* It also can include current host info.
* Kafka Streams will look through all the consumer instances under the same application id
* and retrieves all hosts info.
*
* Note that the end-user applications must provide `application.server` as a configuration property
* for all the application instances when calling this method. If this is not available, then an empty list will be returned.
*
* @param store store name
* @return the list of {@link HostInfo} where provided store is hosted on
*/
public List<HostInfo> getAllHostsInfo(String store) {
return kafkaStreamsRegistry.getKafkaStreams()
.stream()
.flatMap(k -> k.allMetadataForStore(store).stream())
.filter(Objects::nonNull)
.map(StreamsMetadata::hostInfo)
.collect(Collectors.toList());
}
}

View File

@@ -16,12 +16,16 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
@@ -31,7 +35,7 @@ import org.springframework.kafka.config.StreamsBuilderFactoryBean;
*
* @author Soby Chacko
*/
class KafkaStreamsRegistry {
public class KafkaStreamsRegistry {
private Map<KafkaStreams, StreamsBuilderFactoryBean> streamsBuilderFactoryBeanMap = new HashMap<>();
@@ -60,4 +64,18 @@ class KafkaStreamsRegistry {
return this.streamsBuilderFactoryBeanMap.get(kafkaStreams);
}
public StreamsBuilderFactoryBean streamsBuilderFactoryBean(String applicationId) {
final Optional<StreamsBuilderFactoryBean> first = this.streamsBuilderFactoryBeanMap.values()
.stream()
.filter(streamsBuilderFactoryBean -> streamsBuilderFactoryBean
.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals(applicationId))
.findFirst();
return first.orElse(null);
}
public List<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans() {
return new ArrayList<>(this.streamsBuilderFactoryBeanMap.values());
}
}

View File

@@ -0,0 +1,71 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams.endpoint;
import java.util.List;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.boot.actuate.endpoint.annotation.Selector;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.util.StringUtils;
/**
* Actuator endpoint for topology description.
*
* @author Soby Chacko
* @since 3.0.4
*/
@Endpoint(id = "topology")
public class TopologyEndpoint {
/**
* Topology not found message.
*/
public static final String NO_TOPOLOGY_FOUND_MSG = "No topology found for the given application ID";
private final KafkaStreamsRegistry kafkaStreamsRegistry;
public TopologyEndpoint(KafkaStreamsRegistry kafkaStreamsRegistry) {
this.kafkaStreamsRegistry = kafkaStreamsRegistry;
}
@ReadOperation
public String topology() {
final List<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans = this.kafkaStreamsRegistry.streamsBuilderFactoryBeans();
final StringBuilder topologyDescription = new StringBuilder();
streamsBuilderFactoryBeans.stream()
.forEach(streamsBuilderFactoryBean ->
topologyDescription.append(streamsBuilderFactoryBean.getTopology().describe().toString()));
return topologyDescription.toString();
}
@ReadOperation
public String topology(@Selector String applicationId) {
if (!StringUtils.isEmpty(applicationId)) {
final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsRegistry.streamsBuilderFactoryBean(applicationId);
if (streamsBuilderFactoryBean != null) {
return streamsBuilderFactoryBean.getTopology().describe().toString();
}
else {
return NO_TOPOLOGY_FOUND_MSG;
}
}
return NO_TOPOLOGY_FOUND_MSG;
}
}

View File

@@ -0,0 +1,43 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams.endpoint;
import org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration;
import org.springframework.boot.actuate.autoconfigure.endpoint.condition.ConditionalOnAvailableEndpoint;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderSupportAutoConfiguration;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Soby Chacko
* @since 3.0.4
*/
@Configuration
@ConditionalOnClass(name = {
"org.springframework.boot.actuate.endpoint.annotation.Endpoint" })
@AutoConfigureAfter({EndpointAutoConfiguration.class, KafkaStreamsBinderSupportAutoConfiguration.class})
public class TopologyEndpointAutoConfiguration {
@Bean
@ConditionalOnAvailableEndpoint
public TopologyEndpoint topologyEndpoint(KafkaStreamsRegistry kafkaStreamsRegistry) {
return new TopologyEndpoint(kafkaStreamsRegistry);
}
}

View File

@@ -1,4 +1,4 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderSupportAutoConfiguration,\
org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsFunctionAutoConfiguration
org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsFunctionAutoConfiguration,\
org.springframework.cloud.stream.binder.kafka.streams.endpoint.TopologyEndpointAutoConfiguration

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
@@ -162,6 +163,7 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
InteractiveQueryService interactiveQueryService = context
.getBean(InteractiveQueryService.class);
HostInfo currentHostInfo = interactiveQueryService.getCurrentHostInfo();
assertThat(currentHostInfo.host() + ":" + currentHostInfo.port())
.isEqualTo(embeddedKafka.getBrokersAsString());
@@ -173,6 +175,13 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
HostInfo hostInfoFoo = interactiveQueryService
.getHostInfo("prod-id-count-store-foo", 123, new IntegerSerializer());
assertThat(hostInfoFoo).isNull();
final List<HostInfo> hostInfos = interactiveQueryService.getAllHostsInfo("prod-id-count-store");
assertThat(hostInfos.size()).isEqualTo(1);
final HostInfo hostInfo1 = hostInfos.get(0);
assertThat(hostInfo1.host() + ":" + hostInfo1.port())
.isEqualTo(embeddedKafka.getBrokersAsString());
}
@EnableBinding(KafkaStreamsProcessor.class)
@@ -214,7 +223,6 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
}
}
}
static class Product {

View File

@@ -44,6 +44,8 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
import org.springframework.cloud.stream.binder.kafka.streams.endpoint.TopologyEndpoint;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
@@ -108,6 +110,13 @@ public class KafkaStreamsBinderWordCountFunctionTests {
assertThat(meterRegistry.get("stream.metrics.commit.total").gauge().value()).isEqualTo(1.0);
assertThat(meterRegistry.get("app.info.start.time.ms").gauge().value()).isNotNaN();
Assert.isTrue(LATCH.await(5, TimeUnit.SECONDS), "Failed to call customizers");
//Testing topology endpoint
final KafkaStreamsRegistry kafkaStreamsRegistry = context.getBean(KafkaStreamsRegistry.class);
final TopologyEndpoint topologyEndpoint = new TopologyEndpoint(kafkaStreamsRegistry);
final String topology1 = topologyEndpoint.topology();
final String topology2 = topologyEndpoint.topology("testKstreamWordCountFunction");
assertThat(topology1).isNotEmpty();
assertThat(topology1).isEqualTo(topology2);
}
}

View File

@@ -131,6 +131,7 @@ public class MultipleFunctionsInSameAppTests {
"--spring.jmx.enabled=false",
"--spring.cloud.stream.function.definition=process;analyze",
"--spring.cloud.stream.bindings.process-in-0.destination=purchases",
"--spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.startOffset=latest",
"--spring.cloud.stream.bindings.process-in-0.binder=kafka1",
"--spring.cloud.stream.bindings.process-out-0.destination=coffee",
"--spring.cloud.stream.bindings.process-out-0.binder=kafka1",
@@ -148,6 +149,8 @@ public class MultipleFunctionsInSameAppTests {
"--spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.applicationId=my-app-2",
"--spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.client.id=analyze-client")) {
Thread.sleep(1000);
receiveAndValidate("purchases", "coffee", "electronics");
StreamsBuilderFactoryBean processStreamsBuilderFactoryBean = context

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.0.3.BUILD-SNAPSHOT</version>
<version>3.1.0.M1</version>
</parent>
<dependencies>

View File

@@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -37,6 +38,8 @@ import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.kafka.support.AbstractKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonUtils;
import org.springframework.lang.Nullable;
@@ -49,7 +52,7 @@ import org.springframework.util.MimeType;
* Custom header mapper for Apache Kafka. This is identical to the {@link org.springframework.kafka.support.DefaultKafkaHeaderMapper}
* from spring Kafka. This is provided for addressing some interoperability issues between Spring Cloud Stream 3.0.x
* and 2.x apps, where mime types passed as regular {@link MimeType} in the header are not de-serialized properly.
* Once those concerns are addressed in Spring Kafka, we will deprecate this class and remove it in a future binder release.
* It also suppresses certain internal headers that should never be propagated on output.
*
* Most headers in {@link org.springframework.kafka.support.KafkaHeaders} are not mapped onto outbound messages.
* The exceptions are correlation and reply headers for request/reply
@@ -65,6 +68,16 @@ import org.springframework.util.MimeType;
*/
public class BinderHeaderMapper extends AbstractKafkaHeaderMapper {
private static final String NEGATE = "!";
private static final String NEVER_ID = NEGATE + MessageHeaders.ID;
private static final String NEVER_TIMESTAMP = NEGATE + MessageHeaders.TIMESTAMP;
private static final String NEVER_DELIVERY_ATTEMPT = NEGATE + IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT;
private static final String NEVER_NATIVE_HEADERS_PRESENT = NEGATE + BinderHeaders.NATIVE_HEADERS_PRESENT;
private static final String JAVA_LANG_STRING = "java.lang.String";
private static final List<String> DEFAULT_TRUSTED_PACKAGES =
@@ -119,8 +132,10 @@ public class BinderHeaderMapper extends AbstractKafkaHeaderMapper {
*/
public BinderHeaderMapper(ObjectMapper objectMapper) {
this(objectMapper,
"!" + MessageHeaders.ID,
"!" + MessageHeaders.TIMESTAMP,
NEVER_ID,
NEVER_TIMESTAMP,
NEVER_DELIVERY_ATTEMPT,
NEVER_NATIVE_HEADERS_PRESENT,
"*");
}
@@ -384,6 +399,32 @@ public class BinderHeaderMapper extends AbstractKafkaHeaderMapper {
return true;
}
/**
* Add patterns for headers that should never be mapped.
* @param patterns the patterns.
* @return the modified patterns.
* @since 3.0.2
*/
public static String[] addNeverHeaderPatterns(List<String> patterns) {
List<String> patternsToUse = new LinkedList<>(patterns);
patternsToUse.add(0, NEVER_NATIVE_HEADERS_PRESENT);
patternsToUse.add(0, NEVER_DELIVERY_ATTEMPT);
patternsToUse.add(0, NEVER_TIMESTAMP);
patternsToUse.add(0, NEVER_ID);
return patternsToUse.toArray(new String[0]);
}
/**
* Remove never headers.
* @param headers the headers from which to remove the never headers.
* @since 3.0.2
*/
public static void removeNeverHeaders(Headers headers) {
headers.remove(MessageHeaders.ID);
headers.remove(MessageHeaders.TIMESTAMP);
headers.remove(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT);
headers.remove(BinderHeaders.NATIVE_HEADERS_PRESENT);
}
/**
* The {@link StdNodeBasedDeserializer} extension for {@link MimeType} deserialization.

View File

@@ -25,7 +25,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -103,6 +102,7 @@ import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.ProducerListener;
@@ -110,8 +110,10 @@ import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.TopicPartitionOffset.SeekPosition;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
@@ -214,6 +216,8 @@ public class KafkaMessageChannelBinder extends
private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
private Map<ConsumerDestination, ContainerProperties.AckMode> ackModeInfo = new ConcurrentHashMap<>();
public KafkaMessageChannelBinder(
KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider) {
@@ -343,9 +347,12 @@ public class KafkaMessageChannelBinder extends
* (spring.cloud.stream.kafka.binder.transaction.producer.*) properties are used
* instead, for all producers. A binder is transactional when
* 'spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix' has text.
* Individual bindings can override the binder's transaction manager.
*/
final ProducerFactory<byte[], byte[]> producerFB = this.transactionManager != null
? this.transactionManager.getProducerFactory()
KafkaAwareTransactionManager<byte[], byte[]> transMan = transactionManager(
producerProperties.getExtension().getTransactionManager());
final ProducerFactory<byte[], byte[]> producerFB = transMan != null
? transMan.getProducerFactory()
: getProducerFactory(null, producerProperties);
Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(
producerProperties.getPartitionCount(), false, () -> {
@@ -353,7 +360,7 @@ public class KafkaMessageChannelBinder extends
List<PartitionInfo> partitionsFor = producer
.partitionsFor(destination.getName());
producer.close();
if (this.transactionManager == null) {
if (transMan == null) {
((DisposableBean) producerFB).destroy();
}
return partitionsFor;
@@ -384,7 +391,7 @@ public class KafkaMessageChannelBinder extends
if (this.producerListener != null) {
kafkaTemplate.setProducerListener(this.producerListener);
}
if (this.transactionManager != null) {
if (transMan != null) {
kafkaTemplate.setTransactionIdPrefix(configurationProperties.getTransaction().getTransactionIdPrefix());
}
ProducerConfigurationMessageHandler handler = new ProducerConfigurationMessageHandler(
@@ -422,23 +429,32 @@ public class KafkaMessageChannelBinder extends
mapper = null;
}
else if (mapper == null) {
String[] headerPatterns = producerProperties.getExtension()
.getHeaderPatterns();
String[] headerPatterns = producerProperties.getExtension().getHeaderPatterns();
if (headerPatterns != null && headerPatterns.length > 0) {
List<String> patterns = new LinkedList<>(Arrays.asList(headerPatterns));
if (!patterns.contains("!" + MessageHeaders.TIMESTAMP)) {
patterns.add(0, "!" + MessageHeaders.TIMESTAMP);
}
if (!patterns.contains("!" + MessageHeaders.ID)) {
patterns.add(0, "!" + MessageHeaders.ID);
}
mapper = new BinderHeaderMapper(
patterns.toArray(new String[patterns.size()]));
BinderHeaderMapper.addNeverHeaderPatterns(Arrays.asList(headerPatterns)));
}
else {
mapper = new BinderHeaderMapper();
}
}
else {
KafkaHeaderMapper userHeaderMapper = mapper;
mapper = new KafkaHeaderMapper() {
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
userHeaderMapper.toHeaders(source, target);
}
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
userHeaderMapper.fromHeaders(headers, target);
BinderHeaderMapper.removeNeverHeaders(target);
}
};
}
handler.setHeaderMapper(mapper);
return handler;
}
@@ -512,7 +528,7 @@ public class KafkaMessageChannelBinder extends
@Override
protected boolean useNativeEncoding(
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
if (this.transactionManager != null) {
if (transactionManager(producerProperties.getExtension().getTransactionManager()) != null) {
return this.configurationProperties.getTransaction().getProducer()
.isUseNativeEncoding();
}
@@ -578,8 +594,10 @@ public class KafkaMessageChannelBinder extends
? new ContainerProperties(Pattern.compile(topics[0]))
: new ContainerProperties(topics)
: new ContainerProperties(topicPartitionOffsets);
if (this.transactionManager != null) {
containerProperties.setTransactionManager(this.transactionManager);
KafkaAwareTransactionManager<byte[], byte[]> transMan = transactionManager(
extendedConsumerProperties.getExtension().getTransactionManager());
if (transMan != null) {
containerProperties.setTransactionManager(transMan);
}
if (this.rebalanceListener != null) {
setupRebalanceListener(extendedConsumerProperties, containerProperties);
@@ -645,14 +663,14 @@ public class KafkaMessageChannelBinder extends
consumerGroup, extendedConsumerProperties);
if (!extendedConsumerProperties.isBatchMode()
&& extendedConsumerProperties.getMaxAttempts() > 1
&& this.transactionManager == null) {
&& transMan == null) {
kafkaMessageDrivenChannelAdapter
.setRetryTemplate(buildRetryTemplate(extendedConsumerProperties));
kafkaMessageDrivenChannelAdapter
.setRecoveryCallback(errorInfrastructure.getRecoverer());
}
else if (!extendedConsumerProperties.isBatchMode() && this.transactionManager != null) {
else if (!extendedConsumerProperties.isBatchMode() && transMan != null) {
messageListenerContainer.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(
(record, exception) -> {
MessagingException payload =
@@ -681,6 +699,7 @@ public class KafkaMessageChannelBinder extends
kafkaMessageDrivenChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
}
this.getContainerCustomizer().configure(messageListenerContainer, destination.getName(), group);
this.ackModeInfo.put(destination, messageListenerContainer.getContainerProperties().getAckMode());
return kafkaMessageDrivenChannelAdapter;
}
@@ -1042,8 +1061,10 @@ public class KafkaMessageChannelBinder extends
if (kafkaConsumerProperties.isEnableDlq()) {
KafkaProducerProperties dlqProducerProperties = kafkaConsumerProperties
.getDlqProducerProperties();
ProducerFactory<?, ?> producerFactory = this.transactionManager != null
? this.transactionManager.getProducerFactory()
KafkaAwareTransactionManager<byte[], byte[]> transMan = transactionManager(
properties.getExtension().getTransactionManager());
ProducerFactory<?, ?> producerFactory = transMan != null
? transMan.getProducerFactory()
: getProducerFactory(null,
new ExtendedProducerProperties<>(dlqProducerProperties));
final KafkaTemplate<?, ?> kafkaTemplate = new KafkaTemplate<>(
@@ -1058,7 +1079,7 @@ public class KafkaMessageChannelBinder extends
if (properties.isUseNativeDecoding()) {
if (record != null) {
Map<String, String> configuration = this.transactionManager == null
Map<String, String> configuration = transMan == null
? dlqProducerProperties.getConfiguration()
: this.configurationProperties.getTransaction()
.getProducer().getConfiguration();
@@ -1156,22 +1177,46 @@ public class KafkaMessageChannelBinder extends
String dlqName = StringUtils.hasText(kafkaConsumerProperties.getDlqName())
? kafkaConsumerProperties.getDlqName()
: "error." + record.topic() + "." + group;
MessageHeaders headers;
if (message instanceof ErrorMessage) {
final ErrorMessage errorMessage = (ErrorMessage) message;
final Message<?> originalMessage = errorMessage.getOriginalMessage();
if (originalMessage != null) {
headers = originalMessage.getHeaders();
}
else {
headers = message.getHeaders();
}
}
else {
headers = message.getHeaders();
}
if (this.transactionTemplate != null) {
Throwable throwable2 = throwable;
this.transactionTemplate.executeWithoutResult(status -> {
dlqSender.sendToDlq(recordToSend.get(), kafkaHeaders, dlqName, group, throwable2,
determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()));
determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()),
headers, this.ackModeInfo.get(destination));
});
}
else {
dlqSender.sendToDlq(recordToSend.get(), kafkaHeaders, dlqName, group, throwable,
determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()));
determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()), headers, this.ackModeInfo.get(destination));
}
};
}
return null;
}
@SuppressWarnings("unchecked")
@Nullable
private KafkaAwareTransactionManager<byte[], byte[]> transactionManager(@Nullable String beanName) {
if (StringUtils.hasText(beanName)) {
return getApplicationContext().getBean(beanName, KafkaAwareTransactionManager.class);
}
return this.transactionManager;
}
private DlqPartitionFunction determinDlqPartitionFunction(Integer dlqPartitions) {
if (this.dlqPartitionFunction != null) {
return this.dlqPartitionFunction;
@@ -1428,7 +1473,8 @@ public class KafkaMessageChannelBinder extends
@SuppressWarnings("unchecked")
void sendToDlq(ConsumerRecord<?, ?> consumerRecord, Headers headers,
String dlqName, String group, Throwable throwable, DlqPartitionFunction partitionFunction) {
String dlqName, String group, Throwable throwable, DlqPartitionFunction partitionFunction,
MessageHeaders messageHeaders, ContainerProperties.AckMode ackMode) {
K key = (K) consumerRecord.key();
V value = (V) consumerRecord.value();
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(dlqName,
@@ -1458,6 +1504,9 @@ public class KafkaMessageChannelBinder extends
KafkaMessageChannelBinder.this.logger
.debug("Sent to DLQ " + sb.toString());
}
if (ackMode == ContainerProperties.AckMode.MANUAL || ackMode == ContainerProperties.AckMode.MANUAL_IMMEDIATE) {
messageHeaders.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class).acknowledge();
}
}
});
}

View File

@@ -19,6 +19,7 @@ package org.springframework.cloud.stream.binder.kafka;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -51,7 +52,9 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.record.TimestampType;
@@ -1105,75 +1108,6 @@ public class KafkaBinderTests extends
producerBinding.unbind();
}
@Test
@SuppressWarnings("unchecked")
public void testDefaultAutoCommitOnErrorWithoutDlq() throws Exception {
Binder binder = getBinder();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
BindingProperties producerBindingProperties = createProducerBindingProperties(
producerProperties);
DirectChannel moduleOutputChannel = createBindableChannel("output",
producerBindingProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setMaxAttempts(1);
consumerProperties.setBackOffInitialInterval(100);
consumerProperties.setBackOffMaxInterval(150);
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
DirectChannel moduleInputChannel = createBindableChannel("input",
createConsumerBindingProperties(consumerProperties));
FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
moduleInputChannel.subscribe(handler);
long uniqueBindingId = System.currentTimeMillis();
Binding<MessageChannel> producerBinding = binder.bindProducer(
"retryTest." + uniqueBindingId + ".0", moduleOutputChannel,
producerProperties);
Binding<MessageChannel> consumerBinding = binder.bindConsumer(
"retryTest." + uniqueBindingId + ".0", "testGroup", moduleInputChannel,
consumerProperties);
String testMessagePayload = "test." + UUID.randomUUID().toString();
Message<byte[]> testMessage = MessageBuilder
.withPayload(testMessagePayload.getBytes()).build();
moduleOutputChannel.send(testMessage);
assertThat(handler.getLatch().await((int) (timeoutMultiplier * 1000),
TimeUnit.MILLISECONDS));
// first attempt fails
assertThat(handler.getReceivedMessages().entrySet()).hasSize(1);
Message<?> receivedMessage = handler.getReceivedMessages().entrySet().iterator()
.next().getValue();
assertThat(receivedMessage).isNotNull();
assertThat(
new String((byte[]) receivedMessage.getPayload(), StandardCharsets.UTF_8))
.isEqualTo(testMessagePayload);
assertThat(handler.getInvocationCount())
.isEqualTo(consumerProperties.getMaxAttempts());
consumerBinding.unbind();
// on the second attempt the message is redelivered
QueueChannel successfulInputChannel = new QueueChannel();
consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0",
"testGroup", successfulInputChannel, consumerProperties);
binderBindUnbindLatency();
String testMessage2Payload = "test." + UUID.randomUUID().toString();
Message<byte[]> testMessage2 = MessageBuilder
.withPayload(testMessage2Payload.getBytes()).build();
moduleOutputChannel.send(testMessage2);
Message<?> firstReceived = receive(successfulInputChannel);
assertThat(firstReceived.getPayload()).isEqualTo(testMessagePayload.getBytes());
Message<?> secondReceived = receive(successfulInputChannel);
assertThat(secondReceived.getPayload()).isEqualTo(testMessage2Payload.getBytes());
consumerBinding.unbind();
producerBinding.unbind();
}
@Test
@SuppressWarnings("unchecked")
public void testDefaultAutoCommitOnErrorWithDlq() throws Exception {
@@ -1190,7 +1124,6 @@ public class KafkaBinderTests extends
consumerProperties.setBackOffInitialInterval(100);
consumerProperties.setBackOffMaxInterval(150);
consumerProperties.getExtension().setEnableDlq(true);
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
DirectChannel moduleInputChannel = createBindableChannel("input",
createConsumerBindingProperties(consumerProperties));
@@ -1252,6 +1185,87 @@ public class KafkaBinderTests extends
producerBinding.unbind();
}
//See https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/870 for motivation for this test.
@Test
@SuppressWarnings("unchecked")
public void testAutoCommitOnErrorWhenManualAcknowledgement() throws Exception {
Binder binder = getBinder();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
BindingProperties producerBindingProperties = createProducerBindingProperties(
producerProperties);
DirectChannel moduleOutputChannel = createBindableChannel("output",
producerBindingProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setMaxAttempts(3);
consumerProperties.setBackOffInitialInterval(100);
consumerProperties.setBackOffMaxInterval(150);
//When auto commit is disabled, then the record is committed after publishing to DLQ using the manual acknowledgement.
// (if DLQ is enabled, which is, in this case).
consumerProperties.getExtension().setAutoCommitOffset(false);
consumerProperties.getExtension().setEnableDlq(true);
DirectChannel moduleInputChannel = createBindableChannel("input",
createConsumerBindingProperties(consumerProperties));
FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
moduleInputChannel.subscribe(handler);
long uniqueBindingId = System.currentTimeMillis();
Binding<MessageChannel> producerBinding = binder.bindProducer(
"retryTest." + uniqueBindingId + ".0", moduleOutputChannel,
producerProperties);
Binding<MessageChannel> consumerBinding = binder.bindConsumer(
"retryTest." + uniqueBindingId + ".0", "testGroup", moduleInputChannel,
consumerProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> dlqConsumerProperties = createConsumerProperties();
dlqConsumerProperties.setMaxAttempts(1);
QueueChannel dlqChannel = new QueueChannel();
Binding<MessageChannel> dlqConsumerBinding = binder.bindConsumer(
"error.retryTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel,
dlqConsumerProperties);
String testMessagePayload = "test." + UUID.randomUUID().toString();
Message<byte[]> testMessage = MessageBuilder
.withPayload(testMessagePayload.getBytes()).build();
moduleOutputChannel.send(testMessage);
Message<?> dlqMessage = receive(dlqChannel, 3);
assertThat(dlqMessage).isNotNull();
assertThat(dlqMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());
// first attempt fails
assertThat(handler.getReceivedMessages().entrySet()).hasSize(1);
Message<?> handledMessage = handler.getReceivedMessages().entrySet().iterator()
.next().getValue();
assertThat(handledMessage).isNotNull();
assertThat(
new String((byte[]) handledMessage.getPayload(), StandardCharsets.UTF_8))
.isEqualTo(testMessagePayload);
assertThat(handler.getInvocationCount())
.isEqualTo(consumerProperties.getMaxAttempts());
binderBindUnbindLatency();
dlqConsumerBinding.unbind();
consumerBinding.unbind();
// on the second attempt the message is not redelivered because the DLQ is set and the record in error is already committed.
QueueChannel successfulInputChannel = new QueueChannel();
consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0",
"testGroup", successfulInputChannel, consumerProperties);
String testMessage2Payload = "test1." + UUID.randomUUID().toString();
Message<byte[]> testMessage2 = MessageBuilder
.withPayload(testMessage2Payload.getBytes()).build();
moduleOutputChannel.send(testMessage2);
Message<?> receivedMessage = receive(successfulInputChannel);
assertThat(receivedMessage.getPayload())
.isEqualTo(testMessage2Payload.getBytes());
binderBindUnbindLatency();
consumerBinding.unbind();
producerBinding.unbind();
}
@Test
@SuppressWarnings("unchecked")
public void testConfigurableDlqName() throws Exception {
@@ -1727,7 +1741,7 @@ public class KafkaBinderTests extends
}
catch (UnsupportedOperationException ignored) {
}
List<ChannelInterceptor> interceptors = output.getChannelInterceptors();
List<ChannelInterceptor> interceptors = output.getInterceptors();
AtomicInteger count = new AtomicInteger();
interceptors.forEach(interceptor -> {
if (interceptor instanceof PartitioningInterceptor) {
@@ -3508,6 +3522,98 @@ public class KafkaBinderTests extends
}
}
@Test
public void testInternalHeadersNotPropagated() throws Exception {
testInternalHeadersNotPropagatedGuts("propagate.1", null, null);
}
@Test
public void testInternalHeadersNotPropagatedCustomHeader() throws Exception {
testInternalHeadersNotPropagatedGuts("propagate.2", new String[] { "foo", "*" }, null);
}
@Test
public void testInternalHeadersNotPropagatedCustomMapper() throws Exception {
testInternalHeadersNotPropagatedGuts("propagate.3", null, new BinderHeaderMapper("*"));
}
public void testInternalHeadersNotPropagatedGuts(String name, String[] headerPatterns,
KafkaHeaderMapper mapper) throws Exception {
KafkaTestBinder binder;
if (mapper == null) {
binder = getBinder();
}
else {
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binderConfiguration.setHeaderMapperBeanName("headerMapper");
KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(
binderConfiguration, new TestKafkaProperties());
try {
kafkaTopicProvisioner.afterPropertiesSet();
}
catch (Exception e) {
throw new RuntimeException(e);
}
binder = new KafkaTestBinder(binderConfiguration, kafkaTopicProvisioner);
((GenericApplicationContext) binder.getApplicationContext()).registerBean("headerMapper",
KafkaHeaderMapper.class, () -> mapper);
}
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.getExtension().setHeaderPatterns(headerPatterns);
DirectChannel output = createBindableChannel("output", createProducerBindingProperties(producerProperties));
output.setBeanName(name + ".out");
Binding<MessageChannel> producerBinding = binder.bindProducer(name + ".1", output, producerProperties);
QueueChannel input = new QueueChannel();
input.setBeanName(name + ".in");
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
Binding<MessageChannel> consumerBinding = binder.bindConsumer(name + ".0", name, input, consumerProperties);
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
KafkaTemplate template = new KafkaTemplate(new DefaultKafkaProducerFactory<>(producerProps));
template.send(MessageBuilder.withPayload("internalHeaderPropagation")
.setHeader(KafkaHeaders.TOPIC, name + ".0")
.setHeader("someHeader", "someValue")
.build());
Message<?> consumed = input.receive(10_000);
if (headerPatterns != null) {
consumed = MessageBuilder.fromMessage(consumed).setHeader(headerPatterns[0], "bar").build();
}
output.send(consumed);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(name, "false",
embeddedKafka.getEmbeddedKafka());
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer consumer = cf.createConsumer();
consumer.assign(Collections.singletonList(new TopicPartition(name + ".1", 0)));
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(10));
assertThat(records.count()).isEqualTo(1);
ConsumerRecord<?, ?> received = records.iterator().next();
assertThat(received.value()).isEqualTo("internalHeaderPropagation".getBytes());
Header header = received.headers().lastHeader(BinderHeaders.NATIVE_HEADERS_PRESENT);
assertThat(header).isNull();
header = received.headers().lastHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT);
assertThat(header).isNull();
header = received.headers().lastHeader(MessageHeaders.ID);
assertThat(header).isNull();
header = received.headers().lastHeader(MessageHeaders.TIMESTAMP);
assertThat(header).isNull();
assertThat(received.headers().lastHeader("someHeader")).isNotNull();
if (headerPatterns != null) {
assertThat(received.headers().lastHeader(headerPatterns[0])).isNotNull();
}
producerBinding.unbind();
consumerBinding.unbind();
consumer.close();
}
private final class FailingInvocationCountingMessageHandler
implements MessageHandler {

View File

@@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka.integration2;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -33,22 +34,30 @@ import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.backoff.FixedBackOff;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
/**
* @author Gary Russell
@@ -63,6 +72,8 @@ import static org.assertj.core.api.Assertions.assertThat;
"spring.cloud.stream.bindings.input.destination=consumer.producer.txIn",
"spring.cloud.stream.bindings.input.group=consumer.producer.tx",
"spring.cloud.stream.bindings.input.consumer.max-attempts=1",
"spring.cloud.stream.kafka.bindings.input2.consumer.transaction-manager=tm",
"spring.cloud.stream.kafka.bindings.output2.producer.transaction-manager=tm",
"spring.cloud.stream.bindings.output.destination=consumer.producer.txOut",
"spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx.",
"spring.cloud.stream.kafka.binder.transaction.producer.configuration.retries=99",
@@ -100,7 +111,17 @@ public class ConsumerProducerTransactionTests {
assertThat(this.config.outs).containsExactlyInAnyOrder("ONE", "THREE");
}
@EnableBinding(Processor.class)
@Test
public void externalTM() {
assertThat(this.config.input2Container.getContainerProperties().getTransactionManager())
.isSameAs(this.config.tm);
Object handler = KafkaTestUtils.getPropertyValue(this.config.output2, "dispatcher.handlers", Set.class)
.iterator().next();
assertThat(KafkaTestUtils.getPropertyValue(handler, "delegate.kafkaTemplate.producerFactory"))
.isSameAs(this.config.pf);
}
@EnableBinding(TwoProcessors.class)
@EnableAutoConfiguration
public static class Config {
@@ -111,6 +132,15 @@ public class ConsumerProducerTransactionTests {
@Autowired
private MessageChannel output;
@Autowired
MessageChannel output2;
AbstractMessageListenerContainer<?, ?> input2Container;
ProducerFactory pf;
KafkaAwareTransactionManager<byte[], byte[]> tm;
@KafkaListener(id = "test.cons.prod", topics = "consumer.producer.txOut")
public void listenOut(String in) {
this.outs.add(in);
@@ -125,6 +155,10 @@ public class ConsumerProducerTransactionTests {
}
}
@StreamListener("input2")
public void listenIn2(String in) {
}
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
@@ -136,10 +170,34 @@ public class ConsumerProducerTransactionTests {
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
return (container, dest, group) -> container
.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(new FixedBackOff(0L, 1L)));
return (container, dest, group) -> {
container.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(new FixedBackOff(0L, 1L)));
if ("input2".equals(dest)) {
this.input2Container = container;
}
};
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public KafkaAwareTransactionManager<byte[], byte[]> tm(ProducerFactory pf) {
KafkaAwareTransactionManager mock = mock(KafkaAwareTransactionManager.class);
this.pf = pf;
given(mock.getProducerFactory()).willReturn(pf);
this.tm = mock;
return mock;
}
}
public interface TwoProcessors extends Processor {
@Input
SubscribableChannel input2();
@Output
MessageChannel output2();
}
}