Compare commits
21 Commits
v1.1.0.REL
...
v1.2.0.M1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7b06ff9b17 | ||
|
|
ddb520ac02 | ||
|
|
cb01dda042 | ||
|
|
525aea8a98 | ||
|
|
01a715a9a9 | ||
|
|
21ab8090b8 | ||
|
|
e0bba57a51 | ||
|
|
1668f0c694 | ||
|
|
36c90aa8c0 | ||
|
|
943a7ae148 | ||
|
|
a1633ab241 | ||
|
|
68fd3fb6d7 | ||
|
|
df6d472e99 | ||
|
|
23ae2b3088 | ||
|
|
2e595f34f2 | ||
|
|
acb1c60216 | ||
|
|
275f5cbb8e | ||
|
|
79643586dd | ||
|
|
b292f81d46 | ||
|
|
8ad98c09b9 | ||
|
|
624b039da5 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -18,6 +18,7 @@ _site/
|
||||
*.ipr
|
||||
*.iws
|
||||
.idea/*
|
||||
*/.idea
|
||||
.factorypath
|
||||
dump.rdb
|
||||
.apt_generated
|
||||
|
||||
25
mvnw
vendored
25
mvnw
vendored
@@ -57,27 +57,27 @@ case "`uname`" in
|
||||
#
|
||||
# Look for the Apple JDKs first to preserve the existing behaviour, and then look
|
||||
# for the new JDKs provided by Oracle.
|
||||
#
|
||||
#
|
||||
if [ -z "$JAVA_HOME" ] && [ -L /System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK ] ; then
|
||||
#
|
||||
# Apple JDKs
|
||||
#
|
||||
export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home
|
||||
fi
|
||||
|
||||
|
||||
if [ -z "$JAVA_HOME" ] && [ -L /System/Library/Java/JavaVirtualMachines/CurrentJDK ] ; then
|
||||
#
|
||||
# Apple JDKs
|
||||
#
|
||||
export JAVA_HOME=/System/Library/Java/JavaVirtualMachines/CurrentJDK/Contents/Home
|
||||
fi
|
||||
|
||||
|
||||
if [ -z "$JAVA_HOME" ] && [ -L "/Library/Java/JavaVirtualMachines/CurrentJDK" ] ; then
|
||||
#
|
||||
# Oracle JDKs
|
||||
#
|
||||
export JAVA_HOME=/Library/Java/JavaVirtualMachines/CurrentJDK/Contents/Home
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ -z "$JAVA_HOME" ] && [ -x "/usr/libexec/java_home" ]; then
|
||||
#
|
||||
@@ -219,16 +219,27 @@ concat_lines() {
|
||||
export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-$(find_maven_basedir)}
|
||||
MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
|
||||
|
||||
# Provide a "standardized" way to retrieve the CLI args that will
|
||||
# Provide a "standardized" way to retrieve the CLI args that will
|
||||
# work with both Windows and non-Windows executions.
|
||||
MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@"
|
||||
export MAVEN_CMD_LINE_ARGS
|
||||
|
||||
WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
|
||||
|
||||
echo "Running version check"
|
||||
VERSION=$( sed '\!<parent!,\!</parent!d' `dirname $0`/pom.xml | grep '<version' | head -1 | sed -e 's/.*<version>//' -e 's!</version>.*$!!' )
|
||||
echo "The found version is [${VERSION}]"
|
||||
|
||||
if echo $VERSION | egrep -q 'M|RC'; then
|
||||
echo Activating \"milestone\" profile for version=\"$VERSION\"
|
||||
echo $MAVEN_ARGS | grep -q milestone || MAVEN_ARGS="$MAVEN_ARGS -Pmilestone"
|
||||
else
|
||||
echo Deactivating \"milestone\" profile for version=\"$VERSION\"
|
||||
echo $MAVEN_ARGS | grep -q milestone && MAVEN_ARGS=$(echo $MAVEN_ARGS | sed -e 's/-Pmilestone//')
|
||||
fi
|
||||
|
||||
exec "$JAVACMD" \
|
||||
$MAVEN_OPTS \
|
||||
-classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
|
||||
"-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
|
||||
${WRAPPER_LAUNCHER} "$@"
|
||||
|
||||
${WRAPPER_LAUNCHER} ${MAVEN_ARGS} "$@"
|
||||
|
||||
321
mvnw.cmd
vendored
321
mvnw.cmd
vendored
@@ -1,234 +1,145 @@
|
||||
#!/bin/sh
|
||||
# ----------------------------------------------------------------------------
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
# ----------------------------------------------------------------------------
|
||||
@REM ----------------------------------------------------------------------------
|
||||
@REM Licensed to the Apache Software Foundation (ASF) under one
|
||||
@REM or more contributor license agreements. See the NOTICE file
|
||||
@REM distributed with this work for additional information
|
||||
@REM regarding copyright ownership. The ASF licenses this file
|
||||
@REM to you under the Apache License, Version 2.0 (the
|
||||
@REM "License"); you may not use this file except in compliance
|
||||
@REM with the License. You may obtain a copy of the License at
|
||||
@REM
|
||||
@REM http://www.apache.org/licenses/LICENSE-2.0
|
||||
@REM
|
||||
@REM Unless required by applicable law or agreed to in writing,
|
||||
@REM software distributed under the License is distributed on an
|
||||
@REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
@REM KIND, either express or implied. See the License for the
|
||||
@REM specific language governing permissions and limitations
|
||||
@REM under the License.
|
||||
@REM ----------------------------------------------------------------------------
|
||||
|
||||
# ----------------------------------------------------------------------------
|
||||
# Maven2 Start Up Batch script
|
||||
#
|
||||
# Required ENV vars:
|
||||
# ------------------
|
||||
# JAVA_HOME - location of a JDK home dir
|
||||
#
|
||||
# Optional ENV vars
|
||||
# -----------------
|
||||
# M2_HOME - location of maven2's installed home dir
|
||||
# MAVEN_OPTS - parameters passed to the Java VM when running Maven
|
||||
# e.g. to debug Maven itself, use
|
||||
# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
|
||||
# MAVEN_SKIP_RC - flag to disable loading of mavenrc files
|
||||
# ----------------------------------------------------------------------------
|
||||
@REM ----------------------------------------------------------------------------
|
||||
@REM Maven2 Start Up Batch script
|
||||
@REM
|
||||
@REM Required ENV vars:
|
||||
@REM JAVA_HOME - location of a JDK home dir
|
||||
@REM
|
||||
@REM Optional ENV vars
|
||||
@REM M2_HOME - location of maven2's installed home dir
|
||||
@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands
|
||||
@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a key stroke before ending
|
||||
@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven
|
||||
@REM e.g. to debug Maven itself, use
|
||||
@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
|
||||
@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files
|
||||
@REM ----------------------------------------------------------------------------
|
||||
|
||||
if [ -z "$MAVEN_SKIP_RC" ] ; then
|
||||
@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on'
|
||||
@echo off
|
||||
@REM enable echoing my setting MAVEN_BATCH_ECHO to 'on'
|
||||
@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO%
|
||||
|
||||
if [ -f /etc/mavenrc ] ; then
|
||||
. /etc/mavenrc
|
||||
fi
|
||||
@REM set %HOME% to equivalent of $HOME
|
||||
if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%")
|
||||
|
||||
if [ -f "$HOME/.mavenrc" ] ; then
|
||||
. "$HOME/.mavenrc"
|
||||
fi
|
||||
@REM Execute a user defined script before this one
|
||||
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre
|
||||
@REM check for pre script, once with legacy .bat ending and once with .cmd ending
|
||||
if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat"
|
||||
if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd"
|
||||
:skipRcPre
|
||||
|
||||
fi
|
||||
@setlocal
|
||||
|
||||
# OS specific support. $var _must_ be set to either true or false.
|
||||
cygwin=false;
|
||||
darwin=false;
|
||||
mingw=false
|
||||
case "`uname`" in
|
||||
CYGWIN*) cygwin=true ;;
|
||||
MINGW*) mingw=true;;
|
||||
Darwin*) darwin=true
|
||||
#
|
||||
# Look for the Apple JDKs first to preserve the existing behaviour, and then look
|
||||
# for the new JDKs provided by Oracle.
|
||||
#
|
||||
if [ -z "$JAVA_HOME" ] && [ -L /System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK ] ; then
|
||||
#
|
||||
# Apple JDKs
|
||||
#
|
||||
export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home
|
||||
fi
|
||||
|
||||
if [ -z "$JAVA_HOME" ] && [ -L /System/Library/Java/JavaVirtualMachines/CurrentJDK ] ; then
|
||||
#
|
||||
# Apple JDKs
|
||||
#
|
||||
export JAVA_HOME=/System/Library/Java/JavaVirtualMachines/CurrentJDK/Contents/Home
|
||||
fi
|
||||
|
||||
if [ -z "$JAVA_HOME" ] && [ -L "/Library/Java/JavaVirtualMachines/CurrentJDK" ] ; then
|
||||
#
|
||||
# Oracle JDKs
|
||||
#
|
||||
export JAVA_HOME=/Library/Java/JavaVirtualMachines/CurrentJDK/Contents/Home
|
||||
fi
|
||||
set ERROR_CODE=0
|
||||
|
||||
if [ -z "$JAVA_HOME" ] && [ -x "/usr/libexec/java_home" ]; then
|
||||
#
|
||||
# Apple JDKs
|
||||
#
|
||||
export JAVA_HOME=`/usr/libexec/java_home`
|
||||
fi
|
||||
;;
|
||||
esac
|
||||
@REM To isolate internal variables from possible post scripts, we use another setlocal
|
||||
@setlocal
|
||||
|
||||
if [ -z "$JAVA_HOME" ] ; then
|
||||
if [ -r /etc/gentoo-release ] ; then
|
||||
JAVA_HOME=`java-config --jre-home`
|
||||
fi
|
||||
fi
|
||||
@REM ==== START VALIDATION ====
|
||||
if not "%JAVA_HOME%" == "" goto OkJHome
|
||||
|
||||
if [ -z "$M2_HOME" ] ; then
|
||||
## resolve links - $0 may be a link to maven's home
|
||||
PRG="$0"
|
||||
echo.
|
||||
echo Error: JAVA_HOME not found in your environment. >&2
|
||||
echo Please set the JAVA_HOME variable in your environment to match the >&2
|
||||
echo location of your Java installation. >&2
|
||||
echo.
|
||||
goto error
|
||||
|
||||
# need this for relative symlinks
|
||||
while [ -h "$PRG" ] ; do
|
||||
ls=`ls -ld "$PRG"`
|
||||
link=`expr "$ls" : '.*-> \(.*\)$'`
|
||||
if expr "$link" : '/.*' > /dev/null; then
|
||||
PRG="$link"
|
||||
else
|
||||
PRG="`dirname "$PRG"`/$link"
|
||||
fi
|
||||
done
|
||||
:OkJHome
|
||||
if exist "%JAVA_HOME%\bin\java.exe" goto init
|
||||
|
||||
saveddir=`pwd`
|
||||
echo.
|
||||
echo Error: JAVA_HOME is set to an invalid directory. >&2
|
||||
echo JAVA_HOME = "%JAVA_HOME%" >&2
|
||||
echo Please set the JAVA_HOME variable in your environment to match the >&2
|
||||
echo location of your Java installation. >&2
|
||||
echo.
|
||||
goto error
|
||||
|
||||
M2_HOME=`dirname "$PRG"`/..
|
||||
@REM ==== END VALIDATION ====
|
||||
|
||||
# make it fully qualified
|
||||
M2_HOME=`cd "$M2_HOME" && pwd`
|
||||
:init
|
||||
|
||||
cd "$saveddir"
|
||||
# echo Using m2 at $M2_HOME
|
||||
fi
|
||||
set MAVEN_CMD_LINE_ARGS=%*
|
||||
|
||||
# For Cygwin, ensure paths are in UNIX format before anything is touched
|
||||
if $cygwin ; then
|
||||
[ -n "$M2_HOME" ] &&
|
||||
M2_HOME=`cygpath --unix "$M2_HOME"`
|
||||
[ -n "$JAVA_HOME" ] &&
|
||||
JAVA_HOME=`cygpath --unix "$JAVA_HOME"`
|
||||
[ -n "$CLASSPATH" ] &&
|
||||
CLASSPATH=`cygpath --path --unix "$CLASSPATH"`
|
||||
fi
|
||||
@REM Find the project base dir, i.e. the directory that contains the folder ".mvn".
|
||||
@REM Fallback to current working directory if not found.
|
||||
|
||||
# For Migwn, ensure paths are in UNIX format before anything is touched
|
||||
if $mingw ; then
|
||||
[ -n "$M2_HOME" ] &&
|
||||
M2_HOME="`(cd "$M2_HOME"; pwd)`"
|
||||
[ -n "$JAVA_HOME" ] &&
|
||||
JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`"
|
||||
# TODO classpath?
|
||||
fi
|
||||
set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR%
|
||||
IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir
|
||||
|
||||
if [ -z "$JAVA_HOME" ]; then
|
||||
javaExecutable="`which javac`"
|
||||
if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then
|
||||
# readlink(1) is not available as standard on Solaris 10.
|
||||
readLink=`which readlink`
|
||||
if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then
|
||||
if $darwin ; then
|
||||
javaHome="`dirname \"$javaExecutable\"`"
|
||||
javaExecutable="`cd \"$javaHome\" && pwd -P`/javac"
|
||||
else
|
||||
javaExecutable="`readlink -f \"$javaExecutable\"`"
|
||||
fi
|
||||
javaHome="`dirname \"$javaExecutable\"`"
|
||||
javaHome=`expr "$javaHome" : '\(.*\)/bin'`
|
||||
JAVA_HOME="$javaHome"
|
||||
export JAVA_HOME
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
set EXEC_DIR=%CD%
|
||||
set WDIR=%EXEC_DIR%
|
||||
:findBaseDir
|
||||
IF EXIST "%WDIR%"\.mvn goto baseDirFound
|
||||
cd ..
|
||||
IF "%WDIR%"=="%CD%" goto baseDirNotFound
|
||||
set WDIR=%CD%
|
||||
goto findBaseDir
|
||||
|
||||
if [ -z "$JAVACMD" ] ; then
|
||||
if [ -n "$JAVA_HOME" ] ; then
|
||||
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
|
||||
# IBM's JDK on AIX uses strange locations for the executables
|
||||
JAVACMD="$JAVA_HOME/jre/sh/java"
|
||||
else
|
||||
JAVACMD="$JAVA_HOME/bin/java"
|
||||
fi
|
||||
else
|
||||
JAVACMD="`which java`"
|
||||
fi
|
||||
fi
|
||||
:baseDirFound
|
||||
set MAVEN_PROJECTBASEDIR=%WDIR%
|
||||
cd "%EXEC_DIR%"
|
||||
goto endDetectBaseDir
|
||||
|
||||
if [ ! -x "$JAVACMD" ] ; then
|
||||
echo "Error: JAVA_HOME is not defined correctly." >&2
|
||||
echo " We cannot execute $JAVACMD" >&2
|
||||
exit 1
|
||||
fi
|
||||
:baseDirNotFound
|
||||
set MAVEN_PROJECTBASEDIR=%EXEC_DIR%
|
||||
cd "%EXEC_DIR%"
|
||||
|
||||
if [ -z "$JAVA_HOME" ] ; then
|
||||
echo "Warning: JAVA_HOME environment variable is not set."
|
||||
fi
|
||||
:endDetectBaseDir
|
||||
|
||||
CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher
|
||||
IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig
|
||||
|
||||
# For Cygwin, switch paths to Windows format before running java
|
||||
if $cygwin; then
|
||||
[ -n "$M2_HOME" ] &&
|
||||
M2_HOME=`cygpath --path --windows "$M2_HOME"`
|
||||
[ -n "$JAVA_HOME" ] &&
|
||||
JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"`
|
||||
[ -n "$CLASSPATH" ] &&
|
||||
CLASSPATH=`cygpath --path --windows "$CLASSPATH"`
|
||||
fi
|
||||
@setlocal EnableExtensions EnableDelayedExpansion
|
||||
for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a
|
||||
@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS%
|
||||
|
||||
# traverses directory structure from process work directory to filesystem root
|
||||
# first directory with .mvn subdirectory is considered project base directory
|
||||
find_maven_basedir() {
|
||||
local basedir=$(pwd)
|
||||
local wdir=$(pwd)
|
||||
while [ "$wdir" != '/' ] ; do
|
||||
if [ -d "$wdir"/.mvn ] ; then
|
||||
basedir=$wdir
|
||||
break
|
||||
fi
|
||||
wdir=$(cd "$wdir/.."; pwd)
|
||||
done
|
||||
echo "${basedir}"
|
||||
}
|
||||
:endReadAdditionalConfig
|
||||
|
||||
# concatenates all lines of a file
|
||||
concat_lines() {
|
||||
if [ -f "$1" ]; then
|
||||
echo "$(tr -s '\n' ' ' < "$1")"
|
||||
fi
|
||||
}
|
||||
SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe"
|
||||
|
||||
export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-$(find_maven_basedir)}
|
||||
MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
|
||||
set WRAPPER_JAR="".\.mvn\wrapper\maven-wrapper.jar""
|
||||
set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
|
||||
|
||||
# Provide a "standardized" way to retrieve the CLI args that will
|
||||
# work with both Windows and non-Windows executions.
|
||||
MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@"
|
||||
export MAVEN_CMD_LINE_ARGS
|
||||
%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CMD_LINE_ARGS%
|
||||
if ERRORLEVEL 1 goto error
|
||||
goto end
|
||||
|
||||
WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
|
||||
:error
|
||||
set ERROR_CODE=1
|
||||
|
||||
exec "$JAVACMD" \
|
||||
$MAVEN_OPTS \
|
||||
-classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
|
||||
"-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
|
||||
${WRAPPER_LAUNCHER} "$@"
|
||||
:end
|
||||
@endlocal & set ERROR_CODE=%ERROR_CODE%
|
||||
|
||||
if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost
|
||||
@REM check for post script, once with legacy .bat ending and once with .cmd ending
|
||||
if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat"
|
||||
if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd"
|
||||
:skipRcPost
|
||||
|
||||
@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on'
|
||||
if "%MAVEN_BATCH_PAUSE%" == "on" pause
|
||||
|
||||
if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE%
|
||||
|
||||
exit /B %ERROR_CODE%
|
||||
|
||||
130
pom.xml
130
pom.xml
@@ -2,19 +2,20 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.1.0.RELEASE</version>
|
||||
<version>1.2.0.M1</version>
|
||||
<packaging>pom</packaging>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-parent</artifactId>
|
||||
<version>1.1.0.RELEASE</version>
|
||||
<artifactId>spring-cloud-build</artifactId>
|
||||
<version>1.3.1.M1</version>
|
||||
<relativePath />
|
||||
</parent>
|
||||
<properties>
|
||||
<java.version>1.7</java.version>
|
||||
<kafka.version>0.9.0.1</kafka.version>
|
||||
<spring-kafka.version>1.0.4.RELEASE</spring-kafka.version>
|
||||
<spring-kafka.version>1.0.5.RELEASE</spring-kafka.version>
|
||||
<spring-integration-kafka.version>2.0.1.RELEASE</spring-integration-kafka.version>
|
||||
<spring-cloud-stream.version>1.2.0.M1</spring-cloud-stream.version>
|
||||
</properties>
|
||||
<modules>
|
||||
<module>spring-cloud-stream-binder-kafka</module>
|
||||
@@ -22,46 +23,19 @@
|
||||
<module>spring-cloud-stream-binder-kafka-docs</module>
|
||||
<module>spring-cloud-stream-binder-kafka-0.10-test</module>
|
||||
</modules>
|
||||
<build>
|
||||
<pluginManagement>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<configuration>
|
||||
<redirectTestOutputToFile>true</redirectTestOutputToFile>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-antrun-plugin</artifactId>
|
||||
<version>1.7</version>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-checkstyle-plugin</artifactId>
|
||||
<version>2.17</version>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.puppycrawl.tools</groupId>
|
||||
<artifactId>checkstyle</artifactId>
|
||||
<version>6.17</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-javadoc-plugin</artifactId>
|
||||
<configuration>
|
||||
<quiet>true</quiet>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
</build>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream</artifactId>
|
||||
<version>${spring-cloud-stream.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-codec</artifactId>
|
||||
<version>${spring-cloud-stream.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
@@ -92,6 +66,12 @@
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
<version>${spring-integration-kafka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-test</artifactId>
|
||||
<version>${spring-cloud-stream.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
@@ -106,6 +86,74 @@
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
<build>
|
||||
<pluginManagement>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-antrun-plugin</artifactId>
|
||||
<version>1.7</version>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-checkstyle-plugin</artifactId>
|
||||
<version>2.17</version>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.puppycrawl.tools</groupId>
|
||||
<artifactId>checkstyle</artifactId>
|
||||
<version>7.1</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-javadoc-plugin</artifactId>
|
||||
<configuration>
|
||||
<quiet>true</quiet>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<configuration>
|
||||
<redirectTestOutputToFile>true</redirectTestOutputToFile>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-checkstyle-plugin</artifactId>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-build-tools</artifactId>
|
||||
<version>1.3.1.M1</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>checkstyle-validation</id>
|
||||
<phase>validate</phase>
|
||||
<configuration>
|
||||
<configLocation>checkstyle.xml</configLocation>
|
||||
<encoding>UTF-8</encoding>
|
||||
<consoleOutput>true</consoleOutput>
|
||||
<failsOnError>true</failsOnError>
|
||||
<includeTestSourceDirectory>true</includeTestSourceDirectory>
|
||||
</configuration>
|
||||
<goals>
|
||||
<goal>check</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
<profiles>
|
||||
<profile>
|
||||
<id>spring</id>
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.1.0.RELEASE</version>
|
||||
<version>1.2.0.M1</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
<description>Spring Cloud Starter Stream Kafka</description>
|
||||
@@ -20,7 +20,7 @@
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
<version>1.1.0.RELEASE</version>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.1.0.RELEASE</version>
|
||||
<version>1.2.0.M1</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-0.10-test</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder 0.10 Tests</description>
|
||||
@@ -28,7 +28,7 @@
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
<version>1.1.0.RELEASE</version>
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
@@ -64,7 +64,7 @@
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
<version>1.1.0.RELEASE</version>
|
||||
<version>${project.version}</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
@@ -73,7 +73,32 @@
|
||||
<artifactId>spring-cloud-stream-binder-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-schema</artifactId>
|
||||
<version>${spring-cloud-stream.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.confluent</groupId>
|
||||
<artifactId>kafka-avro-serializer</artifactId>
|
||||
<version>3.0.1</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.confluent</groupId>
|
||||
<artifactId>kafka-schema-registry</artifactId>
|
||||
<version>3.0.1</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>confluent</id>
|
||||
<url>http://packages.confluent.io/maven/</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
|
||||
</project>
|
||||
|
||||
@@ -21,27 +21,45 @@ import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
|
||||
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
|
||||
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
|
||||
import kafka.utils.ZKStringSerializer$;
|
||||
import kafka.utils.ZkUtils;
|
||||
import org.I0Itec.zkclient.ZkClient;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.binder.Binding;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.Spy;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.kafka.test.core.BrokerAddress;
|
||||
import org.springframework.kafka.test.rule.KafkaEmbedded;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.retry.RetryOperations;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.fail;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Integration tests for the {@link KafkaMessageChannelBinder}.
|
||||
*
|
||||
@@ -164,4 +182,64 @@ public class Kafka10BinderTests extends KafkaBinderTests {
|
||||
return new DefaultKafkaConsumerFactory<>(props, keyDecoder, valueDecoder);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testCustomAvroSerialization() throws Exception {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
|
||||
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
|
||||
ZKStringSerializer$.MODULE$);
|
||||
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
|
||||
Map<String, Object> schemaRegistryProps = new HashMap<>();
|
||||
schemaRegistryProps.put("kafkastore.connection.url", configurationProperties.getZkConnectionString());
|
||||
schemaRegistryProps.put("listeners", "http://0.0.0.0:8082");
|
||||
schemaRegistryProps.put("port", "8082");
|
||||
schemaRegistryProps.put("kafkastore.topic", "_schemas");
|
||||
SchemaRegistryConfig config = new SchemaRegistryConfig(schemaRegistryProps);
|
||||
SchemaRegistryRestApplication app = new SchemaRegistryRestApplication(config);
|
||||
Server server = app.createServer();
|
||||
server.start();
|
||||
long endTime = System.currentTimeMillis() + 5000;
|
||||
while(true) {
|
||||
if (server.isRunning()) {
|
||||
break;
|
||||
}
|
||||
else if (System.currentTimeMillis() > endTime) {
|
||||
fail("Kafka Schema Registry Server failed to start");
|
||||
}
|
||||
}
|
||||
User1 firstOutboundFoo = new User1();
|
||||
String userName1 = "foo-name" + UUID.randomUUID().toString();
|
||||
String favColor1 = "foo-color" + UUID.randomUUID().toString();
|
||||
firstOutboundFoo.setName(userName1);
|
||||
firstOutboundFoo.setFavoriteColor(favColor1);
|
||||
Message<?> message = MessageBuilder.withPayload(firstOutboundFoo).build();
|
||||
SubscribableChannel moduleOutputChannel = new DirectChannel();
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
invokeCreateTopic(zkUtils, testTopicName, 6, 1, new Properties());
|
||||
configurationProperties.setAutoAddPartitions(true);
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
producerProperties.getExtension().getConfiguration().put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
|
||||
producerProperties.getExtension().getConfiguration().put("schema.registry.url", "http://localhost:8082");
|
||||
producerProperties.setUseNativeEncoding(true);
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel, producerProperties);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
consumerProperties.getExtension().getConfiguration().put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
|
||||
consumerProperties.getExtension().getConfiguration().put("schema.registry.url", "http://localhost:8082");
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer(testTopicName, "test", moduleInputChannel, consumerProperties);
|
||||
// Let the consumer actually bind to the producer before sending a msg
|
||||
binderBindUnbindLatency();
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(moduleInputChannel);
|
||||
assertThat(inbound).isNotNull();
|
||||
assertTrue(message.getPayload() instanceof User1);
|
||||
User1 receivedUser = (User1) message.getPayload();
|
||||
assertThat(receivedUser.getName()).isEqualTo(userName1);
|
||||
assertThat(receivedUser.getFavoriteColor()).isEqualTo(favColor1);
|
||||
producerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,85 @@
|
||||
/*
|
||||
* Copyright 2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.reflect.Nullable;
|
||||
import org.apache.avro.specific.SpecificRecordBase;
|
||||
|
||||
import org.springframework.core.io.ClassPathResource;
|
||||
|
||||
/**
|
||||
* @author Marius Bogoevici
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
public class User1 extends SpecificRecordBase {
|
||||
|
||||
@Nullable
|
||||
private String name;
|
||||
|
||||
@Nullable
|
||||
private String favoriteColor;
|
||||
|
||||
public String getName() {
|
||||
return this.name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getFavoriteColor() {
|
||||
return this.favoriteColor;
|
||||
}
|
||||
|
||||
public void setFavoriteColor(String favoriteColor) {
|
||||
this.favoriteColor = favoriteColor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Schema getSchema() {
|
||||
try {
|
||||
return new Schema.Parser().parse(new ClassPathResource("schemas/users_v1.schema").getInputStream());
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get(int i) {
|
||||
if (i == 0) {
|
||||
return getName().toString();
|
||||
}
|
||||
if (i == 1) {
|
||||
return getFavoriteColor().toString();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(int i, Object o) {
|
||||
if (i == 0) {
|
||||
setName((String) o);
|
||||
}
|
||||
if (i == 1) {
|
||||
setFavoriteColor((String) o);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
{"namespace": "org.springframework.cloud.stream.binder.kafka",
|
||||
"type": "record",
|
||||
"name": "User1",
|
||||
"fields": [
|
||||
{"name": "name", "type": "string"},
|
||||
{"name": "favoriteColor", "type": "string"}
|
||||
]
|
||||
}
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.1.0.RELEASE</version>
|
||||
<version>1.2.0.M1</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>
|
||||
@@ -18,7 +18,7 @@
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
<version>1.1.0.RELEASE</version>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<profiles>
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.1.0.RELEASE</version>
|
||||
<version>1.2.0.M1</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
@@ -66,6 +66,11 @@
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
<version>${spring-integration-kafka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
|
||||
@@ -38,7 +38,6 @@ import org.apache.kafka.common.PartitionInfo;
|
||||
import org.apache.kafka.common.security.JaasUtils;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.serialization.ByteArraySerializer;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
@@ -248,8 +247,7 @@ public class KafkaMessageChannelBinder extends
|
||||
}
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
|
||||
props.put(ProducerConfig.RETRIES_CONFIG, 0);
|
||||
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
|
||||
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
|
||||
props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(producerProperties.getExtension().getBufferSize()));
|
||||
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
|
||||
@@ -258,7 +256,6 @@ public class KafkaMessageChannelBinder extends
|
||||
String.valueOf(producerProperties.getExtension().getBatchTimeout()));
|
||||
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
|
||||
producerProperties.getExtension().getCompressionType().toString());
|
||||
|
||||
if (!ObjectUtils.isEmpty(producerProperties.getExtension().getConfiguration())) {
|
||||
props.putAll(producerProperties.getExtension().getConfiguration());
|
||||
}
|
||||
@@ -303,29 +300,20 @@ public class KafkaMessageChannelBinder extends
|
||||
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
|
||||
"DLQ support is not available for anonymous subscriptions");
|
||||
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
|
||||
|
||||
Map<String, Object> props = getConsumerConfig(anonymous, consumerGroup);
|
||||
Deserializer<byte[]> valueDecoder = new ByteArrayDeserializer();
|
||||
Deserializer<byte[]> keyDecoder = new ByteArrayDeserializer();
|
||||
|
||||
if (!ObjectUtils.isEmpty(properties.getExtension().getConfiguration())) {
|
||||
props.putAll(properties.getExtension().getConfiguration());
|
||||
}
|
||||
|
||||
ConsumerFactory<byte[], byte[]> consumerFactory = new DefaultKafkaConsumerFactory<>(props, keyDecoder,
|
||||
valueDecoder);
|
||||
|
||||
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
|
||||
Collection<PartitionInfo> listenedPartitions = destination;
|
||||
Assert.isTrue(!CollectionUtils.isEmpty(listenedPartitions), "A list of partitions must be provided");
|
||||
final TopicPartitionInitialOffset[] topicPartitionInitialOffsets = getTopicPartitionInitialOffsets(
|
||||
listenedPartitions);
|
||||
|
||||
final ContainerProperties containerProperties =
|
||||
anonymous || properties.getExtension().isAutoRebalanceEnabled() ? new ContainerProperties(name)
|
||||
: new ContainerProperties(topicPartitionInitialOffsets);
|
||||
|
||||
int concurrency = Math.min(properties.getConcurrency(), listenedPartitions.size());
|
||||
final ConcurrentMessageListenerContainer<byte[], byte[]> messageListenerContainer =
|
||||
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer =
|
||||
new ConcurrentMessageListenerContainer(
|
||||
consumerFactory, containerProperties) {
|
||||
|
||||
@@ -339,25 +327,20 @@ public class KafkaMessageChannelBinder extends
|
||||
if (!properties.getExtension().isAutoCommitOffset()) {
|
||||
messageListenerContainer.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
|
||||
}
|
||||
|
||||
if (this.logger.isDebugEnabled()) {
|
||||
this.logger.debug(
|
||||
"Listened partitions: " + StringUtils.collectionToCommaDelimitedString(listenedPartitions));
|
||||
}
|
||||
|
||||
if (this.logger.isDebugEnabled()) {
|
||||
this.logger.debug(
|
||||
"Listened partitions: " + StringUtils.collectionToCommaDelimitedString(listenedPartitions));
|
||||
}
|
||||
|
||||
final KafkaMessageDrivenChannelAdapter<byte[], byte[]> kafkaMessageDrivenChannelAdapter =
|
||||
final KafkaMessageDrivenChannelAdapter<?, ?> kafkaMessageDrivenChannelAdapter =
|
||||
new KafkaMessageDrivenChannelAdapter<>(
|
||||
messageListenerContainer);
|
||||
|
||||
kafkaMessageDrivenChannelAdapter.setBeanFactory(this.getBeanFactory());
|
||||
final RetryTemplate retryTemplate = buildRetryTemplate(properties);
|
||||
kafkaMessageDrivenChannelAdapter.setRetryTemplate(retryTemplate);
|
||||
|
||||
if (properties.getExtension().isEnableDlq()) {
|
||||
final String dlqTopic = "error." + name + "." + group;
|
||||
initDlqProducer();
|
||||
@@ -400,6 +383,8 @@ public class KafkaMessageChannelBinder extends
|
||||
|
||||
private Map<String, Object> getConsumerConfig(boolean anonymous, String consumerGroup) {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
|
||||
props.putAll(configurationProperties.getConfiguration());
|
||||
}
|
||||
|
||||
@@ -87,7 +87,7 @@ public class KafkaBinderConfiguration {
|
||||
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
|
||||
this.configurationProperties);
|
||||
kafkaMessageChannelBinder.setCodec(this.codec);
|
||||
//kafkaMessageChannelBinder.setProducerListener(producerListener);
|
||||
kafkaMessageChannelBinder.setProducerListener(producerListener);
|
||||
kafkaMessageChannelBinder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
|
||||
kafkaMessageChannelBinder.setAdminUtilsOperation(adminUtilsOperation);
|
||||
return kafkaMessageChannelBinder;
|
||||
|
||||
@@ -1,3 +1,18 @@
|
||||
/*
|
||||
* Copyright 2014-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
/*
|
||||
* Copyright 2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
|
||||
import org.springframework.kafka.support.ProducerListener;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@SpringBootTest(classes = KafkaBinderConfiguration.class)
|
||||
public class KafkaBinderConfigurationTest {
|
||||
|
||||
@Autowired
|
||||
private KafkaMessageChannelBinder kafkaMessageChannelBinder;
|
||||
|
||||
@Test
|
||||
public void testKafkaBinderProducerListener() {
|
||||
assertNotNull(this.kafkaMessageChannelBinder);
|
||||
Field producerListenerField = ReflectionUtils.findField(
|
||||
KafkaMessageChannelBinder.class, "producerListener",
|
||||
ProducerListener.class);
|
||||
ReflectionUtils.makeAccessible(producerListenerField);
|
||||
ProducerListener producerListener = (ProducerListener) ReflectionUtils.getField(
|
||||
producerListenerField, this.kafkaMessageChannelBinder);
|
||||
assertNotNull(producerListener);
|
||||
}
|
||||
}
|
||||
@@ -19,6 +19,7 @@ package org.springframework.cloud.stream.binder.kafka;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
@@ -27,6 +28,10 @@ import java.util.concurrent.TimeUnit;
|
||||
import kafka.utils.ZKStringSerializer$;
|
||||
import kafka.utils.ZkUtils;
|
||||
import org.I0Itec.zkclient.ZkClient;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.serialization.LongDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.assertj.core.api.Condition;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
@@ -35,6 +40,7 @@ import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.binder.BinderException;
|
||||
import org.springframework.cloud.stream.binder.Binding;
|
||||
import org.springframework.cloud.stream.binder.DefaultBinding;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.PartitionCapableBinderTests;
|
||||
@@ -46,7 +52,10 @@ import org.springframework.context.support.GenericApplicationContext;
|
||||
import org.springframework.integration.IntegrationMessageHeaderAccessor;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
|
||||
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
|
||||
import org.springframework.kafka.support.Acknowledgment;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.kafka.support.TopicPartitionInitialOffset;
|
||||
@@ -54,6 +63,7 @@ import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
import org.springframework.messaging.support.GenericMessage;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.retry.RetryOperations;
|
||||
@@ -63,9 +73,11 @@ import org.springframework.retry.support.RetryTemplate;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.fail;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
public abstract class KafkaBinderTests extends PartitionCapableBinderTests<AbstractKafkaTestBinder, ExtendedConsumerProperties<KafkaConsumerProperties>,
|
||||
ExtendedProducerProperties<KafkaProducerProperties>> {
|
||||
@@ -82,7 +94,9 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
|
||||
@Override
|
||||
protected ExtendedProducerProperties<KafkaProducerProperties> createProducerProperties() {
|
||||
return new ExtendedProducerProperties<>(new KafkaProducerProperties());
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(new KafkaProducerProperties());
|
||||
producerProperties.getExtension().setSync(true);
|
||||
return producerProperties;
|
||||
}
|
||||
|
||||
public abstract String getKafkaOffsetHeaderKey();
|
||||
@@ -301,7 +315,6 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer("foo.0", moduleOutputChannel,
|
||||
producerProperties);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo.0", "test", moduleInputChannel,
|
||||
consumerProperties);
|
||||
Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload(testPayload)
|
||||
@@ -564,7 +577,6 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
String testPayload1 = "foo1-" + UUID.randomUUID().toString();
|
||||
output.send(new GenericMessage<>(testPayload1.getBytes()));
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> firstConsumerProperties = createConsumerProperties();
|
||||
firstConsumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
consumerBinding = binder.bindConsumer(testTopicName, "startOffsets", input1,
|
||||
firstConsumerProperties);
|
||||
Message<byte[]> receivedMessage1 = (Message<byte[]>) receive(input1);
|
||||
@@ -576,17 +588,14 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
assertThat(new String(receivedMessage2.getPayload())).isNotNull();
|
||||
consumerBinding.unbind();
|
||||
|
||||
Thread.sleep(2000);
|
||||
String testPayload3 = "foo3-" + UUID.randomUUID().toString();
|
||||
output.send(new GenericMessage<>(testPayload3.getBytes()));
|
||||
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerBinding = binder.bindConsumer(testTopicName, "startOffsets", input1, consumerProperties);
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
Message<byte[]> receivedMessage3 = (Message<byte[]>) receive(input1);
|
||||
assertThat(receivedMessage3).isNotNull();
|
||||
assertThat(new String(receivedMessage3.getPayload())).isEqualTo(testPayload3);
|
||||
|
||||
Thread.sleep(2000);
|
||||
}
|
||||
finally {
|
||||
if (consumerBinding != null) {
|
||||
@@ -1012,6 +1021,7 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
QueueChannel input = new QueueChannel();
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName, output,
|
||||
createProducerProperties());
|
||||
|
||||
String testPayload = "foo1-" + UUID.randomUUID().toString();
|
||||
output.send(new GenericMessage<>(testPayload.getBytes()));
|
||||
|
||||
@@ -1140,7 +1150,7 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
binding = binder.bindConsumer(testTopicName, "test-x", output, consumerProperties);
|
||||
|
||||
TopicPartitionInitialOffset[] listenedPartitions = TestUtils.getPropertyValue(binding,
|
||||
"endpoint.messageListenerContainer.containerProperties.topicPartitions",
|
||||
"lifecycle.messageListenerContainer.containerProperties.topicPartitions",
|
||||
TopicPartitionInitialOffset[].class);
|
||||
assertThat(listenedPartitions).hasSize(2);
|
||||
assertThat(listenedPartitions).contains(new TopicPartitionInitialOffset(testTopicName, 2),
|
||||
@@ -1187,6 +1197,159 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
assertThat(partitionSize(testTopicName)).isEqualTo(6);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testConsumerDefaultDeserializer() throws Exception {
|
||||
Binding<?> binding = null;
|
||||
try {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
final ZkUtils zkUtils = getZkUtils(configurationProperties);
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
invokeCreateTopic(zkUtils, testTopicName, 5, 1, new Properties());
|
||||
configurationProperties.setAutoCreateTopics(false);
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
DirectFieldAccessor consumerAccessor = new DirectFieldAccessor(getKafkaConsumer(binding));
|
||||
assertTrue(consumerAccessor.getPropertyValue("keyDeserializer") instanceof ByteArrayDeserializer);
|
||||
assertTrue(consumerAccessor.getPropertyValue("valueDeserializer") instanceof ByteArrayDeserializer);
|
||||
}
|
||||
finally {
|
||||
if (binding != null) {
|
||||
binding.unbind();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testConsumerCustomDeserializer() throws Exception {
|
||||
Binding<?> binding = null;
|
||||
try {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
Map<String, String> propertiesToOverride = configurationProperties.getConfiguration();
|
||||
propertiesToOverride.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
propertiesToOverride.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
|
||||
configurationProperties.setConfiguration(propertiesToOverride);
|
||||
final ZkUtils zkUtils = getZkUtils(configurationProperties);
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
invokeCreateTopic(zkUtils, testTopicName, 5, 1, new Properties());
|
||||
configurationProperties.setAutoCreateTopics(false);
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
DirectFieldAccessor consumerAccessor = new DirectFieldAccessor(getKafkaConsumer(binding));
|
||||
assertTrue("Expected StringDeserializer as a custom key deserializer", consumerAccessor.getPropertyValue("keyDeserializer") instanceof StringDeserializer);
|
||||
assertTrue("Expected LongDeserializer as a custom value deserializer", consumerAccessor.getPropertyValue("valueDeserializer") instanceof LongDeserializer);
|
||||
}
|
||||
finally {
|
||||
if (binding != null) {
|
||||
binding.unbind();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private KafkaConsumer getKafkaConsumer(Binding binding) {
|
||||
DirectFieldAccessor bindingAccessor = new DirectFieldAccessor((DefaultBinding)binding);
|
||||
KafkaMessageDrivenChannelAdapter adapter = (KafkaMessageDrivenChannelAdapter) bindingAccessor.getPropertyValue("lifecycle");
|
||||
DirectFieldAccessor adapterAccessor = new DirectFieldAccessor(adapter);
|
||||
ConcurrentMessageListenerContainer messageListenerContainer = (ConcurrentMessageListenerContainer) adapterAccessor.getPropertyValue("messageListenerContainer");
|
||||
DirectFieldAccessor containerAccessor = new DirectFieldAccessor((ConcurrentMessageListenerContainer)messageListenerContainer);
|
||||
DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) containerAccessor.getPropertyValue("consumerFactory");
|
||||
return (KafkaConsumer) consumerFactory.createConsumer();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testNativeSerializationWithCustomSerializerDeserializer() throws Exception {
|
||||
Binding<?> producerBinding = null;
|
||||
Binding<?> consumerBinding = null;
|
||||
try {
|
||||
Integer testPayload = new Integer(10);
|
||||
Message<?> message = MessageBuilder.withPayload(testPayload).build();
|
||||
SubscribableChannel moduleOutputChannel = new DirectChannel();
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
final ZkClient zkClient;
|
||||
zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
|
||||
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
|
||||
ZKStringSerializer$.MODULE$);
|
||||
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
|
||||
invokeCreateTopic(zkUtils, testTopicName, 6, 1, new Properties());
|
||||
configurationProperties.setAutoAddPartitions(true);
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
producerProperties.setUseNativeEncoding(true);
|
||||
producerProperties.getExtension().getConfiguration().put("value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
|
||||
producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel, producerProperties);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
consumerProperties.getExtension().getConfiguration().put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
|
||||
consumerBinding = binder.bindConsumer(testTopicName, "test", moduleInputChannel, consumerProperties);
|
||||
// Let the consumer actually bind to the producer before sending a msg
|
||||
binderBindUnbindLatency();
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(moduleInputChannel, 500);
|
||||
assertThat(inbound).isNotNull();
|
||||
assertThat(inbound.getPayload()).isEqualTo(10);
|
||||
assertThat(inbound.getHeaders()).doesNotContainKey("contentType");
|
||||
}
|
||||
finally {
|
||||
if (producerBinding != null) {
|
||||
producerBinding.unbind();
|
||||
}
|
||||
if (consumerBinding != null) {
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testBuiltinSerialization() throws Exception {
|
||||
Binding<?> producerBinding = null;
|
||||
Binding<?> consumerBinding = null;
|
||||
try {
|
||||
String testPayload = new String("test");
|
||||
Message<?> message = MessageBuilder.withPayload(testPayload).build();
|
||||
SubscribableChannel moduleOutputChannel = new DirectChannel();
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
final ZkClient zkClient;
|
||||
zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
|
||||
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
|
||||
ZKStringSerializer$.MODULE$);
|
||||
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
|
||||
invokeCreateTopic(zkUtils, testTopicName, 6, 1, new Properties());
|
||||
configurationProperties.setAutoAddPartitions(true);
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel, producerProperties);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
consumerBinding = binder.bindConsumer(testTopicName, "test", moduleInputChannel, consumerProperties);
|
||||
// Let the consumer actually bind to the producer before sending a msg
|
||||
binderBindUnbindLatency();
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(moduleInputChannel, 5);
|
||||
assertThat(inbound).isNotNull();
|
||||
assertThat(inbound.getPayload()).isEqualTo("test");
|
||||
assertThat(inbound.getHeaders()).containsEntry("contentType", "text/plain");
|
||||
}
|
||||
finally {
|
||||
if (producerBinding != null) {
|
||||
producerBinding.unbind();
|
||||
}
|
||||
if (consumerBinding != null) {
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void binderBindUnbindLatency() throws InterruptedException {
|
||||
Thread.sleep(500);
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
<configuration>
|
||||
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d{ISO8601} %5p %t %c{2}:%L - %m%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
<logger name="org.springframework.integration.kafka" level="INFO"/>
|
||||
<logger name="org.springframework.kafka" level="INFO"/>
|
||||
<logger name="org.springframework.cloud.stream" level="INFO" />
|
||||
<logger name="org.springframework.integration.channel" level="INFO" />
|
||||
<root level="WARN">
|
||||
<appender-ref ref="stdout"/>
|
||||
</root>
|
||||
</configuration>
|
||||
Reference in New Issue
Block a user