Compare commits
33 Commits
v1.1.0.RC1
...
v1.1.1.REL
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b29498617d | ||
|
|
547ea77b2f | ||
|
|
699dc06bc5 | ||
|
|
e822e59859 | ||
|
|
cc630f2be0 | ||
|
|
1668f0c694 | ||
|
|
36c90aa8c0 | ||
|
|
943a7ae148 | ||
|
|
a1633ab241 | ||
|
|
68fd3fb6d7 | ||
|
|
df6d472e99 | ||
|
|
23ae2b3088 | ||
|
|
2e595f34f2 | ||
|
|
acb1c60216 | ||
|
|
275f5cbb8e | ||
|
|
79643586dd | ||
|
|
b292f81d46 | ||
|
|
8ad98c09b9 | ||
|
|
624b039da5 | ||
|
|
b08888862a | ||
|
|
e13e1b1b1b | ||
|
|
513c27beb8 | ||
|
|
5f1e6caeab | ||
|
|
ed4416e7c7 | ||
|
|
546fe69346 | ||
|
|
30c868e51b | ||
|
|
6506018a2c | ||
|
|
09ec23b4bc | ||
|
|
8f4fbd0da6 | ||
|
|
323206130a | ||
|
|
b944b594b5 | ||
|
|
4fa567fec7 | ||
|
|
16d6584774 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -18,6 +18,7 @@ _site/
|
||||
*.ipr
|
||||
*.iws
|
||||
.idea/*
|
||||
*/.idea
|
||||
.factorypath
|
||||
dump.rdb
|
||||
.apt_generated
|
||||
|
||||
@@ -1,3 +1 @@
|
||||
# spring-cloud-stream-binder-kafka
|
||||
|
||||
Spring Cloud Stream Binder implementation for Kafka
|
||||
Spring Cloud Stream Binder for Apache Kafka
|
||||
|
||||
25
mvnw
vendored
25
mvnw
vendored
@@ -57,27 +57,27 @@ case "`uname`" in
|
||||
#
|
||||
# Look for the Apple JDKs first to preserve the existing behaviour, and then look
|
||||
# for the new JDKs provided by Oracle.
|
||||
#
|
||||
#
|
||||
if [ -z "$JAVA_HOME" ] && [ -L /System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK ] ; then
|
||||
#
|
||||
# Apple JDKs
|
||||
#
|
||||
export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home
|
||||
fi
|
||||
|
||||
|
||||
if [ -z "$JAVA_HOME" ] && [ -L /System/Library/Java/JavaVirtualMachines/CurrentJDK ] ; then
|
||||
#
|
||||
# Apple JDKs
|
||||
#
|
||||
export JAVA_HOME=/System/Library/Java/JavaVirtualMachines/CurrentJDK/Contents/Home
|
||||
fi
|
||||
|
||||
|
||||
if [ -z "$JAVA_HOME" ] && [ -L "/Library/Java/JavaVirtualMachines/CurrentJDK" ] ; then
|
||||
#
|
||||
# Oracle JDKs
|
||||
#
|
||||
export JAVA_HOME=/Library/Java/JavaVirtualMachines/CurrentJDK/Contents/Home
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ -z "$JAVA_HOME" ] && [ -x "/usr/libexec/java_home" ]; then
|
||||
#
|
||||
@@ -219,16 +219,27 @@ concat_lines() {
|
||||
export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-$(find_maven_basedir)}
|
||||
MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
|
||||
|
||||
# Provide a "standardized" way to retrieve the CLI args that will
|
||||
# Provide a "standardized" way to retrieve the CLI args that will
|
||||
# work with both Windows and non-Windows executions.
|
||||
MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@"
|
||||
export MAVEN_CMD_LINE_ARGS
|
||||
|
||||
WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
|
||||
|
||||
echo "Running version check"
|
||||
VERSION=$( sed '\!<parent!,\!</parent!d' `dirname $0`/pom.xml | grep '<version' | head -1 | sed -e 's/.*<version>//' -e 's!</version>.*$!!' )
|
||||
echo "The found version is [${VERSION}]"
|
||||
|
||||
if echo $VERSION | egrep -q 'M|RC'; then
|
||||
echo Activating \"milestone\" profile for version=\"$VERSION\"
|
||||
echo $MAVEN_ARGS | grep -q milestone || MAVEN_ARGS="$MAVEN_ARGS -Pmilestone"
|
||||
else
|
||||
echo Deactivating \"milestone\" profile for version=\"$VERSION\"
|
||||
echo $MAVEN_ARGS | grep -q milestone && MAVEN_ARGS=$(echo $MAVEN_ARGS | sed -e 's/-Pmilestone//')
|
||||
fi
|
||||
|
||||
exec "$JAVACMD" \
|
||||
$MAVEN_OPTS \
|
||||
-classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
|
||||
"-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
|
||||
${WRAPPER_LAUNCHER} "$@"
|
||||
|
||||
${WRAPPER_LAUNCHER} ${MAVEN_ARGS} "$@"
|
||||
|
||||
321
mvnw.cmd
vendored
321
mvnw.cmd
vendored
@@ -1,234 +1,145 @@
|
||||
#!/bin/sh
|
||||
# ----------------------------------------------------------------------------
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
# ----------------------------------------------------------------------------
|
||||
@REM ----------------------------------------------------------------------------
|
||||
@REM Licensed to the Apache Software Foundation (ASF) under one
|
||||
@REM or more contributor license agreements. See the NOTICE file
|
||||
@REM distributed with this work for additional information
|
||||
@REM regarding copyright ownership. The ASF licenses this file
|
||||
@REM to you under the Apache License, Version 2.0 (the
|
||||
@REM "License"); you may not use this file except in compliance
|
||||
@REM with the License. You may obtain a copy of the License at
|
||||
@REM
|
||||
@REM http://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM
|
||||
@REM Unless required by applicable law or agreed to in writing,
|
||||
@REM software distributed under the License is distributed on an
|
||||
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
@REM KIND, either express or implied. See the License for the
|
||||
@REM specific language governing permissions and limitations
|
||||
@REM under the License.
|
||||
@REM ----------------------------------------------------------------------------
|
||||
|
||||
# ----------------------------------------------------------------------------
|
||||
# Maven2 Start Up Batch script
|
||||
#
|
||||
# Required ENV vars:
|
||||
# ------------------
|
||||
# JAVA_HOME - location of a JDK home dir
|
||||
#
|
||||
# Optional ENV vars
|
||||
# -----------------
|
||||
# M2_HOME - location of maven2's installed home dir
|
||||
# MAVEN_OPTS - parameters passed to the Java VM when running Maven
|
||||
# e.g. to debug Maven itself, use
|
||||
# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
|
||||
# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
|
||||
# ----------------------------------------------------------------------------
|
||||
@REM ----------------------------------------------------------------------------
|
||||
@REM Maven2 Start Up Batch script
|
||||
@REM
|
||||
@REM Required ENV vars:
|
||||
@REM JAVA_HOME - location of a JDK home dir
|
||||
@REM
|
||||
@REM Optional ENV vars
|
||||
@REM M2_HOME - location of maven2's installed home dir
|
||||
@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
|
||||
@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending
|
||||
@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
|
||||
@REM e.g. to debug Maven itself, use
|
||||
@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
|
||||
@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
|
||||
@REM ----------------------------------------------------------------------------
|
||||
|
||||
if [ -z "$MAVEN_SKIP_RC" ] ; then
|
||||
@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
|
||||
@echo off
|
||||
@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on'
|
||||
@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
|
||||
|
||||
if [ -f /etc/mavenrc ] ; then
|
||||
. /etc/mavenrc
|
||||
fi
|
||||
@REM set %HOME% to equivalent of $HOME
|
||||
if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
|
||||
|
||||
if [ -f "$HOME/.mavenrc" ] ; then
|
||||
. "$HOME/.mavenrc"
|
||||
fi
|
||||
@REM Execute a user defined script before this one
|
||||
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
|
||||
@REM check for pre script, once with legacy .bat ending and once with .cmd ending
|
||||
if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
|
||||
if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
|
||||
:skipRcPre
|
||||
|
||||
fi
|
||||
@setlocal
|
||||
|
||||
# OS specific support. $var _must_ be set to either true or false.
|
||||
cygwin=false;
|
||||
darwin=false;
|
||||
mingw=false
|
||||
case "`uname`" in
|
||||
CYGWIN*) cygwin=true ;;
|
||||
MINGW*) mingw=true;;
|
||||
Darwin*) darwin=true
|
||||
#
|
||||
# Look for the Apple JDKs first to preserve the existing behaviour, and then look
|
||||
# for the new JDKs provided by Oracle.
|
||||
#
|
||||
if [ -z "$JAVA_HOME" ] && [ -L /System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK ] ; then
|
||||
#
|
||||
# Apple JDKs
|
||||
#
|
||||
export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home
|
||||
fi
|
||||
|
||||
if [ -z "$JAVA_HOME" ] && [ -L /System/Library/Java/JavaVirtualMachines/CurrentJDK ] ; then
|
||||
#
|
||||
# Apple JDKs
|
||||
#
|
||||
export JAVA_HOME=/System/Library/Java/JavaVirtualMachines/CurrentJDK/Contents/Home
|
||||
fi
|
||||
|
||||
if [ -z "$JAVA_HOME" ] && [ -L "/Library/Java/JavaVirtualMachines/CurrentJDK" ] ; then
|
||||
#
|
||||
# Oracle JDKs
|
||||
#
|
||||
export JAVA_HOME=/Library/Java/JavaVirtualMachines/CurrentJDK/Contents/Home
|
||||
fi
|
||||
set ERROR_CODE=0
|
||||
|
||||
if [ -z "$JAVA_HOME" ] && [ -x "/usr/libexec/java_home" ]; then
|
||||
#
|
||||
# Apple JDKs
|
||||
#
|
||||
export JAVA_HOME=`/usr/libexec/java_home`
|
||||
fi
|
||||
;;
|
||||
esac
|
||||
@REM To isolate internal variables from possible post scripts, we use another setlocal
|
||||
@setlocal
|
||||
|
||||
if [ -z "$JAVA_HOME" ] ; then
|
||||
if [ -r /etc/gentoo-release ] ; then
|
||||
JAVA_HOME=`java-config --jre-home`
|
||||
fi
|
||||
fi
|
||||
@REM ==== START VALIDATION ====
|
||||
if not "%JAVA_HOME%" == "" goto OkJHome
|
||||
|
||||
if [ -z "$M2_HOME" ] ; then
|
||||
## resolve links - $0 may be a link to maven's home
|
||||
PRG="$0"
|
||||
echo.
|
||||
echo Error: JAVA_HOME not found in your environment. >&2
|
||||
echo Please set the JAVA_HOME variable in your environment to match the >&2
|
||||
echo location of your Java installation. >&2
|
||||
echo.
|
||||
goto error
|
||||
|
||||
# need this for relative symlinks
|
||||
while [ -h "$PRG" ] ; do
|
||||
ls=`ls -ld "$PRG"`
|
||||
link=`expr "$ls" : '.*-> \(.*\)$'`
|
||||
if expr "$link" : '/.*' > /dev/null; then
|
||||
PRG="$link"
|
||||
else
|
||||
PRG="`dirname "$PRG"`/$link"
|
||||
fi
|
||||
done
|
||||
:OkJHome
|
||||
if exist "%JAVA_HOME%\bin\java.exe" goto init
|
||||
|
||||
saveddir=`pwd`
|
||||
echo.
|
||||
echo Error: JAVA_HOME is set to an invalid directory. >&2
|
||||
echo JAVA_HOME = "%JAVA_HOME%" >&2
|
||||
echo Please set the JAVA_HOME variable in your environment to match the >&2
|
||||
echo location of your Java installation. >&2
|
||||
echo.
|
||||
goto error
|
||||
|
||||
M2_HOME=`dirname "$PRG"`/..
|
||||
@REM ==== END VALIDATION ====
|
||||
|
||||
# make it fully qualified
|
||||
M2_HOME=`cd "$M2_HOME" && pwd`
|
||||
:init
|
||||
|
||||
cd "$saveddir"
|
||||
# echo Using m2 at $M2_HOME
|
||||
fi
|
||||
set MAVEN_CMD_LINE_ARGS=%*
|
||||
|
||||
# For Cygwin, ensure paths are in UNIX format before anything is touched
|
||||
if $cygwin ; then
|
||||
[ -n "$M2_HOME" ] &&
|
||||
M2_HOME=`cygpath --unix "$M2_HOME"`
|
||||
[ -n "$JAVA_HOME" ] &&
|
||||
JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
|
||||
[ -n "$CLASSPATH" ] &&
|
||||
CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
|
||||
fi
|
||||
@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
|
||||
@REM Fallback to current working directory if not found.
|
||||
|
||||
# For Migwn, ensure paths are in UNIX format before anything is touched
|
||||
if $mingw ; then
|
||||
[ -n "$M2_HOME" ] &&
|
||||
M2_HOME="`(cd "$M2_HOME"; pwd)`"
|
||||
[ -n "$JAVA_HOME" ] &&
|
||||
JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
|
||||
# TODO classpath?
|
||||
fi
|
||||
set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
|
||||
IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
|
||||
|
||||
if [ -z "$JAVA_HOME" ]; then
|
||||
javaExecutable="`which javac`"
|
||||
if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
|
||||
# readlink(1) is not available as standard on Solaris 10.
|
||||
readLink=`which readlink`
|
||||
if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
|
||||
if $darwin ; then
|
||||
javaHome="`dirname \"$javaExecutable\"`"
|
||||
javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
|
||||
else
|
||||
javaExecutable="`readlink -f \"$javaExecutable\"`"
|
||||
fi
|
||||
javaHome="`dirname \"$javaExecutable\"`"
|
||||
javaHome=`expr "$javaHome" : '\(.*\)/bin'`
|
||||
JAVA_HOME="$javaHome"
|
||||
export JAVA_HOME
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
set EXEC_DIR=%CD%
|
||||
set WDIR=%EXEC_DIR%
|
||||
:findBaseDir
|
||||
IF EXIST "%WDIR%"\.mvn goto baseDirFound
|
||||
cd ..
|
||||
IF "%WDIR%"=="%CD%" goto baseDirNotFound
|
||||
set WDIR=%CD%
|
||||
goto findBaseDir
|
||||
|
||||
if [ -z "$JAVACMD" ] ; then
|
||||
if [ -n "$JAVA_HOME" ] ; then
|
||||
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
|
||||
# IBM's JDK on AIX uses strange locations for the executables
|
||||
JAVACMD="$JAVA_HOME/jre/sh/java"
|
||||
else
|
||||
JAVACMD="$JAVA_HOME/bin/java"
|
||||
fi
|
||||
else
|
||||
JAVACMD="`which java`"
|
||||
fi
|
||||
fi
|
||||
:baseDirFound
|
||||
set MAVEN_PROJECTBASEDIR=%WDIR%
|
||||
cd "%EXEC_DIR%"
|
||||
goto endDetectBaseDir
|
||||
|
||||
if [ ! -x "$JAVACMD" ] ; then
|
||||
echo "Error: JAVA_HOME is not defined correctly." >&2
|
||||
echo " We cannot execute $JAVACMD" >&2
|
||||
exit 1
|
||||
fi
|
||||
:baseDirNotFound
|
||||
set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
|
||||
cd "%EXEC_DIR%"
|
||||
|
||||
if [ -z "$JAVA_HOME" ] ; then
|
||||
echo "Warning: JAVA_HOME environment variable is not set."
|
||||
fi
|
||||
:endDetectBaseDir
|
||||
|
||||
CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
|
||||
IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
|
||||
|
||||
# For Cygwin, switch paths to Windows format before running java
|
||||
if $cygwin; then
|
||||
[ -n "$M2_HOME" ] &&
|
||||
M2_HOME=`cygpath --path --windows "$M2_HOME"`
|
||||
[ -n "$JAVA_HOME" ] &&
|
||||
JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
|
||||
[ -n "$CLASSPATH" ] &&
|
||||
CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
|
||||
fi
|
||||
@setlocal EnableExtensions EnableDelayedExpansion
|
||||
for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
|
||||
@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
|
||||
|
||||
# traverses directory structure from process work directory to filesystem root
|
||||
# first directory with .mvn subdirectory is considered project base directory
|
||||
find_maven_basedir() {
|
||||
local basedir=$(pwd)
|
||||
local wdir=$(pwd)
|
||||
while [ "$wdir" != '/' ] ; do
|
||||
if [ -d "$wdir"/.mvn ] ; then
|
||||
basedir=$wdir
|
||||
break
|
||||
fi
|
||||
wdir=$(cd "$wdir/.."; pwd)
|
||||
done
|
||||
echo "${basedir}"
|
||||
}
|
||||
:endReadAdditionalConfig
|
||||
|
||||
# concatenates all lines of a file
|
||||
concat_lines() {
|
||||
if [ -f "$1" ]; then
|
||||
echo "$(tr -s '\n' ' ' < "$1")"
|
||||
fi
|
||||
}
|
||||
SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
|
||||
|
||||
export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-$(find_maven_basedir)}
|
||||
MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
|
||||
set WRAPPER_JAR="".\.mvn\wrapper\maven-wrapper.jar""
|
||||
set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
|
||||
|
||||
# Provide a "standardized" way to retrieve the CLI args that will
|
||||
# work with both Windows and non-Windows executions.
|
||||
MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@"
|
||||
export MAVEN_CMD_LINE_ARGS
|
||||
%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CMD_LINE_ARGS%
|
||||
if ERRORLEVEL 1 goto error
|
||||
goto end
|
||||
|
||||
WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
|
||||
:error
|
||||
set ERROR_CODE=1
|
||||
|
||||
exec "$JAVACMD" \
|
||||
$MAVEN_OPTS \
|
||||
-classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
|
||||
"-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
|
||||
${WRAPPER_LAUNCHER} "$@"
|
||||
:end
|
||||
@endlocal & set ERROR_CODE=%ERROR_CODE%
|
||||
|
||||
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
|
||||
@REM check for post script, once with legacy .bat ending and once with .cmd ending
|
||||
if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
|
||||
if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
|
||||
:skipRcPost
|
||||
|
||||
@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
|
||||
if "%MAVEN_BATCH_PAUSE%" == "on" pause
|
||||
|
||||
if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
|
||||
|
||||
exit /B %ERROR_CODE%
|
||||
|
||||
130
pom.xml
130
pom.xml
@@ -2,19 +2,20 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.1.0.RC1</version>
|
||||
<version>1.1.1.RELEASE</version>
|
||||
<packaging>pom</packaging>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-parent</artifactId>
|
||||
<version>1.1.0.RC1</version>
|
||||
<artifactId>spring-cloud-build</artifactId>
|
||||
<version>1.2.1.RELEASE</version>
|
||||
<relativePath />
|
||||
</parent>
|
||||
<properties>
|
||||
<java.version>1.7</java.version>
|
||||
<kafka.version>0.9.0.1</kafka.version>
|
||||
<spring-kafka.version>1.0.3.RELEASE</spring-kafka.version>
|
||||
<spring-kafka.version>1.0.5.RELEASE</spring-kafka.version>
|
||||
<spring-integration-kafka.version>2.0.1.RELEASE</spring-integration-kafka.version>
|
||||
<spring-cloud-stream.version>1.1.1.RELEASE</spring-cloud-stream.version>
|
||||
</properties>
|
||||
<modules>
|
||||
<module>spring-cloud-stream-binder-kafka</module>
|
||||
@@ -22,46 +23,19 @@
|
||||
<module>spring-cloud-stream-binder-kafka-docs</module>
|
||||
<module>spring-cloud-stream-binder-kafka-0.10-test</module>
|
||||
</modules>
|
||||
<build>
|
||||
<pluginManagement>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<configuration>
|
||||
<redirectTestOutputToFile>true</redirectTestOutputToFile>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-antrun-plugin</artifactId>
|
||||
<version>1.7</version>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-checkstyle-plugin</artifactId>
|
||||
<version>2.17</version>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.puppycrawl.tools</groupId>
|
||||
<artifactId>checkstyle</artifactId>
|
||||
<version>6.17</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-javadoc-plugin</artifactId>
|
||||
<configuration>
|
||||
<quiet>true</quiet>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
</build>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream</artifactId>
|
||||
<version>${spring-cloud-stream.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-codec</artifactId>
|
||||
<version>${spring-cloud-stream.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
@@ -92,6 +66,12 @@
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
<version>${spring-integration-kafka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-test</artifactId>
|
||||
<version>${spring-cloud-stream.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
@@ -106,6 +86,74 @@
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
<build>
|
||||
<pluginManagement>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-antrun-plugin</artifactId>
|
||||
<version>1.7</version>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-checkstyle-plugin</artifactId>
|
||||
<version>2.17</version>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.puppycrawl.tools</groupId>
|
||||
<artifactId>checkstyle</artifactId>
|
||||
<version>7.1</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-javadoc-plugin</artifactId>
|
||||
<configuration>
|
||||
<quiet>true</quiet>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<configuration>
|
||||
<redirectTestOutputToFile>true</redirectTestOutputToFile>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-checkstyle-plugin</artifactId>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-build-tools</artifactId>
|
||||
<version>1.2.0.RELEASE</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>checkstyle-validation</id>
|
||||
<phase>validate</phase>
|
||||
<configuration>
|
||||
<configLocation>checkstyle.xml</configLocation>
|
||||
<encoding>UTF-8</encoding>
|
||||
<consoleOutput>true</consoleOutput>
|
||||
<failsOnError>true</failsOnError>
|
||||
<includeTestSourceDirectory>true</includeTestSourceDirectory>
|
||||
</configuration>
|
||||
<goals>
|
||||
<goal>check</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
<profiles>
|
||||
<profile>
|
||||
<id>spring</id>
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.1.0.RC1</version>
|
||||
<version>1.1.1.RELEASE</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
<description>Spring Cloud Starter Stream Kafka</description>
|
||||
@@ -20,7 +20,7 @@
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
<version>1.1.0.RC1</version>
|
||||
<version>1.1.1.RELEASE</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.1.0.RC1</version>
|
||||
<version>1.1.1.RELEASE</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-0.10-test</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder 0.10 Tests</description>
|
||||
@@ -20,15 +20,15 @@
|
||||
Spring Integration Kafka versions
|
||||
-->
|
||||
<kafka.version>0.10.0.0</kafka.version>
|
||||
<spring-kafka.version>1.1.0.M1</spring-kafka.version>
|
||||
<spring-integration-kafka.version>2.0.1.RELEASE</spring-integration-kafka.version>
|
||||
<spring-kafka.version>1.1.1.RELEASE</spring-kafka.version>
|
||||
<spring-integration-kafka.version>2.1.0.RELEASE</spring-integration-kafka.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
<version>1.1.0.RC1</version>
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
@@ -64,7 +64,7 @@
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
<version>1.1.0.RC1</version>
|
||||
<version>${project.version}</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
@@ -73,7 +73,32 @@
|
||||
<artifactId>spring-cloud-stream-binder-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-schema</artifactId>
|
||||
<version>${spring-cloud-stream.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.confluent</groupId>
|
||||
<artifactId>kafka-avro-serializer</artifactId>
|
||||
<version>3.0.1</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.confluent</groupId>
|
||||
<artifactId>kafka-schema-registry</artifactId>
|
||||
<version>3.0.1</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>confluent</id>
|
||||
<url>http://packages.confluent.io/maven/</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
|
||||
</project>
|
||||
|
||||
@@ -21,29 +21,45 @@ import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
|
||||
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
|
||||
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
|
||||
import kafka.utils.ZKStringSerializer$;
|
||||
import kafka.utils.ZkUtils;
|
||||
import org.I0Itec.zkclient.ZkClient;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.binder.Binding;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.Spy;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.kafka.test.core.BrokerAddress;
|
||||
import org.springframework.kafka.test.rule.KafkaEmbedded;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.retry.RetryOperations;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.fail;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Integration tests for the {@link KafkaMessageChannelBinder}.
|
||||
*
|
||||
@@ -120,16 +136,6 @@ public class Kafka10BinderTests extends KafkaBinderTests {
|
||||
return adminUtilsOperation.partitionSize(topic, zkUtils);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ExtendedConsumerProperties<KafkaConsumerProperties> createConsumerProperties() {
|
||||
return new ExtendedConsumerProperties<>(new KafkaConsumerProperties());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ExtendedProducerProperties<KafkaProducerProperties> createProducerProperties() {
|
||||
return new ExtendedProducerProperties<>(new KafkaProducerProperties());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKafkaOffsetHeaderKey() {
|
||||
return KafkaHeaders.OFFSET;
|
||||
@@ -176,4 +182,64 @@ public class Kafka10BinderTests extends KafkaBinderTests {
|
||||
return new DefaultKafkaConsumerFactory<>(props, keyDecoder, valueDecoder);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testCustomAvroSerialization() throws Exception {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
|
||||
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
|
||||
ZKStringSerializer$.MODULE$);
|
||||
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
|
||||
Map<String, Object> schemaRegistryProps = new HashMap<>();
|
||||
schemaRegistryProps.put("kafkastore.connection.url", configurationProperties.getZkConnectionString());
|
||||
schemaRegistryProps.put("listeners", "http://0.0.0.0:8082");
|
||||
schemaRegistryProps.put("port", "8082");
|
||||
schemaRegistryProps.put("kafkastore.topic", "_schemas");
|
||||
SchemaRegistryConfig config = new SchemaRegistryConfig(schemaRegistryProps);
|
||||
SchemaRegistryRestApplication app = new SchemaRegistryRestApplication(config);
|
||||
Server server = app.createServer();
|
||||
server.start();
|
||||
long endTime = System.currentTimeMillis() + 5000;
|
||||
while(true) {
|
||||
if (server.isRunning()) {
|
||||
break;
|
||||
}
|
||||
else if (System.currentTimeMillis() > endTime) {
|
||||
fail("Kafka Schema Registry Server failed to start");
|
||||
}
|
||||
}
|
||||
User1 firstOutboundFoo = new User1();
|
||||
String userName1 = "foo-name" + UUID.randomUUID().toString();
|
||||
String favColor1 = "foo-color" + UUID.randomUUID().toString();
|
||||
firstOutboundFoo.setName(userName1);
|
||||
firstOutboundFoo.setFavoriteColor(favColor1);
|
||||
Message<?> message = MessageBuilder.withPayload(firstOutboundFoo).build();
|
||||
SubscribableChannel moduleOutputChannel = new DirectChannel();
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
invokeCreateTopic(zkUtils, testTopicName, 6, 1, new Properties());
|
||||
configurationProperties.setAutoAddPartitions(true);
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
producerProperties.getExtension().getConfiguration().put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
|
||||
producerProperties.getExtension().getConfiguration().put("schema.registry.url", "http://localhost:8082");
|
||||
producerProperties.setUseNativeEncoding(true);
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel, producerProperties);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
consumerProperties.getExtension().getConfiguration().put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
|
||||
consumerProperties.getExtension().getConfiguration().put("schema.registry.url", "http://localhost:8082");
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer(testTopicName, "test", moduleInputChannel, consumerProperties);
|
||||
// Let the consumer actually bind to the producer before sending a msg
|
||||
binderBindUnbindLatency();
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(moduleInputChannel);
|
||||
assertThat(inbound).isNotNull();
|
||||
assertTrue(message.getPayload() instanceof User1);
|
||||
User1 receivedUser = (User1) message.getPayload();
|
||||
assertThat(receivedUser.getName()).isEqualTo(userName1);
|
||||
assertThat(receivedUser.getFavoriteColor()).isEqualTo(favColor1);
|
||||
producerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,85 @@
|
||||
/*
|
||||
* Copyright 2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.reflect.Nullable;
|
||||
import org.apache.avro.specific.SpecificRecordBase;
|
||||
|
||||
import org.springframework.core.io.ClassPathResource;
|
||||
|
||||
/**
|
||||
* @author Marius Bogoevici
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
public class User1 extends SpecificRecordBase {
|
||||
|
||||
@Nullable
|
||||
private String name;
|
||||
|
||||
@Nullable
|
||||
private String favoriteColor;
|
||||
|
||||
public String getName() {
|
||||
return this.name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getFavoriteColor() {
|
||||
return this.favoriteColor;
|
||||
}
|
||||
|
||||
public void setFavoriteColor(String favoriteColor) {
|
||||
this.favoriteColor = favoriteColor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Schema getSchema() {
|
||||
try {
|
||||
return new Schema.Parser().parse(new ClassPathResource("schemas/users_v1.schema").getInputStream());
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get(int i) {
|
||||
if (i == 0) {
|
||||
return getName().toString();
|
||||
}
|
||||
if (i == 1) {
|
||||
return getFavoriteColor().toString();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(int i, Object o) {
|
||||
if (i == 0) {
|
||||
setName((String) o);
|
||||
}
|
||||
if (i == 1) {
|
||||
setFavoriteColor((String) o);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
{"namespace": "org.springframework.cloud.stream.binder.kafka",
|
||||
"type": "record",
|
||||
"name": "User1",
|
||||
"fields": [
|
||||
{"name": "name", "type": "string"},
|
||||
{"name": "favoriteColor", "type": "string"}
|
||||
]
|
||||
}
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.1.0.RC1</version>
|
||||
<version>1.1.1.RELEASE</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>
|
||||
@@ -18,7 +18,7 @@
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
<version>1.1.0.RC1</version>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<profiles>
|
||||
|
||||
@@ -129,7 +129,10 @@ The property `spring.cloud.stream.instanceCount` must typically be greater than
|
||||
Default: `true`.
|
||||
autoCommitOffset::
|
||||
Whether to autocommit offsets when a message has been processed.
|
||||
If set to `false`, an `Acknowledgment` header will be available in the message headers for late acknowledgment.
|
||||
If set to `false`, a header with the key `kafka_acknowledgment` of the type `org.springframework.kafka.support.Acknowledgment` header will be present in the inbound message.
|
||||
Applications may use this header for acknowledging messages.
|
||||
See the examples section for details.
|
||||
When this property is set to `false`, Kafka binder will set the ack mode to `org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL`.
|
||||
+
|
||||
Default: `true`.
|
||||
autoCommitOnError::
|
||||
@@ -199,6 +202,34 @@ If a topic already exists with a larger number of partitions than the maximum of
|
||||
|
||||
In this section, we illustrate the use of the above properties for specific scenarios.
|
||||
|
||||
==== Example: Setting `autoCommitOffset` false and relying on manual acking.
|
||||
|
||||
This example illustrates how one may manually acknowledge offsets in a consumer application.
|
||||
|
||||
This example requires that `spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset` is set to false.
|
||||
Use the corresponding input channel name for your example.
|
||||
|
||||
[source]
|
||||
----
|
||||
@SpringBootApplication
|
||||
@EnableBinding(Sink.class)
|
||||
public class ManuallyAcknowdledgingConsumer {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
|
||||
}
|
||||
|
||||
@StreamListener(Sink.INPUT)
|
||||
public void process(Message<?> message) {
|
||||
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
|
||||
if (acknowledgment != null) {
|
||||
System.out.println("Acknowledgment provided");
|
||||
acknowledgment.acknowledge();
|
||||
}
|
||||
}
|
||||
}
|
||||
----
|
||||
|
||||
==== Example: security configuration
|
||||
|
||||
Apache Kafka 0.9 supports secure connections between client and brokers.
|
||||
@@ -215,18 +246,78 @@ spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
|
||||
All the other security properties can be set in a similar manner.
|
||||
|
||||
When using Kerberos, follow the instructions in the http://kafka.apache.org/090/documentation.html#security_sasl_clientconfig[reference documentation] for creating and referencing the JAAS configuration.
|
||||
At the time of this release, the JAAS, and (optionally) krb5 file locations must be set for Spring Cloud Stream applications by using system properties.
|
||||
Here is an example of launching a Spring Cloud Stream application with SASL and Kerberos.
|
||||
|
||||
Spring Cloud Stream supports passing JAAS configuration information to the application using a JAAS configuration file and using Spring Boot properties.
|
||||
|
||||
===== Using JAAS configuration files
|
||||
|
||||
The JAAS, and (optionally) krb5 file locations can be set for Spring Cloud Stream applications by using system properties.
|
||||
Here is an example of launching a Spring Cloud Stream application with SASL and Kerberos using a JAAS configuration file:
|
||||
|
||||
[source]
|
||||
----
|
||||
java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \\
|
||||
--spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \\
|
||||
--spring.cloud.stream.kafka.binder.zkNodes=secure.zookeeper:2181 \\
|
||||
--spring.cloud.stream.bindings.input.destination=stream.ticktock \\
|
||||
--spring.cloud.stream.kafka.binder.clientConfiguration.security.protocol=SASL_PLAINTEXT
|
||||
java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
|
||||
--spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
|
||||
--spring.cloud.stream.kafka.binder.zkNodes=secure.zookeeper:2181 \
|
||||
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
|
||||
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
|
||||
----
|
||||
|
||||
===== Using Spring Boot properties
|
||||
|
||||
As an alternative to having a JAAS configuration file, Spring Cloud Stream provides a mechanism for setting up the JAAS configuration for Spring Cloud Stream applications using Spring Boot properties.
|
||||
|
||||
The following properties can be used for configuring the login context of the Kafka client.
|
||||
|
||||
spring.cloud.stream.kafka.binder.jaas.loginModule::
|
||||
The login module name. Not necessary to be set in normal cases.
|
||||
+
|
||||
Default: `com.sun.security.auth.module.Krb5LoginModule`.
|
||||
spring.cloud.stream.kafka.binder.jaas.controlFlag::
|
||||
The control flag of the login module.
|
||||
+
|
||||
Default: `required`.
|
||||
spring.cloud.stream.kafka.binder.jaas.options::
|
||||
Map with a key/value pair containing the login module options.
|
||||
+
|
||||
Default: Empty map.
|
||||
|
||||
Here is an example of launching a Spring Cloud Stream application with SASL and Kerberos using Spring Boot configuration properties:
|
||||
|
||||
[source]
|
||||
----
|
||||
java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
|
||||
--spring.cloud.stream.kafka.binder.zkNodes=secure.zookeeper:2181 \
|
||||
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
|
||||
--spring.cloud.stream.kafka.binder.autoCreateTopics=false \
|
||||
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
|
||||
--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
|
||||
--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
|
||||
--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
|
||||
--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM
|
||||
----
|
||||
|
||||
This represents the equivalent of the following JAAS file:
|
||||
|
||||
[source]
|
||||
----
|
||||
KafkaClient {
|
||||
com.sun.security.auth.module.Krb5LoginModule required
|
||||
useKeyTab=true
|
||||
storeKey=true
|
||||
keyTab="/etc/security/keytabs/kafka_client.keytab"
|
||||
principal="kafka-client-1@EXAMPLE.COM";
|
||||
};
|
||||
----
|
||||
|
||||
If the topics required already exist on the broker, or will be created by an administrator, autocreation can be turned off and only client JAAS properties need to be sent. As an alternative to setting `spring.cloud.stream.kafka.binder.autoCreateTopics` you can simply remove the broker dependency from the application. See <<exclude-admin-utils>> for details.
|
||||
|
||||
[NOTE]
|
||||
====
|
||||
Do not mix JAAS configuration files and Spring Boot properties in the same application.
|
||||
If the `-Djava.security.auth.login.config` system property is already present, Spring Cloud Stream will ignore the Spring Boot properties.
|
||||
|
||||
====
|
||||
|
||||
[NOTE]
|
||||
====
|
||||
@@ -246,12 +337,12 @@ Then add these dependencies at the top of the `<dependencies>` section in the po
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<version>1.1.0.M1</version>
|
||||
<version>1.1.1.RELEASE</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
<version>2.0.1.RELEASE</version>
|
||||
<version>2.1.0.RELEASE</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
@@ -266,10 +357,39 @@ Then add these dependencies at the top of the `<dependencies>` section in the po
|
||||
</dependency>
|
||||
----
|
||||
|
||||
====
|
||||
|
||||
[NOTE]
|
||||
====
|
||||
The versions above are provided only for the sake of the example.
|
||||
For best results, we recommend using the most recent 0.10-compatible versions of the projects.
|
||||
====
|
||||
|
||||
[[exclude-admin-utils]]
|
||||
==== Excluding Kafka broker jar from the classpath of the binder based application
|
||||
|
||||
The Apache Kafka Binder uses the administrative utilities which are part of the Apache Kafka server library to create and reconfigure topics.
|
||||
If the inclusion of the Apache Kafka server library and its dependencies is not necessary at runtime because the application will rely on the topics being configured administratively, the Kafka binder allows for Apache Kafka server dependency to be excluded from the application.
|
||||
|
||||
If you use Kafka 10 dependencies as advised above, all you have to do is not to include the kafka broker dependency.
|
||||
If you use Kafka 0.9, then ensure that you exclude the kafka broker jar from the `spring-cloud-starter-stream-kafka` dependency as following.
|
||||
|
||||
[source,xml]
|
||||
----
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
----
|
||||
|
||||
If you exclude the Apache Kafka server dependency and the topic is not present on the server, then the Apache Kafka broker will create the topic if auto topic creation is enabled on the server.
|
||||
Please keep in mind that if you are relying on this, then the Kafka server will use the default number of partitions and replication factors.
|
||||
On the other hand, if auto topic creation is disabled on the server, then care must be taken before running the application to create the topic with the desired number of partitions.
|
||||
|
||||
If you want to have full control over how partitions are allocated, then leave the default settings as they are, i.e. do not exclude the kafka broker jar and ensure that `spring.cloud.stream.kafka.binder.autoCreateTopics` is set to `true`, which is the default.
|
||||
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.1.0.RC1</version>
|
||||
<version>1.1.1.RELEASE</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
@@ -66,6 +66,11 @@
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
<version>${spring-integration-kafka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
|
||||
@@ -0,0 +1,118 @@
|
||||
/*
|
||||
* Copyright 2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import javax.security.auth.login.AppConfigurationEntry;
|
||||
import javax.security.auth.login.Configuration;
|
||||
|
||||
import org.apache.kafka.common.security.JaasUtils;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.context.event.ContextRefreshedEvent;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* @author Marius Bogoevici
|
||||
*/
|
||||
public class KafkaBinderJaasInitializerListener implements ApplicationListener<ContextRefreshedEvent>,
|
||||
ApplicationContextAware, DisposableBean {
|
||||
|
||||
public static final String DEFAULT_ZK_LOGIN_CONTEXT_NAME = "Client";
|
||||
|
||||
private ApplicationContext applicationContext;
|
||||
|
||||
private final boolean ignoreJavaLoginConfigParamSystemProperty;
|
||||
|
||||
private final File placeholderJaasConfiguration;
|
||||
|
||||
public KafkaBinderJaasInitializerListener() throws IOException {
|
||||
// we ignore the system property if it wasn't originally set at launch
|
||||
this.ignoreJavaLoginConfigParamSystemProperty =
|
||||
(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) == null);
|
||||
this.placeholderJaasConfiguration = File.createTempFile("kafka-client-jaas-config-placeholder", "conf");
|
||||
this.placeholderJaasConfiguration.deleteOnExit();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
||||
this.applicationContext = applicationContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() throws Exception {
|
||||
if (this.ignoreJavaLoginConfigParamSystemProperty) {
|
||||
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(ContextRefreshedEvent event) {
|
||||
if (event.getSource() == this.applicationContext) {
|
||||
KafkaBinderConfigurationProperties binderConfigurationProperties =
|
||||
applicationContext.getBean(KafkaBinderConfigurationProperties.class);
|
||||
// only use programmatic support if a file is not set via system property
|
||||
if (ignoreJavaLoginConfigParamSystemProperty
|
||||
&& binderConfigurationProperties.getJaas() != null) {
|
||||
Map<String, AppConfigurationEntry[]> configurationEntries = new HashMap<>();
|
||||
AppConfigurationEntry kafkaClientConfigurationEntry = new AppConfigurationEntry
|
||||
(binderConfigurationProperties.getJaas().getLoginModule(),
|
||||
binderConfigurationProperties.getJaas().getControlFlagValue(),
|
||||
binderConfigurationProperties.getJaas().getOptions() != null ?
|
||||
binderConfigurationProperties.getJaas().getOptions() :
|
||||
Collections.<String, Object>emptyMap());
|
||||
configurationEntries.put(JaasUtils.LOGIN_CONTEXT_CLIENT,
|
||||
new AppConfigurationEntry[]{ kafkaClientConfigurationEntry });
|
||||
Configuration.setConfiguration(new InternalConfiguration(configurationEntries));
|
||||
// Workaround for a 0.9 client issue where even if the Configuration is set
|
||||
// a system property check is performed.
|
||||
// Since the Configuration already exists, this will be ignored.
|
||||
if (this.placeholderJaasConfiguration != null) {
|
||||
System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, this.placeholderJaasConfiguration.getAbsolutePath());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A {@link Configuration} set up programmatically by the Kafka binder
|
||||
*/
|
||||
public static class InternalConfiguration extends Configuration {
|
||||
|
||||
private final Map<String, AppConfigurationEntry[]> configurationEntries;
|
||||
|
||||
public InternalConfiguration(Map<String, AppConfigurationEntry[]> configurationEntries) {
|
||||
Assert.notNull(configurationEntries, " cannot be null");
|
||||
Assert.notEmpty(configurationEntries, " cannot be empty");
|
||||
this.configurationEntries = configurationEntries;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
|
||||
return configurationEntries.get(name);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -38,7 +38,6 @@ import org.apache.kafka.common.PartitionInfo;
|
||||
import org.apache.kafka.common.security.JaasUtils;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.serialization.ByteArraySerializer;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
@@ -61,6 +60,7 @@ import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
|
||||
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
|
||||
import org.springframework.kafka.listener.ErrorHandler;
|
||||
import org.springframework.kafka.listener.config.ContainerProperties;
|
||||
@@ -198,8 +198,8 @@ public class KafkaMessageChannelBinder extends
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception {
|
||||
|
||||
KafkaTopicUtils.validateTopicName(destination);
|
||||
|
||||
Collection<PartitionInfo> partitions = ensureTopicCreated(destination, producerProperties.getPartitionCount());
|
||||
createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(destination, producerProperties.getPartitionCount());
|
||||
Collection<PartitionInfo> partitions = getPartitionsForTopic(destination, producerProperties.getPartitionCount());
|
||||
|
||||
if (producerProperties.getPartitionCount() < partitions.size()) {
|
||||
if (this.logger.isInfoEnabled()) {
|
||||
@@ -221,13 +221,13 @@ public class KafkaMessageChannelBinder extends
|
||||
|
||||
@Override
|
||||
protected String createProducerDestinationIfNecessary(String name,
|
||||
ExtendedProducerProperties<KafkaProducerProperties> properties) {
|
||||
ExtendedProducerProperties<KafkaProducerProperties> properties) {
|
||||
if (this.logger.isInfoEnabled()) {
|
||||
this.logger.info("Using kafka topic for outbound: " + name);
|
||||
}
|
||||
KafkaTopicUtils.validateTopicName(name);
|
||||
|
||||
Collection<PartitionInfo> partitions = ensureTopicCreated(name, properties.getPartitionCount());
|
||||
createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(name, properties.getPartitionCount());
|
||||
Collection<PartitionInfo> partitions = getPartitionsForTopic(name, properties.getPartitionCount());
|
||||
if (properties.getPartitionCount() < partitions.size()) {
|
||||
if (this.logger.isInfoEnabled()) {
|
||||
this.logger.info("The `partitionCount` of the producer for topic " + name + " is "
|
||||
@@ -247,8 +247,7 @@ public class KafkaMessageChannelBinder extends
|
||||
}
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
|
||||
props.put(ProducerConfig.RETRIES_CONFIG, 0);
|
||||
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
|
||||
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
|
||||
props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(producerProperties.getExtension().getBufferSize()));
|
||||
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
|
||||
@@ -257,7 +256,6 @@ public class KafkaMessageChannelBinder extends
|
||||
String.valueOf(producerProperties.getExtension().getBatchTimeout()));
|
||||
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
|
||||
producerProperties.getExtension().getCompressionType().toString());
|
||||
|
||||
if (!ObjectUtils.isEmpty(producerProperties.getExtension().getConfiguration())) {
|
||||
props.putAll(producerProperties.getExtension().getConfiguration());
|
||||
}
|
||||
@@ -271,9 +269,9 @@ public class KafkaMessageChannelBinder extends
|
||||
if (properties.getInstanceCount() == 0) {
|
||||
throw new IllegalArgumentException("Instance count cannot be zero");
|
||||
}
|
||||
|
||||
Collection<PartitionInfo> allPartitions = ensureTopicCreated(name,
|
||||
properties.getInstanceCount() * properties.getConcurrency());
|
||||
int partitionCount = properties.getInstanceCount() * properties.getConcurrency();
|
||||
createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(name, partitionCount);
|
||||
Collection<PartitionInfo> allPartitions = getPartitionsForTopic(name, partitionCount);
|
||||
|
||||
Collection<PartitionInfo> listenedPartitions;
|
||||
|
||||
@@ -297,34 +295,25 @@ public class KafkaMessageChannelBinder extends
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
protected MessageProducer createConsumerEndpoint(String name, String group, Collection<PartitionInfo> destination,
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
|
||||
boolean anonymous = !StringUtils.hasText(group);
|
||||
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
|
||||
"DLQ support is not available for anonymous subscriptions");
|
||||
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
|
||||
|
||||
Map<String, Object> props = getConsumerConfig(anonymous, consumerGroup);
|
||||
Deserializer<byte[]> valueDecoder = new ByteArrayDeserializer();
|
||||
Deserializer<byte[]> keyDecoder = new ByteArrayDeserializer();
|
||||
|
||||
if (!ObjectUtils.isEmpty(properties.getExtension().getConfiguration())) {
|
||||
props.putAll(properties.getExtension().getConfiguration());
|
||||
}
|
||||
|
||||
ConsumerFactory<byte[], byte[]> consumerFactory = new DefaultKafkaConsumerFactory<>(props, keyDecoder,
|
||||
valueDecoder);
|
||||
|
||||
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
|
||||
Collection<PartitionInfo> listenedPartitions = destination;
|
||||
Assert.isTrue(!CollectionUtils.isEmpty(listenedPartitions), "A list of partitions must be provided");
|
||||
final TopicPartitionInitialOffset[] topicPartitionInitialOffsets = getTopicPartitionInitialOffsets(
|
||||
listenedPartitions);
|
||||
|
||||
final ContainerProperties containerProperties =
|
||||
anonymous || properties.getExtension().isAutoRebalanceEnabled() ? new ContainerProperties(name)
|
||||
: new ContainerProperties(topicPartitionInitialOffsets);
|
||||
|
||||
int concurrency = Math.min(properties.getConcurrency(), listenedPartitions.size());
|
||||
final ConcurrentMessageListenerContainer<byte[], byte[]> messageListenerContainer =
|
||||
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer =
|
||||
new ConcurrentMessageListenerContainer(
|
||||
consumerFactory, containerProperties) {
|
||||
|
||||
@@ -335,25 +324,23 @@ public class KafkaMessageChannelBinder extends
|
||||
};
|
||||
messageListenerContainer.setConcurrency(concurrency);
|
||||
messageListenerContainer.getContainerProperties().setAckOnError(isAutoCommitOnError(properties));
|
||||
|
||||
if (!properties.getExtension().isAutoCommitOffset()) {
|
||||
messageListenerContainer.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
|
||||
}
|
||||
if (this.logger.isDebugEnabled()) {
|
||||
this.logger.debug(
|
||||
"Listened partitions: " + StringUtils.collectionToCommaDelimitedString(listenedPartitions));
|
||||
}
|
||||
|
||||
if (this.logger.isDebugEnabled()) {
|
||||
this.logger.debug(
|
||||
"Listened partitions: " + StringUtils.collectionToCommaDelimitedString(listenedPartitions));
|
||||
}
|
||||
|
||||
final KafkaMessageDrivenChannelAdapter<byte[], byte[]> kafkaMessageDrivenChannelAdapter =
|
||||
final KafkaMessageDrivenChannelAdapter<?, ?> kafkaMessageDrivenChannelAdapter =
|
||||
new KafkaMessageDrivenChannelAdapter<>(
|
||||
messageListenerContainer);
|
||||
|
||||
kafkaMessageDrivenChannelAdapter.setBeanFactory(this.getBeanFactory());
|
||||
final RetryTemplate retryTemplate = buildRetryTemplate(properties);
|
||||
kafkaMessageDrivenChannelAdapter.setRetryTemplate(retryTemplate);
|
||||
|
||||
if (properties.getExtension().isEnableDlq()) {
|
||||
final String dlqTopic = "error." + name + "." + group;
|
||||
initDlqProducer();
|
||||
@@ -396,6 +383,8 @@ public class KafkaMessageChannelBinder extends
|
||||
|
||||
private Map<String, Object> getConsumerConfig(boolean anonymous, String consumerGroup) {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
|
||||
props.putAll(configurationProperties.getConfiguration());
|
||||
}
|
||||
@@ -427,11 +416,24 @@ public class KafkaMessageChannelBinder extends
|
||||
return topicPartitionInitialOffsets;
|
||||
}
|
||||
|
||||
private void createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(final String topicName, final int partitionCount) {
|
||||
if (this.configurationProperties.isAutoCreateTopics() && adminUtilsOperation != null) {
|
||||
createTopicAndPartitions(topicName, partitionCount);
|
||||
}
|
||||
else if (this.configurationProperties.isAutoCreateTopics() && adminUtilsOperation == null) {
|
||||
this.logger.warn("Auto creation of topics is enabled, but Kafka AdminUtils class is not present on the classpath. " +
|
||||
"No topic will be created by the binder");
|
||||
}
|
||||
else if (!this.configurationProperties.isAutoCreateTopics()) {
|
||||
this.logger.info("Auto creation of topics is disabled.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a Kafka topic if needed, or try to increase its partition count to the
|
||||
* desired number.
|
||||
*/
|
||||
private Collection<PartitionInfo> ensureTopicCreated(final String topicName, final int partitionCount) {
|
||||
private void createTopicAndPartitions(final String topicName, final int partitionCount) {
|
||||
|
||||
final ZkUtils zkUtils = ZkUtils.apply(this.configurationProperties.getZkConnectionString(),
|
||||
this.configurationProperties.getZkSessionTimeout(),
|
||||
@@ -459,62 +461,59 @@ public class KafkaMessageChannelBinder extends
|
||||
}
|
||||
}
|
||||
else if (errorCode == ErrorMapping.UnknownTopicOrPartitionCode()) {
|
||||
if (this.configurationProperties.isAutoCreateTopics()) {
|
||||
// always consider minPartitionCount for topic creation
|
||||
final int effectivePartitionCount = Math.max(this.configurationProperties.getMinPartitionCount(),
|
||||
partitionCount);
|
||||
// always consider minPartitionCount for topic creation
|
||||
final int effectivePartitionCount = Math.max(this.configurationProperties.getMinPartitionCount(),
|
||||
partitionCount);
|
||||
|
||||
this.metadataRetryOperations.execute(new RetryCallback<Object, RuntimeException>() {
|
||||
this.metadataRetryOperations.execute(new RetryCallback<Object, RuntimeException>() {
|
||||
|
||||
@Override
|
||||
public Object doWithRetry(RetryContext context) throws RuntimeException {
|
||||
@Override
|
||||
public Object doWithRetry(RetryContext context) throws RuntimeException {
|
||||
|
||||
adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount,
|
||||
configurationProperties.getReplicationFactor(), new Properties());
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
else {
|
||||
throw new BinderException("Topic " + topicName + " does not exist");
|
||||
}
|
||||
adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount,
|
||||
configurationProperties.getReplicationFactor(), new Properties());
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
else {
|
||||
throw new BinderException("Error fetching Kafka topic metadata: ",
|
||||
ErrorMapping.exceptionFor(errorCode));
|
||||
}
|
||||
try {
|
||||
return this.metadataRetryOperations
|
||||
.execute(new RetryCallback<Collection<PartitionInfo>, Exception>() {
|
||||
|
||||
@Override
|
||||
public Collection<PartitionInfo> doWithRetry(RetryContext context) throws Exception {
|
||||
Collection<PartitionInfo> partitions =
|
||||
getProducerFactory(
|
||||
new ExtendedProducerProperties<>(new KafkaProducerProperties()))
|
||||
.createProducer().partitionsFor(topicName);
|
||||
|
||||
// do a sanity check on the partition set
|
||||
if (partitions.size() < partitionCount) {
|
||||
throw new IllegalStateException("The number of expected partitions was: "
|
||||
+ partitionCount + ", but " + partitions.size()
|
||||
+ (partitions.size() > 1 ? " have " : " has ") + "been found instead");
|
||||
}
|
||||
return partitions;
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (Exception e) {
|
||||
this.logger.error("Cannot initialize Binder", e);
|
||||
throw new BinderException("Cannot initialize binder:", e);
|
||||
}
|
||||
|
||||
}
|
||||
finally {
|
||||
zkUtils.close();
|
||||
}
|
||||
}
|
||||
|
||||
private Collection<PartitionInfo> getPartitionsForTopic(final String topicName, final int partitionCount) {
|
||||
try {
|
||||
return this.metadataRetryOperations
|
||||
.execute(new RetryCallback<Collection<PartitionInfo>, Exception>() {
|
||||
|
||||
@Override
|
||||
public Collection<PartitionInfo> doWithRetry(RetryContext context) throws Exception {
|
||||
Collection<PartitionInfo> partitions =
|
||||
getProducerFactory(
|
||||
new ExtendedProducerProperties<>(new KafkaProducerProperties()))
|
||||
.createProducer().partitionsFor(topicName);
|
||||
|
||||
// do a sanity check on the partition set
|
||||
if (partitions.size() < partitionCount) {
|
||||
throw new IllegalStateException("The number of expected partitions was: "
|
||||
+ partitionCount + ", but " + partitions.size()
|
||||
+ (partitions.size() > 1 ? " have " : " has ") + "been found instead");
|
||||
}
|
||||
return partitions;
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (Exception e) {
|
||||
this.logger.error("Cannot initialize Binder", e);
|
||||
throw new BinderException("Cannot initialize binder:", e);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void initDlqProducer() {
|
||||
try {
|
||||
if (this.dlqProducer == null) {
|
||||
|
||||
@@ -24,6 +24,7 @@ import java.util.Properties;
|
||||
import kafka.api.PartitionMetadata;
|
||||
import kafka.utils.ZkUtils;
|
||||
|
||||
import org.springframework.util.ClassUtils;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
@@ -31,13 +32,11 @@ import org.springframework.util.ReflectionUtils;
|
||||
*/
|
||||
public class Kafka10AdminUtilsOperation implements AdminUtilsOperation {
|
||||
|
||||
private static ClassLoader CLASS_LOADER = Kafka10AdminUtilsOperation.class.getClassLoader();
|
||||
|
||||
private static Class<?> ADMIN_UTIL_CLASS;
|
||||
|
||||
static {
|
||||
try {
|
||||
ADMIN_UTIL_CLASS = CLASS_LOADER.loadClass("kafka.admin.AdminUtils");
|
||||
ADMIN_UTIL_CLASS = ClassUtils.forName("kafka.admin.AdminUtils", null);
|
||||
}
|
||||
catch (ClassNotFoundException e) {
|
||||
throw new IllegalStateException("AdminUtils class not found", e);
|
||||
@@ -77,7 +76,7 @@ public class Kafka10AdminUtilsOperation implements AdminUtilsOperation {
|
||||
Method fetchTopicMetadataFromZk = ReflectionUtils.findMethod(ADMIN_UTIL_CLASS, "fetchTopicMetadataFromZk", String.class, ZkUtils.class);
|
||||
|
||||
Object result = fetchTopicMetadataFromZk.invoke(null, topic, zkUtils);
|
||||
Class<?> topicMetadataClass = CLASS_LOADER.loadClass("org.apache.kafka.common.requests.MetadataResponse$TopicMetadata");
|
||||
Class<?> topicMetadataClass = ClassUtils.forName("org.apache.kafka.common.requests.MetadataResponse$TopicMetadata", null);
|
||||
|
||||
Method errorCodeMethod = ReflectionUtils.findMethod(topicMetadataClass, "error");
|
||||
Object obj = errorCodeMethod.invoke(result);
|
||||
@@ -103,7 +102,7 @@ public class Kafka10AdminUtilsOperation implements AdminUtilsOperation {
|
||||
try {
|
||||
Method fetchTopicMetadataFromZk = ReflectionUtils.findMethod(ADMIN_UTIL_CLASS, "fetchTopicMetadataFromZk", String.class, ZkUtils.class);
|
||||
Object result = fetchTopicMetadataFromZk.invoke(null, topic, zkUtils);
|
||||
Class<?> topicMetadataClass = CLASS_LOADER.loadClass("org.apache.kafka.common.requests.MetadataResponse$TopicMetadata");
|
||||
Class<?> topicMetadataClass = ClassUtils.forName("org.apache.kafka.common.requests.MetadataResponse$TopicMetadata", null);
|
||||
|
||||
Method partitionsMetadata = ReflectionUtils.findMethod(topicMetadataClass, "partitionMetadata");
|
||||
List<PartitionMetadata> foo = (List<PartitionMetadata>) partitionsMetadata.invoke(result);
|
||||
|
||||
@@ -0,0 +1,82 @@
|
||||
/*
|
||||
* Copyright 2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.config;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import javax.security.auth.login.AppConfigurationEntry;
|
||||
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* Contains properties for setting up an {@link AppConfigurationEntry} that can be used
|
||||
* for the Kafka or Zookeeper client.
|
||||
*
|
||||
* @author Marius Bogoevici
|
||||
*/
|
||||
public class JaasLoginModuleConfiguration {
|
||||
|
||||
private String loginModule = "com.sun.security.auth.module.Krb5LoginModule";
|
||||
|
||||
private AppConfigurationEntry.LoginModuleControlFlag controlFlag = AppConfigurationEntry.LoginModuleControlFlag.REQUIRED;
|
||||
|
||||
private Map<String,String> options = new HashMap<>();
|
||||
|
||||
public String getLoginModule() {
|
||||
return loginModule;
|
||||
}
|
||||
|
||||
public void setLoginModule(String loginModule) {
|
||||
Assert.notNull(loginModule, "cannot be null");
|
||||
this.loginModule = loginModule;
|
||||
}
|
||||
|
||||
public String getControlFlag() {
|
||||
return controlFlag.toString();
|
||||
}
|
||||
|
||||
public AppConfigurationEntry.LoginModuleControlFlag getControlFlagValue() {
|
||||
return controlFlag;
|
||||
}
|
||||
|
||||
public void setControlFlag(String controlFlag) {
|
||||
Assert.notNull(controlFlag, "cannot be null");
|
||||
if (AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL.equals(controlFlag)) {
|
||||
this.controlFlag = AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL;
|
||||
}
|
||||
else if (AppConfigurationEntry.LoginModuleControlFlag.REQUIRED.equals(controlFlag)) {
|
||||
this.controlFlag = AppConfigurationEntry.LoginModuleControlFlag.REQUIRED;
|
||||
}
|
||||
else if (AppConfigurationEntry.LoginModuleControlFlag.REQUISITE.equals(controlFlag)) {
|
||||
this.controlFlag = AppConfigurationEntry.LoginModuleControlFlag.REQUISITE;
|
||||
}
|
||||
else if (AppConfigurationEntry.LoginModuleControlFlag.SUFFICIENT.equals(controlFlag)) {
|
||||
this.controlFlag = AppConfigurationEntry.LoginModuleControlFlag.SUFFICIENT;
|
||||
}
|
||||
else {
|
||||
throw new IllegalArgumentException(controlFlag + " is not a supported control flag");
|
||||
}
|
||||
}
|
||||
|
||||
public Map<String, String> getOptions() {
|
||||
return options;
|
||||
}
|
||||
|
||||
public void setOptions(Map<String, String> options) {
|
||||
this.options = options;
|
||||
}
|
||||
}
|
||||
@@ -16,17 +16,20 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.config;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.kafka.common.utils.AppInfoParser;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.PropertyPlaceholderAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaBinderJaasInitializerListener;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
|
||||
@@ -34,6 +37,7 @@ import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOper
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.config.codec.kryo.KryoCodecAutoConfiguration;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Condition;
|
||||
import org.springframework.context.annotation.ConditionContext;
|
||||
@@ -58,7 +62,7 @@ import org.springframework.kafka.support.ProducerListener;
|
||||
@EnableConfigurationProperties({KafkaBinderConfigurationProperties.class, KafkaExtendedBindingProperties.class})
|
||||
public class KafkaBinderConfiguration {
|
||||
|
||||
protected final Log logger = LogFactory.getLog(getClass());
|
||||
protected static final Log logger = LogFactory.getLog(KafkaBinderConfiguration.class);
|
||||
|
||||
@Autowired
|
||||
private Codec codec;
|
||||
@@ -75,14 +79,16 @@ public class KafkaBinderConfiguration {
|
||||
@Autowired
|
||||
private ApplicationContext context;
|
||||
|
||||
@Autowired (required = false)
|
||||
private AdminUtilsOperation adminUtilsOperation;
|
||||
|
||||
@Bean
|
||||
KafkaMessageChannelBinder kafkaMessageChannelBinder() {
|
||||
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
|
||||
this.configurationProperties);
|
||||
kafkaMessageChannelBinder.setCodec(this.codec);
|
||||
//kafkaMessageChannelBinder.setProducerListener(producerListener);
|
||||
kafkaMessageChannelBinder.setProducerListener(producerListener);
|
||||
kafkaMessageChannelBinder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
|
||||
AdminUtilsOperation adminUtilsOperation = context.getBean(AdminUtilsOperation.class);
|
||||
kafkaMessageChannelBinder.setAdminUtilsOperation(adminUtilsOperation);
|
||||
return kafkaMessageChannelBinder;
|
||||
}
|
||||
@@ -99,67 +105,46 @@ public class KafkaBinderConfiguration {
|
||||
}
|
||||
|
||||
@Bean(name = "adminUtilsOperation")
|
||||
@Conditional(Kafka09Condition.class)
|
||||
@Conditional(Kafka09Present.class)
|
||||
@ConditionalOnClass(name = "kafka.admin.AdminUtils")
|
||||
public AdminUtilsOperation kafka09AdminUtilsOperation() {
|
||||
logger.info("AdminUtils selected: Kafka 0.9 AdminUtils");
|
||||
return new Kafka09AdminUtilsOperation();
|
||||
}
|
||||
|
||||
@Bean(name = "adminUtilsOperation")
|
||||
@Conditional(Kafka10Condition.class)
|
||||
@Conditional(Kafka10Present.class)
|
||||
@ConditionalOnClass(name = "kafka.admin.AdminUtils")
|
||||
public AdminUtilsOperation kafka10AdminUtilsOperation() {
|
||||
logger.info("AdminUtils selected: Kafka 0.10 AdminUtils");
|
||||
return new Kafka10AdminUtilsOperation();
|
||||
}
|
||||
|
||||
private static Method getMethod(ClassLoader classLoader, String methodName) {
|
||||
try {
|
||||
Class<?> adminUtilClass = classLoader.loadClass("kafka.admin.AdminUtils");
|
||||
Method[] declaredMethods = adminUtilClass.getDeclaredMethods();
|
||||
for (Method m : declaredMethods) {
|
||||
if (m.getName().equals(methodName)) {
|
||||
return m;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (ClassNotFoundException e) {
|
||||
throw new IllegalStateException("AdminUtils not found", e);
|
||||
}
|
||||
return null;
|
||||
@Bean
|
||||
public ApplicationListener<?> jaasInitializer() throws IOException {
|
||||
return new KafkaBinderJaasInitializerListener();
|
||||
}
|
||||
|
||||
static class Kafka10Condition implements Condition {
|
||||
static class Kafka10Present implements Condition {
|
||||
|
||||
@Override
|
||||
public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata annotatedTypeMetadata) {
|
||||
ClassLoader classLoader = Kafka10Condition.class.getClassLoader();
|
||||
Method addPartitions = getMethod(classLoader, "addPartitions");
|
||||
if (addPartitions != null) {
|
||||
Class<?>[] parameterTypes = addPartitions.getParameterTypes();
|
||||
Class clazz = parameterTypes[parameterTypes.length - 1];
|
||||
if (clazz.getName().equals("kafka.admin.RackAwareMode")) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
return AppInfoParser.getVersion().startsWith("0.10");
|
||||
}
|
||||
}
|
||||
|
||||
static class Kafka09Condition implements Condition {
|
||||
|
||||
static class Kafka09Present implements Condition {
|
||||
|
||||
@Override
|
||||
public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata annotatedTypeMetadata) {
|
||||
|
||||
ClassLoader classLoader = Kafka09Condition.class.getClassLoader();
|
||||
Method addPartitions = getMethod(classLoader, "addPartitions");
|
||||
if (addPartitions != null) {
|
||||
Class<?>[] parameterTypes = addPartitions.getParameterTypes();
|
||||
Class clazz = parameterTypes[parameterTypes.length - 1];
|
||||
if (!clazz.getName().equals("kafka.admin.RackAwareMode")) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
return AppInfoParser.getVersion().startsWith("0.9");
|
||||
}
|
||||
}
|
||||
|
||||
public static class JaasConfigurationProperties {
|
||||
|
||||
private JaasLoginModuleConfiguration kafka;
|
||||
|
||||
private JaasLoginModuleConfiguration zookeeper;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -77,6 +77,8 @@ public class KafkaBinderConfigurationProperties {
|
||||
|
||||
private int queueSize = 8192;
|
||||
|
||||
private JaasLoginModuleConfiguration jaas;
|
||||
|
||||
public String getZkConnectionString() {
|
||||
return toConnectionString(this.zkNodes, this.defaultZkPort);
|
||||
}
|
||||
@@ -254,4 +256,13 @@ public class KafkaBinderConfigurationProperties {
|
||||
public void setConfiguration(Map<String, String> configuration) {
|
||||
this.configuration = configuration;
|
||||
}
|
||||
|
||||
public JaasLoginModuleConfiguration getJaas() {
|
||||
return jaas;
|
||||
}
|
||||
|
||||
public void setJaas(JaasLoginModuleConfiguration jaas) {
|
||||
this.jaas = jaas;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,20 +0,0 @@
|
||||
/*
|
||||
* Copyright 2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* This package contains an implementation of the {@link org.springframework.cloud.stream.binder.Binder} for Kafka.
|
||||
*/
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
@@ -1,3 +1,18 @@
|
||||
/*
|
||||
* Copyright 2014-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@@ -32,8 +32,6 @@ import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.Spy;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
|
||||
@@ -119,16 +117,6 @@ public class Kafka09BinderTests extends KafkaBinderTests {
|
||||
return adminUtilsOperation.partitionSize(topic, zkUtils);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ExtendedConsumerProperties<KafkaConsumerProperties> createConsumerProperties() {
|
||||
return new ExtendedConsumerProperties<>(new KafkaConsumerProperties());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ExtendedProducerProperties<KafkaProducerProperties> createProducerProperties() {
|
||||
return new ExtendedProducerProperties<>(new KafkaProducerProperties());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKafkaOffsetHeaderKey() {
|
||||
return KafkaHeaders.OFFSET;
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
/*
|
||||
* Copyright 2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
|
||||
import org.springframework.kafka.support.ProducerListener;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@SpringBootTest(classes = KafkaBinderConfiguration.class)
|
||||
public class KafkaBinderConfigurationTest {
|
||||
|
||||
@Autowired
|
||||
private KafkaMessageChannelBinder kafkaMessageChannelBinder;
|
||||
|
||||
@Test
|
||||
public void testKafkaBinderProducerListener() {
|
||||
assertNotNull(this.kafkaMessageChannelBinder);
|
||||
Field producerListenerField = ReflectionUtils.findField(
|
||||
KafkaMessageChannelBinder.class, "producerListener",
|
||||
ProducerListener.class);
|
||||
ReflectionUtils.makeAccessible(producerListenerField);
|
||||
ProducerListener producerListener = (ProducerListener) ReflectionUtils.getField(
|
||||
producerListenerField, this.kafkaMessageChannelBinder);
|
||||
assertNotNull(producerListener);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,60 @@
|
||||
/*
|
||||
* Copyright 2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import javax.security.auth.login.AppConfigurationEntry;
|
||||
|
||||
import com.sun.security.auth.login.ConfigFile;
|
||||
import org.apache.kafka.common.security.JaasUtils;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.core.io.ClassPathResource;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* @author Marius Bogoevici
|
||||
*/
|
||||
public class KafkaBinderJaasInitializerListenerTest {
|
||||
|
||||
@Test
|
||||
public void testConfigurationParsedCorrectlyWithKafkaClient() throws Exception {
|
||||
ConfigFile configFile = new ConfigFile(new ClassPathResource("jaas-sample-kafka-only.conf").getURI());
|
||||
final AppConfigurationEntry[] kafkaConfigurationArray = configFile.getAppConfigurationEntry(JaasUtils.LOGIN_CONTEXT_CLIENT);
|
||||
|
||||
final ConfigurableApplicationContext context =
|
||||
SpringApplication.run(SimpleApplication.class,
|
||||
"--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true",
|
||||
"--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true",
|
||||
"--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab",
|
||||
"--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM");
|
||||
javax.security.auth.login.Configuration configuration = javax.security.auth.login.Configuration.getConfiguration();
|
||||
|
||||
final AppConfigurationEntry[] kafkaConfiguration = configuration.getAppConfigurationEntry(JaasUtils.LOGIN_CONTEXT_CLIENT);
|
||||
assertThat(kafkaConfiguration).hasSize(1);
|
||||
assertThat(kafkaConfiguration[0].getOptions()).isEqualTo(kafkaConfigurationArray[0].getOptions());
|
||||
context.close();
|
||||
}
|
||||
|
||||
@SpringBootApplication
|
||||
public static class SimpleApplication {
|
||||
|
||||
}
|
||||
}
|
||||
@@ -19,6 +19,7 @@ package org.springframework.cloud.stream.binder.kafka;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
@@ -27,6 +28,10 @@ import java.util.concurrent.TimeUnit;
|
||||
import kafka.utils.ZKStringSerializer$;
|
||||
import kafka.utils.ZkUtils;
|
||||
import org.I0Itec.zkclient.ZkClient;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.serialization.LongDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.assertj.core.api.Condition;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
@@ -35,6 +40,7 @@ import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.binder.BinderException;
|
||||
import org.springframework.cloud.stream.binder.Binding;
|
||||
import org.springframework.cloud.stream.binder.DefaultBinding;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.PartitionCapableBinderTests;
|
||||
@@ -46,12 +52,18 @@ import org.springframework.context.support.GenericApplicationContext;
|
||||
import org.springframework.integration.IntegrationMessageHeaderAccessor;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
|
||||
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
|
||||
import org.springframework.kafka.support.Acknowledgment;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.kafka.support.TopicPartitionInitialOffset;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
import org.springframework.messaging.support.GenericMessage;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.retry.RetryOperations;
|
||||
@@ -60,17 +72,32 @@ import org.springframework.retry.policy.SimpleRetryPolicy;
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.assertj.core.api.Assertions.fail;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
public abstract class KafkaBinderTests extends PartitionCapableBinderTests<AbstractKafkaTestBinder, ExtendedConsumerProperties<KafkaConsumerProperties>,
|
||||
ExtendedProducerProperties<KafkaProducerProperties>> {
|
||||
|
||||
protected abstract ExtendedConsumerProperties<KafkaConsumerProperties> createConsumerProperties();
|
||||
@Override
|
||||
protected ExtendedConsumerProperties<KafkaConsumerProperties> createConsumerProperties() {
|
||||
final ExtendedConsumerProperties<KafkaConsumerProperties> kafkaConsumerProperties =
|
||||
new ExtendedConsumerProperties<>(new KafkaConsumerProperties());
|
||||
// set the default values that would normally be propagated by Spring Cloud Stream
|
||||
kafkaConsumerProperties.setInstanceCount(1);
|
||||
kafkaConsumerProperties.setInstanceIndex(0);
|
||||
return kafkaConsumerProperties;
|
||||
}
|
||||
|
||||
protected abstract ExtendedProducerProperties<KafkaProducerProperties> createProducerProperties();
|
||||
@Override
|
||||
protected ExtendedProducerProperties<KafkaProducerProperties> createProducerProperties() {
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(new KafkaProducerProperties());
|
||||
producerProperties.getExtension().setSync(true);
|
||||
return producerProperties;
|
||||
}
|
||||
|
||||
public abstract String getKafkaOffsetHeaderKey();
|
||||
|
||||
@@ -288,7 +315,6 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer("foo.0", moduleOutputChannel,
|
||||
producerProperties);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo.0", "test", moduleInputChannel,
|
||||
consumerProperties);
|
||||
Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload(testPayload)
|
||||
@@ -551,7 +577,6 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
String testPayload1 = "foo1-" + UUID.randomUUID().toString();
|
||||
output.send(new GenericMessage<>(testPayload1.getBytes()));
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> firstConsumerProperties = createConsumerProperties();
|
||||
firstConsumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
consumerBinding = binder.bindConsumer(testTopicName, "startOffsets", input1,
|
||||
firstConsumerProperties);
|
||||
Message<byte[]> receivedMessage1 = (Message<byte[]>) receive(input1);
|
||||
@@ -563,17 +588,14 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
assertThat(new String(receivedMessage2.getPayload())).isNotNull();
|
||||
consumerBinding.unbind();
|
||||
|
||||
Thread.sleep(2000);
|
||||
String testPayload3 = "foo3-" + UUID.randomUUID().toString();
|
||||
output.send(new GenericMessage<>(testPayload3.getBytes()));
|
||||
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerBinding = binder.bindConsumer(testTopicName, "startOffsets", input1, consumerProperties);
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
Message<byte[]> receivedMessage3 = (Message<byte[]>) receive(input1);
|
||||
assertThat(receivedMessage3).isNotNull();
|
||||
assertThat(new String(receivedMessage3.getPayload())).isEqualTo(testPayload3);
|
||||
|
||||
Thread.sleep(2000);
|
||||
}
|
||||
finally {
|
||||
if (consumerBinding != null) {
|
||||
@@ -639,6 +661,81 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
consumerBinding2.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testManualAckSucceedsWhenAutoCommitOffsetIsTurnedOff() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
|
||||
DirectChannel moduleOutputChannel = createBindableChannel("output",
|
||||
createProducerBindingProperties(createProducerProperties()));
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer("foo.x", moduleOutputChannel,
|
||||
createProducerProperties());
|
||||
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.getExtension().setAutoCommitOffset(false);
|
||||
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo.x", "test", moduleInputChannel,
|
||||
consumerProperties);
|
||||
|
||||
String testPayload1 = "foo" + UUID.randomUUID().toString();
|
||||
Message<?> message1 = org.springframework.integration.support.MessageBuilder.withPayload(
|
||||
testPayload1.getBytes()).build();
|
||||
|
||||
// Let the consumer actually bind to the producer before sending a msg
|
||||
binderBindUnbindLatency();
|
||||
moduleOutputChannel.send(message1);
|
||||
|
||||
Message<?> receivedMessage = receive(moduleInputChannel);
|
||||
assertThat(receivedMessage).isNotNull();
|
||||
assertThat(receivedMessage.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT)).isNotNull();
|
||||
Acknowledgment acknowledgment = receivedMessage.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
|
||||
try {
|
||||
acknowledgment.acknowledge();
|
||||
}
|
||||
catch (Exception e) {
|
||||
fail("Acknowledge must not throw an exception");
|
||||
}
|
||||
finally {
|
||||
producerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testManualAckIsNotPossibleWhenAutoCommitOffsetIsEnabledOnTheBinder() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
|
||||
DirectChannel moduleOutputChannel = createBindableChannel("output",
|
||||
createProducerBindingProperties(createProducerProperties()));
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer("foo.x", moduleOutputChannel,
|
||||
createProducerProperties());
|
||||
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo.x", "test", moduleInputChannel,
|
||||
consumerProperties);
|
||||
|
||||
String testPayload1 = "foo" + UUID.randomUUID().toString();
|
||||
Message<?> message1 = org.springframework.integration.support.MessageBuilder.withPayload(
|
||||
testPayload1.getBytes()).build();
|
||||
|
||||
// Let the consumer actually bind to the producer before sending a msg
|
||||
binderBindUnbindLatency();
|
||||
moduleOutputChannel.send(message1);
|
||||
|
||||
Message<?> receivedMessage = receive(moduleInputChannel);
|
||||
assertThat(receivedMessage).isNotNull();
|
||||
assertThat(receivedMessage.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT)).isNull();
|
||||
|
||||
producerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -907,7 +1004,7 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testAutoCreateTopicsDisabledFailsIfTopicMissing() throws Exception {
|
||||
public void testAutoCreateTopicsDisabledOnBinderStillWorksAsLongAsBrokerCreatesTopic() throws Exception {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
configurationProperties.setAutoCreateTopics(false);
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
@@ -919,15 +1016,22 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
setMetadataRetryOperations(binder, metatadataRetrievalRetryOperations);
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
String testTopicName = "nonexisting" + System.currentTimeMillis();
|
||||
try {
|
||||
binder.bindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
fail();
|
||||
}
|
||||
catch (Exception e) {
|
||||
assertThat(e).isInstanceOf(BinderException.class);
|
||||
assertThat(e).hasMessageContaining("Topic " + testTopicName + " does not exist");
|
||||
}
|
||||
String testTopicName = "createdByBroker-" + System.currentTimeMillis();
|
||||
|
||||
QueueChannel input = new QueueChannel();
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName, output,
|
||||
createProducerProperties());
|
||||
|
||||
String testPayload = "foo1-" + UUID.randomUUID().toString();
|
||||
output.send(new GenericMessage<>(testPayload.getBytes()));
|
||||
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer(testTopicName, "test", input, consumerProperties);
|
||||
Message<byte[]> receivedMessage2 = (Message<byte[]>) receive(input);
|
||||
assertThat(receivedMessage2).isNotNull();
|
||||
assertThat(new String(receivedMessage2.getPayload())).isEqualTo(testPayload);
|
||||
|
||||
producerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -1046,7 +1150,7 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
binding = binder.bindConsumer(testTopicName, "test-x", output, consumerProperties);
|
||||
|
||||
TopicPartitionInitialOffset[] listenedPartitions = TestUtils.getPropertyValue(binding,
|
||||
"endpoint.messageListenerContainer.containerProperties.topicPartitions",
|
||||
"lifecycle.messageListenerContainer.containerProperties.topicPartitions",
|
||||
TopicPartitionInitialOffset[].class);
|
||||
assertThat(listenedPartitions).hasSize(2);
|
||||
assertThat(listenedPartitions).contains(new TopicPartitionInitialOffset(testTopicName, 2),
|
||||
@@ -1093,6 +1197,159 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
assertThat(partitionSize(testTopicName)).isEqualTo(6);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testConsumerDefaultDeserializer() throws Exception {
|
||||
Binding<?> binding = null;
|
||||
try {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
final ZkUtils zkUtils = getZkUtils(configurationProperties);
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
invokeCreateTopic(zkUtils, testTopicName, 5, 1, new Properties());
|
||||
configurationProperties.setAutoCreateTopics(false);
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
DirectFieldAccessor consumerAccessor = new DirectFieldAccessor(getKafkaConsumer(binding));
|
||||
assertTrue(consumerAccessor.getPropertyValue("keyDeserializer") instanceof ByteArrayDeserializer);
|
||||
assertTrue(consumerAccessor.getPropertyValue("valueDeserializer") instanceof ByteArrayDeserializer);
|
||||
}
|
||||
finally {
|
||||
if (binding != null) {
|
||||
binding.unbind();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testConsumerCustomDeserializer() throws Exception {
|
||||
Binding<?> binding = null;
|
||||
try {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
Map<String, String> propertiesToOverride = configurationProperties.getConfiguration();
|
||||
propertiesToOverride.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
propertiesToOverride.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
|
||||
configurationProperties.setConfiguration(propertiesToOverride);
|
||||
final ZkUtils zkUtils = getZkUtils(configurationProperties);
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
invokeCreateTopic(zkUtils, testTopicName, 5, 1, new Properties());
|
||||
configurationProperties.setAutoCreateTopics(false);
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
DirectFieldAccessor consumerAccessor = new DirectFieldAccessor(getKafkaConsumer(binding));
|
||||
assertTrue("Expected StringDeserializer as a custom key deserializer", consumerAccessor.getPropertyValue("keyDeserializer") instanceof StringDeserializer);
|
||||
assertTrue("Expected LongDeserializer as a custom value deserializer", consumerAccessor.getPropertyValue("valueDeserializer") instanceof LongDeserializer);
|
||||
}
|
||||
finally {
|
||||
if (binding != null) {
|
||||
binding.unbind();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private KafkaConsumer getKafkaConsumer(Binding binding) {
|
||||
DirectFieldAccessor bindingAccessor = new DirectFieldAccessor((DefaultBinding)binding);
|
||||
KafkaMessageDrivenChannelAdapter adapter = (KafkaMessageDrivenChannelAdapter) bindingAccessor.getPropertyValue("lifecycle");
|
||||
DirectFieldAccessor adapterAccessor = new DirectFieldAccessor(adapter);
|
||||
ConcurrentMessageListenerContainer messageListenerContainer = (ConcurrentMessageListenerContainer) adapterAccessor.getPropertyValue("messageListenerContainer");
|
||||
DirectFieldAccessor containerAccessor = new DirectFieldAccessor((ConcurrentMessageListenerContainer)messageListenerContainer);
|
||||
DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) containerAccessor.getPropertyValue("consumerFactory");
|
||||
return (KafkaConsumer) consumerFactory.createConsumer();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testNativeSerializationWithCustomSerializerDeserializer() throws Exception {
|
||||
Binding<?> producerBinding = null;
|
||||
Binding<?> consumerBinding = null;
|
||||
try {
|
||||
Integer testPayload = new Integer(10);
|
||||
Message<?> message = MessageBuilder.withPayload(testPayload).build();
|
||||
SubscribableChannel moduleOutputChannel = new DirectChannel();
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
final ZkClient zkClient;
|
||||
zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
|
||||
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
|
||||
ZKStringSerializer$.MODULE$);
|
||||
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
|
||||
invokeCreateTopic(zkUtils, testTopicName, 6, 1, new Properties());
|
||||
configurationProperties.setAutoAddPartitions(true);
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
producerProperties.setUseNativeEncoding(true);
|
||||
producerProperties.getExtension().getConfiguration().put("value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
|
||||
producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel, producerProperties);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
consumerProperties.getExtension().getConfiguration().put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
|
||||
consumerBinding = binder.bindConsumer(testTopicName, "test", moduleInputChannel, consumerProperties);
|
||||
// Let the consumer actually bind to the producer before sending a msg
|
||||
binderBindUnbindLatency();
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(moduleInputChannel, 500);
|
||||
assertThat(inbound).isNotNull();
|
||||
assertThat(inbound.getPayload()).isEqualTo(10);
|
||||
assertThat(inbound.getHeaders()).doesNotContainKey("contentType");
|
||||
}
|
||||
finally {
|
||||
if (producerBinding != null) {
|
||||
producerBinding.unbind();
|
||||
}
|
||||
if (consumerBinding != null) {
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testBuiltinSerialization() throws Exception {
|
||||
Binding<?> producerBinding = null;
|
||||
Binding<?> consumerBinding = null;
|
||||
try {
|
||||
String testPayload = new String("test");
|
||||
Message<?> message = MessageBuilder.withPayload(testPayload).build();
|
||||
SubscribableChannel moduleOutputChannel = new DirectChannel();
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
final ZkClient zkClient;
|
||||
zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
|
||||
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
|
||||
ZKStringSerializer$.MODULE$);
|
||||
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
|
||||
invokeCreateTopic(zkUtils, testTopicName, 6, 1, new Properties());
|
||||
configurationProperties.setAutoAddPartitions(true);
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel, producerProperties);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
consumerBinding = binder.bindConsumer(testTopicName, "test", moduleInputChannel, consumerProperties);
|
||||
// Let the consumer actually bind to the producer before sending a msg
|
||||
binderBindUnbindLatency();
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(moduleInputChannel, 5);
|
||||
assertThat(inbound).isNotNull();
|
||||
assertThat(inbound.getPayload()).isEqualTo("test");
|
||||
assertThat(inbound.getHeaders()).containsEntry("contentType", "text/plain");
|
||||
}
|
||||
finally {
|
||||
if (producerBinding != null) {
|
||||
producerBinding.unbind();
|
||||
}
|
||||
if (consumerBinding != null) {
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void binderBindUnbindLatency() throws InterruptedException {
|
||||
Thread.sleep(500);
|
||||
|
||||
@@ -0,0 +1,7 @@
|
||||
KafkaClient {
|
||||
com.sun.security.auth.module.Krb5LoginModule required
|
||||
useKeyTab=true
|
||||
storeKey=true
|
||||
keyTab="/etc/security/keytabs/kafka_client.keytab"
|
||||
principal="kafka-client-1@EXAMPLE.COM";
|
||||
};
|
||||
@@ -0,0 +1,14 @@
|
||||
KafkaClient {
|
||||
com.sun.security.auth.module.Krb5LoginModule required
|
||||
useKeyTab=true
|
||||
storeKey=true
|
||||
keyTab="/etc/security/keytabs/kafka_client.keytab"
|
||||
principal="kafka-client-1@EXAMPLE.COM";
|
||||
};
|
||||
Client {
|
||||
com.sun.security.auth.module.Krb5LoginModule required
|
||||
useKeyTab=true
|
||||
storeKey=true
|
||||
keyTab="/etc/security/keytabs/zk_client.keytab"
|
||||
principal="zk-client-1@EXAMPLE.COM";
|
||||
};
|
||||
@@ -0,0 +1,14 @@
|
||||
<configuration>
|
||||
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{ISO8601} %5p %t %c{2}:%L - %m%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
<logger name="org.springframework.integration.kafka" level="INFO"/>
|
||||
<logger name="org.springframework.kafka" level="INFO"/>
|
||||
<logger name="org.springframework.cloud.stream" level="INFO" />
|
||||
<logger name="org.springframework.integration.channel" level="INFO" />
|
||||
<root level="WARN">
|
||||
<appender-ref ref="stdout"/>
|
||||
</root>
|
||||
</configuration>
|
||||
Reference in New Issue
Block a user