GH-193: Make 2.0 branch up to date

Fixes spring-cloud/spring-cloud-stream-binder-kafka#193

Integrate missed commits and provide some polishing, improvements, and fixes

Remove `resetOffsets` option

Fix #170

Use parent version for spring-cloud-build-tools

Add update version script

Fixes for consumer and producer property propagation

Fix #142 #129 #156 #162

- Remove conditional configuration for Boot 1.4 support
- Filter properties before creating consumer and producer property sets (see the sketch after this list)
- Restore `configuration` as `Map<String,String>` for fixing Boot binding
- Remove 0.9 tests
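
A minimal sketch of the filtering idea, assuming a hypothetical `binderConfiguration` map; only keys the Kafka client itself recognizes are propagated into the consumer config (the same idea applies to producer properties). This is an illustration, not the binder's actual code:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;

    // Hypothetical illustration: drop binder-level keys so only genuine
    // Kafka consumer properties reach the consumer factory.
    public final class ConsumerPropertyFilter {

        public static Map<String, Object> filter(Map<String, String> binderConfiguration) {
            Map<String, Object> consumerProps = new HashMap<>();
            for (Map.Entry<String, String> entry : binderConfiguration.entrySet()) {
                // ConsumerConfig.configNames() lists every property the client accepts
                if (ConsumerConfig.configNames().contains(entry.getKey())) {
                    consumerProps.put(entry.getKey(), entry.getValue());
                }
            }
            return consumerProps;
        }
    }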

SCSt-GH-913: Error Handling via ErrorChannel

Relates to spring-cloud/spring-cloud-stream#913

Fixes #162

- configure an ErrorMessageSendingRecoverer to send errors to an error channel, whether or not retry is enabled.
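
A rough sketch of that wiring, assuming a stand-in channel (the binder derives the real error channel per destination and group); `ErrorMessageSendingRecoverer` is Spring Integration's recoverer that publishes an `ErrorMessage` to a channel:

    import org.springframework.integration.channel.PublishSubscribeChannel;
    import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer;
    import org.springframework.messaging.MessageChannel;

    public final class ErrorChannelWiringSketch {

        public static ErrorMessageSendingRecoverer recoverer() {
            // Stand-in channel; the binder uses the binding's own error channel.
            MessageChannel errorChannel = new PublishSubscribeChannel();
            // With retry enabled the recoverer fires after retries are exhausted;
            // with retry disabled it fires on the first failure.
            return new ErrorMessageSendingRecoverer(errorChannel);
        }
    }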

Change Test Binder to use a Fully Wired Integration Context

- logging handler subscribed to errorChannel

Rebase; revert spring-kafka to 1.1.x, Kafka to 0.10.1.1

Remove dependency overrides.

POM structure corrections

- move all intra-project deps to dependency management
- remove redundant overrides of Spring Integration Kafka

Remove reference to deleted module

- `spring-cloud-stream-binder-kafka-test-support` was previously
   removed, but it was still added as an unused dependency to the
   project

Remove duplicate debug statement.

unless you really really want to make sure users see this :)

GH-144: Add Kafka Streams Binder

Fix spring-cloud/spring-cloud-stream-binder-kafka#144

Addressing some PR reviews

Remove Java 8 lambda expressions from KStreamBoundElementFactory

Initial - add support for serdes per binding

Fixing checkstyle issues
test source 1.8

Convert integration tests to use Java 7

Internal refactoring

Remove payload serde code in KStreamBoundElementFactory and reuse it from core

Addressing PR comments

cleanup around payload deserialization

Update to latest serialization logic

Extract common properties class for KStream producer/consumer

Addressing PR review comments

* Remove redundant dependencies for KStream Binder

Documentation for KStream binder

* Documentation for KStream binder

Fix #160

* Addressing PR review comments

* Addressing PR review comments

* Addressing PR review comments

Fixes #181

SCSt-GH-916: Configure Producer Error Channel

Requires: https://github.com/spring-cloud/spring-cloud-stream/pull/1039

Publish send failures to the error channel.
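
A hedged example of consuming those failures with a service activator; the channel name here is an assumption for illustration (the binder derives it from the destination), and per the docs change further down the payload is a `KafkaSendFailureException` carrying the failed message and the raw `ProducerRecord`:

    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.messaging.support.ErrorMessage;
    import org.springframework.stereotype.Component;

    @Component
    public class SendFailureHandler {

        // "myTopic.errors" is illustrative, not a guaranteed channel name.
        @ServiceActivator(inputChannel = "myTopic.errors")
        public void handle(ErrorMessage message) {
            // Payload is the send failure; inspect or route it as needed.
            System.err.println("Send failed: " + message.getPayload().getMessage());
        }
    }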

Add docs

Revert to Spring Kafka 1.1.6

GH-62: Remove Tuple Kryo Registrar Wrapper

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/62

No longer needed.

GH-169: Use the Actual Partition Count (Producer)

Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/169

If the configured `partitionCount` is less than the physical partition count on an existing
topic, the binder emits this message:

    The `partitionCount` of the producer for topic partJ.0 is 3, smaller than the actual partition count of 8 of the topic.
    The larger number will be used instead.

However, that is not true; the configured partition count is used.

Override the configured partition count with the actual partition count.
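
In code terms, the fix amounts to something like this (a simplified standalone sketch, not the binder's actual method):

    public final class PartitionCountFixSketch {

        public static void main(String[] args) {
            int configuredCount = 3; // the producer's 'partitionCount' property
            int actualCount = 8;     // partitions found on the existing topic
            // Use the larger value, as the log message always claimed:
            int effectiveCount = Math.max(configuredCount, actualCount);
            System.out.println("Using " + effectiveCount + " partitions");
        }
    }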

0.11 Binder

Initial Commit

- Transactional Binder
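
As the inline comment in the diff below explains, the binder is transactional when `spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix` has text, and all producers then come from a single transactional factory. A minimal sketch of the underlying spring-kafka wiring, with a placeholder bootstrap server:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.transaction.KafkaTransactionManager;

    public final class TransactionalBinderSketch {

        public static KafkaTransactionManager<byte[], byte[]> txManager(String txIdPrefix) {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            DefaultKafkaProducerFactory<byte[], byte[]> factory = new DefaultKafkaProducerFactory<>(props);
            // A transaction-id prefix makes the factory produce transactional producers.
            factory.setTransactionIdPrefix(txIdPrefix);
            return new KafkaTransactionManager<>(factory);
        }
    }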

Version Updates

- Headers support

KStreams and 0.11

GH-188: KStream Binder Properties

KStream binder: support class for application level properties

Provide commonly used KStream application properties for convenient access at runtime

Fix #188

Since windowing operations are common in KStream applications, the TimeWindows object is made
available as a first-class bean (via auto-configuration). This bean is only created if the
relevant properties are provided by the user.
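
A hedged sketch of such an auto-configuration, assuming the property names documented later in this commit (`spring.cloud.stream.kstream.timeWindow.length` and `...advanceBy`, both in milliseconds):

    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class TimeWindowsSketchConfiguration {

        // Created only when the user sets the window length property.
        @Bean
        @ConditionalOnProperty("spring.cloud.stream.kstream.timeWindow.length")
        public TimeWindows timeWindows(
                @Value("${spring.cloud.stream.kstream.timeWindow.length}") long length,
                @Value("${spring.cloud.stream.kstream.timeWindow.advanceBy:0}") long advanceBy) {
            TimeWindows windows = TimeWindows.of(length); // window size in ms
            return advanceBy > 0 ? windows.advanceBy(advanceBy) : windows;
        }
    }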

KStream binder: producer default Serde changes

Change the way the default Serde classes are selected for key and value
in the producer when only one of them is provided by the user.
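
A heavily hedged sketch of the selection rule (the fallback Serde here is an assumption): when only one of the two is configured, the other falls back to the default instead of mirroring the configured one:

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;

    public final class ProducerSerdeDefaultsSketch {

        // keySerde/valueSerde are what the user configured (either may be null);
        // Serdes.ByteArray() stands in for the binder-wide default Serde.
        public static Serde<?>[] resolve(Serde<?> keySerde, Serde<?> valueSerde) {
            Serde<?> defaultSerde = Serdes.ByteArray();
            return new Serde<?>[] {
                    keySerde != null ? keySerde : defaultSerde,
                    valueSerde != null ? valueSerde : defaultSerde
            };
        }
    }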

Fix #190

KStream binder cleanup,
merge cleanup

re-update kafka version

2.0 related changes

Fix tests
Upgrade KStream tests

converting anonymous classes to lambda expressions

Remove Kafka-11 qualifier from test module name
Refactoring test class names

cleanup
adding .jdk8 files

Fix KafkaBinderMetrics in 2.0

Fix #199
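
The diff near the end shows the shape of this fix: `KafkaBinderMetrics` moves from Boot 1.x's `PublicMetrics` to Micrometer's `MeterBinder`, registering the lag as a gauge. A reduced sketch with illustrative values:

    import io.micrometer.core.instrument.MeterRegistry;
    import io.micrometer.core.instrument.binder.MeterBinder;

    public class BinderLagMetricsSketch implements MeterBinder {

        @Override
        public void bindTo(MeterRegistry registry) {
            // Illustrative values; the binder computes lag per consumer group
            // and topic from committed offsets and end offsets.
            String group = "someGroup";
            String topic = "someTopic";
            long lag = 0L;
            registry.gauge(String.format("spring.cloud.stream.binder.kafka.%s.%s.lag", group, topic), lag);
        }
    }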

Addressing PR review comments

Addressing PR review comments
Commit a07a0017bb by Soby Chacko, 2017-09-29 10:20:33 -04:00 (parent 62b40b852f)
45 changed files with 755 additions and 993 deletions

.jdk8 (new, empty file)

pom.xml

@@ -11,9 +11,9 @@
<relativePath />
</parent>
<properties>
<java.version>1.7</java.version>
<kafka.version>0.10.2.0</kafka.version>
<java.version>1.8</java.version>
<spring-kafka.version>2.0.0.BUILD-SNAPSHOT</spring-kafka.version>
<kafka.version>0.11.0.0</kafka.version>
<spring-integration-kafka.version>3.0.0.BUILD-SNAPSHOT</spring-integration-kafka.version>
<spring-cloud-stream.version>2.0.0.BUILD-SNAPSHOT</spring-cloud-stream.version>
</properties>
@@ -21,6 +21,7 @@
<module>spring-cloud-stream-binder-kafka</module>
<module>spring-cloud-starter-stream-kafka</module>
<module>spring-cloud-stream-binder-kafka-docs</module>
<module>spring-cloud-stream-binder-kafka-test</module>
<module>spring-cloud-stream-binder-kafka-core</module>
<module>spring-cloud-stream-binder-kstream</module>
</modules>
@@ -42,11 +43,6 @@
<artifactId>spring-cloud-stream</artifactId>
<version>${spring-cloud-stream.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-codec</artifactId>
<version>${spring-cloud-stream.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
@@ -71,6 +67,11 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
@@ -105,6 +106,13 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>
@@ -152,7 +160,7 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build-tools</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
<version>${project.parent.version}</version>
</dependency>
</dependencies>
<executions>


@@ -1,6 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>


@@ -1,145 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.admin;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Properties;
import kafka.utils.ZkUtils;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
/**
* @author Soby Chacko
*/
public class Kafka09AdminUtilsOperation implements AdminUtilsOperation {
private static Class<?> ADMIN_UTIL_CLASS;
static {
try {
ADMIN_UTIL_CLASS = ClassUtils.forName("kafka.admin.AdminUtils", null);
}
catch (ClassNotFoundException e) {
throw new IllegalStateException("AdminUtils class not found", e);
}
}
public void invokeAddPartitions(ZkUtils zkUtils, String topic, int numPartitions,
String replicaAssignmentStr, boolean checkBrokerAvailable) {
try {
Method[] declaredMethods = ADMIN_UTIL_CLASS.getDeclaredMethods();
Method addPartitions = null;
for (Method m : declaredMethods) {
if (m.getName().equals("addPartitions")) {
addPartitions = m;
}
}
if (addPartitions != null) {
addPartitions.invoke(null, zkUtils, topic, numPartitions,
replicaAssignmentStr, checkBrokerAvailable);
}
else {
throw new InvocationTargetException(
new RuntimeException("method not found"));
}
}
catch (InvocationTargetException e) {
ReflectionUtils.handleInvocationTargetException(e);
}
catch (IllegalAccessException e) {
ReflectionUtils.handleReflectionException(e);
}
}
public short errorCodeFromTopicMetadata(String topic, ZkUtils zkUtils) {
try {
Method fetchTopicMetadataFromZk = ReflectionUtils.findMethod(ADMIN_UTIL_CLASS, "fetchTopicMetadataFromZk", String.class, ZkUtils.class);
Object result = fetchTopicMetadataFromZk.invoke(null, topic, zkUtils);
Class<?> topicMetadataClass = ClassUtils.forName("kafka.api.TopicMetadata", null);
Method errorCodeMethod = ReflectionUtils.findMethod(topicMetadataClass, "errorCode");
return (short) errorCodeMethod.invoke(result);
}
catch (ClassNotFoundException e) {
throw new IllegalStateException("AdminUtils class not found", e);
}
catch (InvocationTargetException e) {
ReflectionUtils.handleInvocationTargetException(e);
}
catch (IllegalAccessException e) {
ReflectionUtils.handleReflectionException(e);
}
return 0;
}
@SuppressWarnings("unchecked")
public int partitionSize(String topic, ZkUtils zkUtils) {
try {
Method fetchTopicMetadataFromZk = ReflectionUtils.findMethod(ADMIN_UTIL_CLASS, "fetchTopicMetadataFromZk", String.class, ZkUtils.class);
Object result = fetchTopicMetadataFromZk.invoke(null, topic, zkUtils);
Class<?> topicMetadataClass = ClassUtils.forName("kafka.api.TopicMetadata", null);
Method partitionsMetadata = ReflectionUtils.findMethod(topicMetadataClass, "partitionsMetadata");
scala.collection.Seq<kafka.api.PartitionMetadata> partitionSize =
(scala.collection.Seq<kafka.api.PartitionMetadata>)partitionsMetadata.invoke(result);
return partitionSize.size();
}
catch (ClassNotFoundException e) {
throw new IllegalStateException("AdminUtils class not found", e);
}
catch (InvocationTargetException e) {
ReflectionUtils.handleInvocationTargetException(e);
}
catch (IllegalAccessException e) {
ReflectionUtils.handleReflectionException(e);
}
return 0;
}
public void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions,
int replicationFactor, Properties topicConfig) {
try {
Method[] declaredMethods = ADMIN_UTIL_CLASS.getDeclaredMethods();
Method createTopic = null;
for (Method m : declaredMethods) {
if (m.getName().equals("createTopic")) {
createTopic = m;
break;
}
}
if (createTopic != null) {
createTopic.invoke(null, zkUtils, topic, partitions,
replicationFactor, topicConfig);
}
else {
throw new InvocationTargetException(
new RuntimeException("method not found"));
}
}
catch (InvocationTargetException e) {
ReflectionUtils.handleInvocationTargetException(e);
}
catch (IllegalAccessException e) {
ReflectionUtils.handleReflectionException(e);
}
}
}


@@ -25,7 +25,7 @@ import org.apache.kafka.common.requests.MetadataResponse;
/**
* @author Soby Chacko
*/
public class Kafka10AdminUtilsOperation implements AdminUtilsOperation {
public class KafkaAdminUtilsOperation implements AdminUtilsOperation {
public void invokeAddPartitions(ZkUtils zkUtils, String topic, int numPartitions,
String replicaAssignmentStr, boolean checkBrokerAvailable) {


@@ -40,6 +40,8 @@ import org.springframework.util.StringUtils;
@ConfigurationProperties(prefix = "spring.cloud.stream.kafka.binder")
public class KafkaBinderConfigurationProperties {
private final Transaction transaction = new Transaction();
@Autowired(required = false)
private KafkaProperties kafkaProperties;
@@ -96,6 +98,10 @@ public class KafkaBinderConfigurationProperties {
private JaasLoginModuleConfiguration jaas;
public Transaction getTransaction() {
return this.transaction;
}
public String getZkConnectionString() {
return toConnectionString(this.zkNodes, this.defaultZkPort);
}
@@ -352,4 +358,24 @@ public class KafkaBinderConfigurationProperties {
this.jaas = jaas;
}
public static class Transaction {
private final KafkaProducerProperties producer = new KafkaProducerProperties();
private String transactionIdPrefix;
public String getTransactionIdPrefix() {
return this.transactionIdPrefix;
}
public void setTransactionIdPrefix(String transactionIdPrefix) {
this.transactionIdPrefix = transactionIdPrefix;
}
public KafkaProducerProperties getProducer() {
return this.producer;
}
}
}


@@ -43,6 +43,8 @@ public class KafkaConsumerProperties {
private int recoveryInterval = 5000;
private String[] trustedPackages;
private Map<String, String> configuration = new HashMap<>();
public boolean isAutoCommitOffset() {
@@ -123,4 +125,12 @@ public class KafkaConsumerProperties {
public void setDlqName(String dlqName) {
this.dlqName = dlqName;
}
public String[] getTrustedPackages() {
return trustedPackages;
}
public void setTrustedPackages(String[] trustedPackages) {
this.trustedPackages = trustedPackages;
}
}


@@ -16,16 +16,17 @@
package org.springframework.cloud.stream.binder.kafka.properties;
import org.springframework.expression.Expression;
import java.util.HashMap;
import java.util.Map;
import javax.validation.constraints.NotNull;
import org.springframework.expression.Expression;
/**
* @author Marius Bogoevici
* @author Henryk Konsek
* @author Gary Russell
*/
public class KafkaProducerProperties {
@@ -39,6 +40,8 @@ public class KafkaProducerProperties {
private Expression messageKeyExpression;
private String[] headerPatterns;
private Map<String, String> configuration = new HashMap<>();
public int getBufferSize() {
@@ -82,6 +85,14 @@ public class KafkaProducerProperties {
this.messageKeyExpression = messageKeyExpression;
}
public String[] getHeaderPatterns() {
return this.headerPatterns;
}
public void setHeaderPatterns(String[] headerPatterns) {
this.headerPatterns = headerPatterns;
}
public Map<String, String> getConfiguration() {
return this.configuration;
}


@@ -40,8 +40,6 @@ import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
@@ -203,29 +201,25 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
final int effectivePartitionCount = Math.max(this.configurationProperties.getMinPartitionCount(),
partitionCount);
this.metadataRetryOperations.execute(new RetryCallback<Object, RuntimeException>() {
this.metadataRetryOperations.execute(context -> {
@Override
public Object doWithRetry(RetryContext context) throws RuntimeException {
try {
adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount,
configurationProperties.getReplicationFactor(), new Properties());
}
catch (Exception e) {
String exceptionClass = e.getClass().getName();
if (exceptionClass.equals("kafka.common.TopicExistsException")
|| exceptionClass.equals("org.apache.kafka.common.errors.TopicExistsException")) {
if (logger.isWarnEnabled()) {
logger.warn("Attempt to create topic: " + topicName + ". Topic already exists.");
}
}
else {
throw e;
}
}
return null;
try {
adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount,
configurationProperties.getReplicationFactor(), new Properties());
}
catch (Exception e) {
String exceptionClass = e.getClass().getName();
if (exceptionClass.equals("kafka.common.TopicExistsException")
|| exceptionClass.equals("org.apache.kafka.common.errors.TopicExistsException")) {
if (logger.isWarnEnabled()) {
logger.warn("Attempt to create topic: " + topicName + ". Topic already exists.");
}
}
else {
throw e;
}
}
return null;
});
}
else {
@@ -243,27 +237,23 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
final Callable<Collection<PartitionInfo>> callable) {
try {
return this.metadataRetryOperations
.execute(new RetryCallback<Collection<PartitionInfo>, Exception>() {
@Override
public Collection<PartitionInfo> doWithRetry(RetryContext context) throws Exception {
Collection<PartitionInfo> partitions = callable.call();
// do a sanity check on the partition set
int partitionSize = partitions.size();
if (partitionSize < partitionCount) {
if (tolerateLowerPartitionsOnBroker) {
logger.warn("The number of expected partitions was: " + partitionCount + ", but "
+ partitionSize + (partitionSize > 1 ? " have " : " has ") + "been found instead."
+ "There will be " + (partitionCount - partitionSize) + " idle consumers");
}
else {
throw new IllegalStateException("The number of expected partitions was: "
+ partitionCount + ", but " + partitionSize
+ (partitionSize > 1 ? " have " : " has ") + "been found instead");
}
.execute(context -> {
Collection<PartitionInfo> partitions = callable.call();
// do a sanity check on the partition set
int partitionSize = partitions.size();
if (partitionSize < partitionCount) {
if (tolerateLowerPartitionsOnBroker) {
logger.warn("The number of expected partitions was: " + partitionCount + ", but "
+ partitionSize + (partitionSize > 1 ? " have " : " has ") + "been found instead."
+ "There will be " + (partitionCount - partitionSize) + " idle consumers");
}
else {
throw new IllegalStateException("The number of expected partitions was: "
+ partitionCount + ", but " + partitionSize
+ (partitionSize > 1 ? " have " : " has ") + "been found instead");
}
return partitions;
}
return partitions;
});
}
catch (Exception e) {


@@ -26,6 +26,8 @@ include::overview.adoc[]
include::dlq.adoc[]
include::metrics.adoc[]
= Appendices
[appendix]
include::building.adoc[]


@@ -528,34 +528,6 @@ spring.cloud.stream.kstream.bindings.output.producer.keySerde=org.apache.kafka.c
spring.cloud.stream.kstream.bindings.output.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$LongSerde
----
timewindow.length::
Many streaming applications written using Kafka Streams involve windowing operations.
If you specify this property, an `org.apache.kafka.streams.kstream.TimeWindows` bean is automatically provided and can be autowired in applications.
This property must be prefixed with `spring.cloud.stream.kstream.`.
A bean of type `org.apache.kafka.streams.kstream.TimeWindows` is created only if this property is provided.
Following is an example of using this property.
Values are provided in milliseconds.
[source]
----
spring.cloud.stream.kstream.timeWindow.length=5000
----
timewindow.advanceBy::
This property goes hand in hand with `timewindow.length` and has no effect on its own.
If you provide this property, the generated `org.apache.kafka.streams.kstream.TimeWindows` bean will automatically contain this information.
This property must be prefixed with `spring.cloud.stream.kstream.`.
Following is an example of using this property.
Values are provided in milliseconds.
[source]
----
spring.cloud.stream.kstream.timeWindow.advanceBy=1000
----
[[kafka-error-channels]]
== Error Channels
@@ -568,13 +540,3 @@ The payload of the `ErrorMessage` for a send failure is a `KafkaSendFailureExcep
* `record` - the raw `ProducerRecord` that was created from the `failedMessage`
There is no automatic handling of these exceptions (such as sending to a <<kafka-dlq-processing, Dead-Letter queue>>); you can consume these exceptions with your own Spring Integration flow.
[[kafka-metrics]]
== Kafka Metrics
The Kafka binder module exposes the following metrics:
`spring.cloud.stream.binder.kafka.someGroup.someTopic.lag` - this metric indicates how many messages have not yet been consumed from a given binder's topic by a given consumer group.
For example, if the value of the metric `spring.cloud.stream.binder.kafka.myGroup.myTopic.lag` is `1000`, then consumer group `myGroup` has `1000` messages waiting to be consumed from topic `myTopic`.
This metric is particularly useful for providing auto-scaling feedback to the PaaS platform of your choice.


@@ -4,18 +4,13 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<<<<<<< c396c5c756d1c01e1b6b0717ff64ae72ab78cb36
<<<<<<< b20f4a0e08629664c73e2e6cc1e73ac791509697
<version>1.3.1.BUILD-SNAPSHOT</version>
=======
<version>2.0.0.M1</version>
>>>>>>> Release 2.0.0.M1
=======
<version>2.0.0.BUILD-SNAPSHOT</version>
>>>>>>> Set version to 2.0.0.BUILD-SNAPSHOT
</parent>
<artifactId>spring-cloud-stream-binder-kafka-0.10.1-test</artifactId>
<description>Spring Cloud Stream Kafka Binder 0.10.1 Tests</description>
<artifactId>spring-cloud-stream-binder-kafka-test</artifactId>
<description>Spring Cloud Stream Kafka Binder Tests</description>
<url>http://projects.spring.io/spring-cloud</url>
<organization>
<name>Pivotal Software, Inc.</name>
@@ -23,17 +18,20 @@
</organization>
<properties>
<main.basedir>${basedir}/../..</main.basedir>
<kafka.version>0.10.1.1</kafka.version>
<kafka.version>0.11.0.0</kafka.version>
<spring-kafka.version>2.0.0.BUILD-SNAPSHOT</spring-kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
@@ -45,12 +43,6 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
@@ -74,13 +66,6 @@
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jmx</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-0.10.2-test</artifactId>
@@ -102,39 +87,59 @@
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>3.1.2</version>
<version>3.3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry</artifactId>
<version>3.1.2</version>
<version>3.3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.inject</groupId>
<artifactId>jersey-hk2</artifactId>
<version>2.26-b06</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.bundles.repackaged</groupId>
<artifactId>jersey-guava</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>http://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>http://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>http://repo.spring.io/release</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>


@@ -42,10 +42,11 @@ import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.Spy;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.admin.KafkaAdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.kafka.core.ConsumerFactory;
@@ -55,14 +56,17 @@ import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;
/**
* Integration tests for the {@link KafkaMessageChannelBinder}.
*
*
* This test specifically tests for the 0.10.1.x version of Kafka.
*
* @author Eric Bottard
@@ -70,16 +74,16 @@ import static org.junit.Assert.assertTrue;
* @author Mark Fisher
* @author Ilayaperumal Gopinathan
*/
public class Kafka_0_10_1_BinderTests extends Kafka_0_10_2_BinderTests {
public class KafkaBinderTests extends AbstractKafkaBinderTests {
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
private Kafka10TestBinder binder;
private KafkaTestBinder binder;
private Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
private final KafkaAdminUtilsOperation adminUtilsOperation = new KafkaAdminUtilsOperation();
@Override
protected void binderBindUnbindLatency() throws InterruptedException {
@@ -87,14 +91,15 @@ public class Kafka_0_10_1_BinderTests extends Kafka_0_10_2_BinderTests {
}
@Override
protected Kafka10TestBinder getBinder() {
protected KafkaTestBinder getBinder() {
if (binder == null) {
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binder = new Kafka10TestBinder(binderConfiguration);
binder = new KafkaTestBinder(binderConfiguration);
}
return binder;
}
@Override
protected KafkaBinderConfigurationProperties createConfigurationProperties() {
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
@@ -139,7 +144,7 @@ public class Kafka_0_10_1_BinderTests extends Kafka_0_10_2_BinderTests {
@Override
protected Binder getBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
return new Kafka10TestBinder(kafkaBinderConfigurationProperties);
return new KafkaTestBinder(kafkaBinderConfigurationProperties);
}
@Before
@@ -186,6 +191,8 @@ public class Kafka_0_10_1_BinderTests extends Kafka_0_10_2_BinderTests {
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
Map<String, Object> schemaRegistryProps = new HashMap<>();
schemaRegistryProps.put("kafkastore.connection.url", configurationProperties.getZkConnectionString());
schemaRegistryProps.put("listeners", "http://0.0.0.0:8082");
@@ -238,4 +245,81 @@ public class Kafka_0_10_1_BinderTests extends Kafka_0_10_2_BinderTests {
producerBinding.unbind();
consumerBinding.unbind();
}
@Override
public void testSendAndReceiveWithExplicitConsumerGroupWithRawMode() {
// raw mode no longer needed
}
@Override
public void testSendAndReceiveWithRawModeAndStringPayload() {
// raw mode no longer needed
}
@Test
@Override
@SuppressWarnings("unchecked")
public void testSendAndReceiveNoOriginalContentType() throws Exception {
Binder binder = getBinder();
BindingProperties producerBindingProperties = createProducerBindingProperties(
createProducerProperties());
DirectChannel moduleOutputChannel = createBindableChannel("output",
producerBindingProperties);
QueueChannel moduleInputChannel = new QueueChannel();
Binding<MessageChannel> producerBinding = binder.bindProducer("bar.0",
moduleOutputChannel, producerBindingProperties.getProducer());
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.getExtension().setTrustedPackages(new String[] {"org.springframework.util"});
Binding<MessageChannel> consumerBinding = binder.bindConsumer("bar.0",
"testSendAndReceiveNoOriginalContentType", moduleInputChannel,
consumerProperties);
binderBindUnbindLatency();
//TODO: Will have to fix the MimeType to convert to byte array once this issue has been resolved:
//https://github.com/spring-projects/spring-kafka/issues/424
Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload("foo".getBytes())
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN_VALUE.getBytes()).build();
moduleOutputChannel.send(message);
Message<?> inbound = receive(moduleInputChannel);
assertThat(inbound).isNotNull();
assertThat(inbound.getPayload()).isEqualTo("foo".getBytes());
assertThat(inbound.getHeaders().get(MessageHeaders.CONTENT_TYPE))
.isEqualTo(MimeTypeUtils.TEXT_PLAIN_VALUE.getBytes());
producerBinding.unbind();
consumerBinding.unbind();
}
@Test
@Override
@SuppressWarnings("unchecked")
public void testSendAndReceive() throws Exception {
Binder binder = getBinder();
BindingProperties outputBindingProperties = createProducerBindingProperties(
createProducerProperties());
DirectChannel moduleOutputChannel = createBindableChannel("output",
outputBindingProperties);
QueueChannel moduleInputChannel = new QueueChannel();
Binding<MessageChannel> producerBinding = binder.bindProducer("foo.0",
moduleOutputChannel, outputBindingProperties.getProducer());
Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo.0",
"testSendAndReceive", moduleInputChannel, createConsumerProperties());
// Bypass conversion we are only testing sendReceive
Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload("foo".getBytes())
.setHeader(MessageHeaders.CONTENT_TYPE,
MimeTypeUtils.APPLICATION_OCTET_STREAM_VALUE.getBytes())
.build();
// Let the consumer actually bind to the producer before sending a msg
binderBindUnbindLatency();
moduleOutputChannel.send(message);
Message<?> inbound = receive(moduleInputChannel);
assertThat(inbound).isNotNull();
assertThat(inbound.getPayload()).isEqualTo("foo".getBytes());
assertThat(inbound.getHeaders().get(MessageHeaders.CONTENT_TYPE))
.isEqualTo(MimeTypeUtils.APPLICATION_OCTET_STREAM_VALUE.getBytes());
producerBinding.unbind();
consumerBinding.unbind();
}
}


@@ -0,0 +1,85 @@
/*
* Copyright 2015-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.admin.KafkaAdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
/**
* Test support class for {@link KafkaMessageChannelBinder}.
* @author Eric Bottard
* @author Marius Bogoevici
* @author David Turanski
* @author Gary Russell
* @author Soby Chacko
*/
public class KafkaTestBinder extends AbstractKafkaTestBinder {
@SuppressWarnings({ "rawtypes", "unchecked" })
public KafkaTestBinder(KafkaBinderConfigurationProperties binderConfiguration) {
try {
AdminUtilsOperation adminUtilsOperation = new KafkaAdminUtilsOperation();
KafkaTopicProvisioner provisioningProvider =
new KafkaTopicProvisioner(binderConfiguration, adminUtilsOperation);
provisioningProvider.afterPropertiesSet();
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfiguration,
provisioningProvider) {
/*
* Some tests use multiple instance indexes for the same topic; we need to make
* the error infrastructure beans unique.
*/
@Override
protected String errorsBaseName(ConsumerDestination destination, String group,
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
return super.errorsBaseName(destination, group, consumerProperties) + "-"
+ consumerProperties.getInstanceIndex();
}
};
ProducerListener producerListener = new LoggingProducerListener();
binder.setProducerListener(producerListener);
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
setApplicationContext(context);
binder.setApplicationContext(context);
binder.afterPropertiesSet();
this.setBinder(binder);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
@Configuration
@EnableIntegration
static class Config {
}
}


@@ -0,0 +1,85 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.reflect.Nullable;
import org.apache.avro.specific.SpecificRecordBase;
import org.springframework.core.io.ClassPathResource;
/**
* @author Marius Bogoevici
* @author Ilayaperumal Gopinathan
*/
public class User1 extends SpecificRecordBase {
@Nullable
private String name;
@Nullable
private String favoriteColor;
public String getName() {
return this.name;
}
public void setName(String name) {
this.name = name;
}
public String getFavoriteColor() {
return this.favoriteColor;
}
public void setFavoriteColor(String favoriteColor) {
this.favoriteColor = favoriteColor;
}
@Override
public Schema getSchema() {
try {
return new Schema.Parser().parse(new ClassPathResource("schemas/users_v1.schema").getInputStream());
}
catch (IOException e) {
throw new IllegalStateException(e);
}
}
@Override
public Object get(int i) {
if (i == 0) {
return getName().toString();
}
if (i == 1) {
return getFavoriteColor().toString();
}
return null;
}
@Override
public void put(int i, Object o) {
if (i == 0) {
setName((String) o);
}
if (i == 1) {
setFavoriteColor((String) o);
}
}
}


@@ -10,37 +10,14 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<<<<<<< c396c5c756d1c01e1b6b0717ff64ae72ab78cb36
<<<<<<< b20f4a0e08629664c73e2e6cc1e73ac791509697
<<<<<<< e3460d6fcef9406fc9c6018f0f828630b929a815
<version>1.3.1.BUILD-SNAPSHOT</version>
=======
<version>2.0.0.BUILD-SNAPSHOT</version>
>>>>>>> Update version to 2.0.0.BUILD-SNAPSHOT
=======
<version>2.0.0.M1</version>
>>>>>>> Release 2.0.0.M1
=======
<version>2.0.0.BUILD-SNAPSHOT</version>
>>>>>>> Set version to 2.0.0.BUILD-SNAPSHOT
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<<<<<<< c396c5c756d1c01e1b6b0717ff64ae72ab78cb36
<<<<<<< b20f4a0e08629664c73e2e6cc1e73ac791509697
<<<<<<< e3460d6fcef9406fc9c6018f0f828630b929a815
=======
<version>2.0.0.BUILD-SNAPSHOT</version>
>>>>>>> Update version to 2.0.0.BUILD-SNAPSHOT
=======
<version>2.0.0.M1</version>
>>>>>>> Release 2.0.0.M1
=======
<version>2.0.0.BUILD-SNAPSHOT</version>
>>>>>>> Set version to 2.0.0.BUILD-SNAPSHOT
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
@@ -98,9 +75,6 @@
<artifactId>spring-cloud-stream-binder-test</artifactId>
<scope>test</scope>
</dependency>
<<<<<<< b20f4a0e08629664c73e2e6cc1e73ac791509697
<<<<<<< e3460d6fcef9406fc9c6018f0f828630b929a815
=======
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
@@ -119,9 +93,6 @@
<version>3.2.1</version>
<scope>test</scope>
</dependency>
>>>>>>> Update version to 2.0.0.BUILD-SNAPSHOT
=======
>>>>>>> Release 2.0.0.M1
</dependencies>
<build>


@@ -1,118 +0,0 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import org.apache.kafka.common.security.JaasUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.util.Assert;
/**
* @author Marius Bogoevici
*/
public class KafkaBinderJaasInitializerListener implements ApplicationListener<ContextRefreshedEvent>,
ApplicationContextAware, DisposableBean {
public static final String DEFAULT_ZK_LOGIN_CONTEXT_NAME = "Client";
private ApplicationContext applicationContext;
private final boolean ignoreJavaLoginConfigParamSystemProperty;
private final File placeholderJaasConfiguration;
public KafkaBinderJaasInitializerListener() throws IOException {
// we ignore the system property if it wasn't originally set at launch
this.ignoreJavaLoginConfigParamSystemProperty =
(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) == null);
this.placeholderJaasConfiguration = File.createTempFile("kafka-client-jaas-config-placeholder", "conf");
this.placeholderJaasConfiguration.deleteOnExit();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public void destroy() throws Exception {
if (this.ignoreJavaLoginConfigParamSystemProperty) {
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
}
}
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getSource() == this.applicationContext) {
KafkaBinderConfigurationProperties binderConfigurationProperties =
applicationContext.getBean(KafkaBinderConfigurationProperties.class);
// only use programmatic support if a file is not set via system property
if (ignoreJavaLoginConfigParamSystemProperty
&& binderConfigurationProperties.getJaas() != null) {
Map<String, AppConfigurationEntry[]> configurationEntries = new HashMap<>();
AppConfigurationEntry kafkaClientConfigurationEntry = new AppConfigurationEntry
(binderConfigurationProperties.getJaas().getLoginModule(),
binderConfigurationProperties.getJaas().getControlFlagValue(),
binderConfigurationProperties.getJaas().getOptions() != null ?
binderConfigurationProperties.getJaas().getOptions() :
Collections.<String, Object>emptyMap());
configurationEntries.put(JaasUtils.LOGIN_CONTEXT_CLIENT,
new AppConfigurationEntry[]{ kafkaClientConfigurationEntry });
Configuration.setConfiguration(new InternalConfiguration(configurationEntries));
// Workaround for a 0.9 client issue where even if the Configuration is set
// a system property check is performed.
// Since the Configuration already exists, this will be ignored.
if (this.placeholderJaasConfiguration != null) {
System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, this.placeholderJaasConfiguration.getAbsolutePath());
}
}
}
}
/**
* A {@link Configuration} set up programmatically by the Kafka binder
*/
public static class InternalConfiguration extends Configuration {
private final Map<String, AppConfigurationEntry[]> configurationEntries;
public InternalConfiguration(Map<String, AppConfigurationEntry[]> configurationEntries) {
Assert.notNull(configurationEntries, " cannot be null");
Assert.notEmpty(configurationEntries, " cannot be empty");
this.configurationEntries = configurationEntries;
}
@Override
public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
return configurationEntries.get(name);
}
}
}


@@ -15,12 +15,13 @@
*/
package org.springframework.cloud.stream.binder.kafka;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -30,8 +31,6 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.endpoint.PublicMetrics;
import org.springframework.boot.actuate.metrics.Metric;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@@ -42,7 +41,7 @@ import org.springframework.util.ObjectUtils;
*
* @author Henryk Konsek
*/
public class KafkaBinderMetrics implements PublicMetrics {
public class KafkaBinderMetrics implements MeterBinder {
private final static Logger LOG = LoggerFactory.getLogger(KafkaBinderMetrics.class);
@@ -68,8 +67,7 @@ public class KafkaBinderMetrics implements PublicMetrics {
}
@Override
public Collection<Metric<?>> metrics() {
List<Metric<?>> metrics = new LinkedList<>();
public void bindTo(MeterRegistry registry) {
for (Map.Entry<String, KafkaMessageChannelBinder.TopicInformation> topicInfo : this.binder.getTopicsInUse()
.entrySet()) {
if (!topicInfo.getValue().isConsumerTopic()) {
@@ -96,13 +94,12 @@ public class KafkaBinderMetrics implements PublicMetrics {
lag += endOffset.getValue();
}
}
metrics.add(new Metric<>(String.format("%s.%s.%s.lag", METRIC_PREFIX, group, topic), lag));
registry.gauge(String.format("%s.%s.%s.lag", METRIC_PREFIX, group, topic), lag);
}
catch (Exception e) {
LOG.debug("Cannot generate metric for topic: " + topic, e);
}
}
return metrics;
}
private ConsumerFactory<?, ?> createConsumerFactory(String group) {


@@ -21,16 +21,17 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -61,16 +62,20 @@ import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.messaging.Message;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
@@ -93,37 +98,31 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
*/
public class KafkaMessageChannelBinder extends
AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>,
ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {
private final KafkaBinderConfigurationProperties configurationProperties;
private final Map<String, TopicInformation> topicsInUse = new HashMap<>();
private final KafkaTransactionManager<byte[], byte[]> transactionManager;
private ProducerListener<byte[], byte[]> producerListener;
private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider) {
super(false, headersToMap(configurationProperties), provisioningProvider);
super(true, null, provisioningProvider);
this.configurationProperties = configurationProperties;
}
private static String[] headersToMap(KafkaBinderConfigurationProperties configurationProperties) {
String[] headersToMap;
if (ObjectUtils.isEmpty(configurationProperties.getHeaders())) {
headersToMap = BinderHeaders.STANDARD_HEADERS;
if (StringUtils.hasText(configurationProperties.getTransaction().getTransactionIdPrefix())) {
this.transactionManager = new KafkaTransactionManager<>(
getProducerFactory(configurationProperties.getTransaction().getTransactionIdPrefix(),
new ExtendedProducerProperties<>(configurationProperties.getTransaction().getProducer())));
}
else {
String[] combinedHeadersToMap = Arrays.copyOfRange(BinderHeaders.STANDARD_HEADERS, 0,
BinderHeaders.STANDARD_HEADERS.length + configurationProperties.getHeaders().length);
System.arraycopy(configurationProperties.getHeaders(), 0, combinedHeadersToMap,
BinderHeaders.STANDARD_HEADERS.length,
configurationProperties.getHeaders().length);
headersToMap = combinedHeadersToMap;
this.transactionManager = null;
}
return headersToMap;
}
public void setExtendedBindingProperties(KafkaExtendedBindingProperties extendedBindingProperties) {
@@ -152,7 +151,16 @@ public class KafkaMessageChannelBinder extends
protected MessageHandler createProducerMessageHandler(final ProducerDestination destination,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties, MessageChannel errorChannel)
throws Exception {
final DefaultKafkaProducerFactory<byte[], byte[]> producerFB = getProducerFactory(producerProperties);
/*
* IMPORTANT: With a transactional binder, individual producer properties for
* Kafka are ignored; the global binder
* (spring.cloud.stream.kafka.binder.transaction.producer.*) properties are used
* instead, for all producers. A binder is transactional when
* 'spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix' has text.
*/
final ProducerFactory<byte[], byte[]> producerFB = this.transactionManager != null
? this.transactionManager.getProducerFactory()
: getProducerFactory(null, producerProperties);
Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(
producerProperties.getPartitionCount(),
false,
@@ -195,10 +203,22 @@ public class KafkaMessageChannelBinder extends
if (errorChannel != null) {
handler.setSendFailureChannel(errorChannel);
}
String[] headerPatterns = producerProperties.getExtension().getHeaderPatterns();
if (headerPatterns != null && headerPatterns.length > 0) {
List<String> patterns = new LinkedList<>(Arrays.asList(headerPatterns));
if (!patterns.contains("!" + MessageHeaders.TIMESTAMP)) {
patterns.add(0, "!" + MessageHeaders.TIMESTAMP);
}
if (!patterns.contains("!" + MessageHeaders.ID)) {
patterns.add(0, "!" + MessageHeaders.ID);
}
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper(patterns.toArray(new String[patterns.size()]));
handler.setHeaderMapper(headerMapper);
}
return handler;
}
private DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(
private DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(String transactionIdPrefix,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.RETRIES_CONFIG, 0);
@@ -227,7 +247,11 @@ public class KafkaMessageChannelBinder extends
if (!ObjectUtils.isEmpty(producerProperties.getExtension().getConfiguration())) {
props.putAll(producerProperties.getExtension().getConfiguration());
}
return new DefaultKafkaProducerFactory<>(props);
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = new DefaultKafkaProducerFactory<>(props);
if (transactionIdPrefix != null) {
producerFactory.setTransactionIdPrefix(transactionIdPrefix);
}
return producerFactory;
}
@Override
@@ -284,6 +308,9 @@ public class KafkaMessageChannelBinder extends
|| extendedConsumerProperties.getExtension().isAutoRebalanceEnabled()
? new ContainerProperties(destination.getName())
: new ContainerProperties(topicPartitionInitialOffsets);
if (this.transactionManager != null) {
containerProperties.setTransactionManager(this.transactionManager);
}
int concurrency = Math.min(extendedConsumerProperties.getConcurrency(), listenedPartitions.size());
@SuppressWarnings("rawtypes")
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer =
@@ -312,6 +339,14 @@ public class KafkaMessageChannelBinder extends
}
final KafkaMessageDrivenChannelAdapter<?, ?> kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter<>(
messageListenerContainer);
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
String[] trustedPackages = extendedConsumerProperties.getExtension().getTrustedPackages();
if (!StringUtils.isEmpty(trustedPackages)) {
headerMapper.addTrustedPackages(trustedPackages);
}
messageConverter.setHeaderMapper(headerMapper);
kafkaMessageDrivenChannelAdapter.setMessageConverter(messageConverter);
kafkaMessageDrivenChannelAdapter.setBeanFactory(this.getBeanFactory());
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, consumerGroup,
extendedConsumerProperties);
@@ -334,48 +369,46 @@ public class KafkaMessageChannelBinder extends
protected MessageHandler getErrorMessageHandler(final ConsumerDestination destination, final String group,
final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
if (extendedConsumerProperties.getExtension().isEnableDlq()) {
ProducerFactory<byte[], byte[]> producerFactory = this.transactionManager != null
? this.transactionManager.getProducerFactory()
: getProducerFactory(null, new ExtendedProducerProperties<>(new KafkaProducerProperties()));
final KafkaTemplate<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFactory);
return message -> {
final ConsumerRecord<?, ?> record = message.getHeaders()
.get(KafkaHeaders.RAW_DATA, ConsumerRecord.class);
final byte[] key = record.key() != null ? Utils.toArray(ByteBuffer.wrap((byte[]) record.key()))
: null;
final byte[] payload = record.value() != null
? Utils.toArray(ByteBuffer.wrap((byte[]) record.value())) : null;
String dlqName = StringUtils.hasText(extendedConsumerProperties.getExtension().getDlqName())
? extendedConsumerProperties.getExtension().getDlqName()
: "error." + destination.getName() + "." + group;
ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(dlqName, record.partition(),
key, payload, record.headers());
ListenableFuture<SendResult<byte[], byte[]>> sentDlq = kafkaTemplate.send(producerRecord);
sentDlq.addCallback(new ListenableFutureCallback<SendResult<byte[], byte[]>>() {
StringBuilder sb = new StringBuilder().append(" a message with key='")
.append(toDisplayString(ObjectUtils.nullSafeToString(key), 50)).append("'")
.append(" and payload='")
.append(toDisplayString(ObjectUtils.nullSafeToString(payload), 50))
.append("'").append(" received from ")
.append(record.partition());
@Override
public void onFailure(Throwable ex) {
KafkaMessageChannelBinder.this.logger.error(
"Error sending to DLQ " + sb.toString(), ex);
}
@Override
public void onSuccess(SendResult<byte[], byte[]> result) {
if (KafkaMessageChannelBinder.this.logger.isDebugEnabled()) {
KafkaMessageChannelBinder.this.logger.debug(
"Sent to DLQ " + sb.toString());
}
}
});
};
}
return null;
@@ -439,11 +472,11 @@ public class KafkaMessageChannelBinder extends
private boolean running = true;
private final ProducerFactory<byte[], byte[]> producerFactory;
ProducerConfigurationMessageHandler(KafkaTemplate<byte[], byte[]> kafkaTemplate, String topic,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
ProducerFactory<byte[], byte[]> producerFactory) {
super(kafkaTemplate);
setTopicExpression(new LiteralExpression(topic));
setMessageKeyExpression(producerProperties.getExtension().getMessageKeyExpression());
@@ -471,7 +504,9 @@ public class KafkaMessageChannelBinder extends
@Override
public void stop() {
if (this.producerFactory instanceof Lifecycle) {
((Lifecycle) producerFactory).stop();
}
this.running = false;
}

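The DLQ handler above derives the dead-letter topic from the binding when no explicit dlqName is configured. A small sketch of just that naming rule (the DlqNames helper class is illustrative):

public final class DlqNames {

    private DlqNames() {
    }

    // Fallback convention used above: "error.<destination>.<group>".
    static String resolve(String configuredDlqName, String destination, String group) {
        return (configuredDlqName != null && !configuredDlqName.trim().isEmpty())
                ? configuredDlqName
                : "error." + destination + "." + group;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, "dlqTest.abc.0", "testGroup")); // error.dlqTest.abc.0.testGroup
        System.out.println(resolve("my.custom.dlq", "dlqTest.abc.0", "testGroup")); // my.custom.dlq
    }
}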

@@ -20,43 +20,34 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.admin.KafkaAdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.properties.JaasLoginModuleConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.util.ObjectUtils;
@@ -72,15 +63,12 @@ import org.springframework.util.ObjectUtils;
*/
@Configuration
@ConditionalOnMissingBean(Binder.class)
@Import({ PropertyPlaceholderAutoConfiguration.class})
@EnableConfigurationProperties({ KafkaBinderConfigurationProperties.class, KafkaExtendedBindingProperties.class })
public class KafkaBinderConfiguration {
protected static final Log logger = LogFactory.getLog(KafkaBinderConfiguration.class);
@Autowired
private KafkaBinderConfigurationProperties configurationProperties;
@@ -105,7 +93,6 @@ public class KafkaBinderConfiguration {
KafkaMessageChannelBinder kafkaMessageChannelBinder() {
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
this.configurationProperties, provisioningProvider());
kafkaMessageChannelBinder.setProducerListener(producerListener);
kafkaMessageChannelBinder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
return kafkaMessageChannelBinder;
@@ -136,45 +123,19 @@ public class KafkaBinderConfiguration {
}
@Bean
public MeterBinder kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder) {
return new KafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties);
}
@Bean(name = "adminUtilsOperation")
@ConditionalOnClass(name = "kafka.admin.AdminUtils")
public AdminUtilsOperation kafka10AdminUtilsOperation() {
return new KafkaAdminUtilsOperation();
}
@Bean
public KafkaJaasLoginModuleInitializer jaasInitializer() throws IOException {
return new KafkaJaasLoginModuleInitializer();
}
public static class JaasConfigurationProperties {

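The new jaasInitializer bean delegates to spring-kafka's KafkaJaasLoginModuleInitializer. A sketch of configuring it programmatically, using the same option keys the jaas.options.* binder properties exercise elsewhere in this change (setOptions usage is an assumption about the initializer's API):

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;

public class JaasInitializerSketch {

    public static KafkaJaasLoginModuleInitializer kerberosInitializer() throws IOException {
        KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
        Map<String, String> options = new HashMap<>();
        options.put("useKeyTab", "true");
        options.put("storeKey", "true");
        options.put("keyTab", "/etc/security/keytabs/kafka_client.keytab");
        options.put("principal", "kafka-client-1@EXAMPLE.COM");
        jaas.setOptions(options); // assumed setter; mirrors the jaas.options.* properties
        return jaas;
    }
}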

@@ -16,6 +16,7 @@
package org.springframework.cloud.stream.binder.kafka;
import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
@@ -27,8 +28,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -82,6 +87,7 @@ import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;
@@ -90,16 +96,13 @@ import static org.assertj.core.api.Assertions.fail;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
/**
* @author Soby Chacko
* @author Ilayaperumal Gopinathan
* @author Henryk Konsek
* @author Gary Russell
*/
public abstract class AbstractKafkaBinderTests extends
PartitionCapableBinderTests<AbstractKafkaTestBinder, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>> {
@Rule
@@ -158,6 +161,7 @@ public abstract class KafkaBinderTests extends
moduleInputChannel.subscribe(handler);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionCount(2);
producerProperties.getExtension().setHeaderPatterns(new String[] { MessageHeaders.CONTENT_TYPE });
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setMaxAttempts(withRetry ? 2 : 1);
consumerProperties.setBackOffInitialInterval(100);
@@ -183,35 +187,23 @@ public abstract class KafkaBinderTests extends
final AtomicReference<Message<?>> boundErrorChannelMessage = new AtomicReference<>();
final AtomicReference<Message<?>> globalErrorChannelMessage = new AtomicReference<>();
final AtomicBoolean hasRecovererInCallStack = new AtomicBoolean(!withRetry);
boundErrorChannel.subscribe(message -> {
boundErrorChannelMessage.set(message);
String stackTrace = Arrays.toString(new RuntimeException().getStackTrace());
hasRecovererInCallStack.set(stackTrace.contains("ErrorMessageSendingRecoverer"));
});
globalErrorChannel.subscribe(globalErrorChannelMessage::set);
Binding<MessageChannel> dlqConsumerBinding = binder.bindConsumer(
"error.dlqTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel, dlqConsumerProperties);
binderBindUnbindLatency();
String testMessagePayload = "test." + UUID.randomUUID().toString();
Message<byte[]> testMessage = MessageBuilder.withPayload(testMessagePayload.getBytes()).build();
moduleOutputChannel.send(testMessage);
Message<?> receivedMessage = receive(dlqChannel, 3);
assertThat(receivedMessage).isNotNull();
assertThat(receivedMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
binderBindUnbindLatency();
@@ -247,7 +239,7 @@ public abstract class KafkaBinderTests extends
"testGroup", moduleInputChannel, consumerProperties);
String testMessagePayload = "test." + UUID.randomUUID().toString();
Message<byte[]> testMessage = MessageBuilder.withPayload(testMessagePayload.getBytes()).build();
moduleOutputChannel.send(testMessage);
assertThat(handler.getLatch().await((int) (timeoutMultiplier * 1000), TimeUnit.MILLISECONDS));
@@ -255,7 +247,7 @@ public abstract class KafkaBinderTests extends
assertThat(handler.getReceivedMessages().entrySet()).hasSize(1);
Message<?> receivedMessage = handler.getReceivedMessages().entrySet().iterator().next().getValue();
assertThat(receivedMessage).isNotNull();
assertThat(receivedMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
consumerBinding.unbind();
@@ -265,13 +257,13 @@ public abstract class KafkaBinderTests extends
successfulInputChannel, consumerProperties);
binderBindUnbindLatency();
String testMessage2Payload = "test." + UUID.randomUUID().toString();
Message<byte[]> testMessage2 = MessageBuilder.withPayload(testMessage2Payload.getBytes()).build();
moduleOutputChannel.send(testMessage2);
Message<?> firstReceived = receive(successfulInputChannel);
assertThat(firstReceived.getPayload()).isEqualTo(testMessagePayload.getBytes());
Message<?> secondReceived = receive(successfulInputChannel);
assertThat(secondReceived.getPayload()).isEqualTo(testMessage2Payload.getBytes());
consumerBinding.unbind();
producerBinding.unbind();
}
@@ -304,18 +296,18 @@ public abstract class KafkaBinderTests extends
"error.retryTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel, dlqConsumerProperties);
String testMessagePayload = "test." + UUID.randomUUID().toString();
Message<byte[]> testMessage = MessageBuilder.withPayload(testMessagePayload.getBytes()).build();
moduleOutputChannel.send(testMessage);
Message<?> dlqMessage = receive(dlqChannel, 3);
assertThat(dlqMessage).isNotNull();
assertThat(dlqMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());
// first attempt fails
assertThat(handler.getReceivedMessages().entrySet()).hasSize(1);
Message<?> handledMessage = handler.getReceivedMessages().entrySet().iterator().next().getValue();
assertThat(handledMessage).isNotNull();
assertThat(handledMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
binderBindUnbindLatency();
dlqConsumerBinding.unbind();
@@ -326,11 +318,11 @@ public abstract class KafkaBinderTests extends
consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0", "testGroup",
successfulInputChannel, consumerProperties);
String testMessage2Payload = "test." + UUID.randomUUID().toString();
Message<byte[]> testMessage2 = MessageBuilder.withPayload(testMessage2Payload.getBytes()).build();
moduleOutputChannel.send(testMessage2);
Message<?> receivedMessage = receive(successfulInputChannel);
assertThat(receivedMessage.getPayload()).isEqualTo(testMessage2Payload.getBytes());
binderBindUnbindLatency();
consumerBinding.unbind();
@@ -367,18 +359,18 @@ public abstract class KafkaBinderTests extends
dlqConsumerProperties);
String testMessagePayload = "test." + UUID.randomUUID().toString();
Message<byte[]> testMessage = MessageBuilder.withPayload(testMessagePayload.getBytes()).build();
moduleOutputChannel.send(testMessage);
Message<?> dlqMessage = receive(dlqChannel, 3);
assertThat(dlqMessage).isNotNull();
assertThat(dlqMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());
// first attempt fails
assertThat(handler.getReceivedMessages().entrySet()).hasSize(1);
Message<?> handledMessage = handler.getReceivedMessages().entrySet().iterator().next().getValue();
assertThat(handledMessage).isNotNull();
assertThat(handledMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
binderBindUnbindLatency();
dlqConsumerBinding.unbind();
@@ -389,11 +381,11 @@ public abstract class KafkaBinderTests extends
consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0", "testGroup",
successfulInputChannel, consumerProperties);
String testMessage2Payload = "test." + UUID.randomUUID().toString();
Message<byte[]> testMessage2 = MessageBuilder.withPayload(testMessage2Payload.getBytes()).build();
moduleOutputChannel.send(testMessage2);
Message<?> receivedMessage = receive(successfulInputChannel);
assertThat(receivedMessage.getPayload()).isEqualTo(testMessage2Payload.getBytes());
binderBindUnbindLatency();
consumerBinding.unbind();
@@ -598,8 +590,6 @@ public abstract class KafkaBinderTests extends
Binder binder = getBinder(createConfigurationProperties());
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
DirectChannel output = new DirectChannel();
QueueChannel input1 = new QueueChannel();
@@ -949,20 +939,28 @@ public abstract class KafkaBinderTests extends
}
};
ObjectMapper om = new ObjectMapper();
if (usesExplicitRouting()) {
assertThat(om.readValue((byte[])receive0.getPayload(), Integer.class)).isEqualTo(0);
assertThat(om.readValue((byte[])receive1.getPayload(), Integer.class)).isEqualTo(1);
assertThat(om.readValue((byte[])receive2.getPayload(), Integer.class)).isEqualTo(2);
assertThat(receive2).has(correlationHeadersForPayload2);
}
else {
List<Message<?>> receivedMessages = Arrays.asList(receive0, receive1, receive2);
assertThat(receivedMessages).extracting("payload").containsExactlyInAnyOrder(new byte[]{48}, new byte[]{49}, new byte[]{50});
Condition<Message<?>> payloadIs2 = new Condition<Message<?>>() {
@Override
public boolean matches(Message<?> value) {
try {
return om.readValue((byte[]) value.getPayload(), Integer.class).equals(2);
}
catch (IOException e) {
//
}
return false;
}
};
assertThat(receivedMessages).filteredOn(payloadIs2).areExactly(1, correlationHeadersForPayload2);
@@ -1038,11 +1036,12 @@ public abstract class KafkaBinderTests extends
assertThat(receive2).isNotNull();
Message<?> receive3 = receive(input3);
assertThat(receive3).isNotNull();
ObjectMapper om = new ObjectMapper();
assertThat(om.readValue((byte[])receive0.getPayload(), Integer.class)).isEqualTo(0);
assertThat(om.readValue((byte[])receive1.getPayload(), Integer.class)).isEqualTo(1);
assertThat(om.readValue((byte[])receive2.getPayload(), Integer.class)).isEqualTo(2);
assertThat(om.readValue((byte[])receive3.getPayload(), Integer.class)).isEqualTo(3);
input0Binding.unbind();
input1Binding.unbind();
@@ -1454,6 +1453,7 @@ public abstract class KafkaBinderTests extends
}
}
// TODO: We need to evaluate this test, as built-in serialization has a different meaning in 2.0
@Test
@SuppressWarnings("unchecked")
public void testBuiltinSerialization() throws Exception {
@@ -1461,7 +1461,9 @@ public abstract class KafkaBinderTests extends
Binding<?> consumerBinding = null;
try {
String testPayload = new String("test");
Message<?> message = MessageBuilder.withPayload(testPayload.getBytes())
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN_VALUE.getBytes())
.build();
SubscribableChannel moduleOutputChannel = new DirectChannel();
String testTopicName = "existing" + System.currentTimeMillis();
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
@@ -1484,8 +1486,8 @@ public abstract class KafkaBinderTests extends
moduleOutputChannel.send(message);
Message<?> inbound = receive(moduleInputChannel, 5);
assertThat(inbound).isNotNull();
assertThat(inbound.getPayload()).isEqualTo("test".getBytes());
assertThat(inbound.getHeaders()).containsEntry("contentType", "text/plain".getBytes());
}
finally {
if (producerBinding != null) {
@@ -1754,7 +1756,7 @@ public abstract class KafkaBinderTests extends
producerProps.setHeaderMode(HeaderMode.raw);
producerProps.setErrorChannelEnabled(true);
Binding<MessageChannel> producerBinding = binder.bindProducer("ec.0", moduleOutputChannel, producerProps);
final Message<?> message = MessageBuilder.withPayload("bad").setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
.build();
SubscribableChannel ec = binder.getApplicationContext().getBean("ec.0.errors", SubscribableChannel.class);
final AtomicReference<Message<?>> errorMessage = new AtomicReference<>();
@@ -1785,7 +1787,7 @@ public abstract class KafkaBinderTests extends
new DirectFieldAccessor(endpoint).setPropertyValue("kafkaTemplate",
new KafkaTemplate(mock(ProducerFactory.class)) {
@Override // SIK < 2.3
public ListenableFuture<SendResult> send(String topic, Object payload) {
sent.set(payload);
SettableListenableFuture<SendResult> future = new SettableListenableFuture<>();
@@ -1793,6 +1795,14 @@ public abstract class KafkaBinderTests extends
return future;
}
@Override // SIK 2.3+
public ListenableFuture send(ProducerRecord record) {
sent.set(record.value());
SettableListenableFuture<SendResult> future = new SettableListenableFuture<>();
future.setException(fooException);
return future;
}
});
moduleOutputChannel.send(message);
@@ -1801,7 +1811,8 @@ public abstract class KafkaBinderTests extends
assertThat(errorMessage.get().getPayload()).isInstanceOf(KafkaSendFailureException.class);
KafkaSendFailureException exception = (KafkaSendFailureException) errorMessage.get().getPayload();
assertThat(exception.getCause()).isSameAs(fooException);
ObjectMapper om = new ObjectMapper();
assertThat(om.readValue((byte[] )exception.getFailedMessage().getPayload(), String.class)).isEqualTo(message.getPayload());
assertThat(exception.getRecord().value()).isSameAs(sent.get());
producerBinding.unbind();
}
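The test above relies on the producer error channel that the binder registers per binding when errorChannelEnabled is set. A sketch of how an application could subscribe to it (the channel name follows the "<destination>.errors" convention used in the test; the listen helper is illustrative):

import org.springframework.cloud.stream.binder.kafka.KafkaSendFailureException;
import org.springframework.context.ApplicationContext;
import org.springframework.messaging.SubscribableChannel;

public class ProducerErrorChannelSketch {

    // Subscribes to "<destination>.errors", where failed sends arrive as
    // error messages carrying a KafkaSendFailureException payload.
    public static void listen(ApplicationContext context, String destination) {
        SubscribableChannel errors = context.getBean(destination + ".errors", SubscribableChannel.class);
        errors.subscribe(message -> {
            KafkaSendFailureException failure = (KafkaSendFailureException) message.getPayload();
            System.err.println("Send failed: " + failure.getRecord() + ", cause: " + failure.getCause());
        });
    }
}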
@@ -1830,7 +1841,7 @@ public abstract class KafkaBinderTests extends
@Override
public void handleMessage(Message<?> message) throws MessagingException {
invocationCount++;
Long offset = message.getHeaders().get(AbstractKafkaBinderTests.this.getKafkaOffsetHeaderKey(), Long.class);
// using the offset as key allows to ensure that we don't store duplicate
// messages on retry
if (!receivedMessages.containsKey(offset)) {


@@ -21,9 +21,6 @@ import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.context.ApplicationContext;
/**
* @author Soby Chacko
@@ -47,9 +44,4 @@ public abstract class AbstractKafkaTestBinder extends
return this.applicationContext;
}
}


@@ -66,10 +66,10 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(
new KafkaProducerProperties());
Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory",
String.class, ExtendedProducerProperties.class);
getProducerFactoryMethod.setAccessible(true);
DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod
.invoke(this.kafkaMessageChannelBinder, "foo", producerProperties);
Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs",
Map.class);
ReflectionUtils.makeAccessible(producerFactoryConfigField);


@@ -63,10 +63,10 @@ public class KafkaBinderConfigurationPropertiesTest {
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(
kafkaProducerProperties);
Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory",
String.class, ExtendedProducerProperties.class);
getProducerFactoryMethod.setAccessible(true);
DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod
.invoke(this.kafkaMessageChannelBinder, "bar", producerProperties);
Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs",
Map.class);
ReflectionUtils.makeAccessible(producerFactoryConfigField);


@@ -1,61 +0,0 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import javax.security.auth.login.AppConfigurationEntry;
import com.sun.security.auth.login.ConfigFile;
import org.apache.kafka.common.security.JaasUtils;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.io.ClassPathResource;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Marius Bogoevici
*/
public class KafkaBinderJaasInitializerListenerTest {
@Test
public void testConfigurationParsedCorrectlyWithKafkaClient() throws Exception {
ConfigFile configFile = new ConfigFile(new ClassPathResource("jaas-sample-kafka-only.conf").getURI());
final AppConfigurationEntry[] kafkaConfigurationArray = configFile.getAppConfigurationEntry(JaasUtils.LOGIN_CONTEXT_CLIENT);
final ConfigurableApplicationContext context =
SpringApplication.run(SimpleApplication.class,
"--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true",
"--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true",
"--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab",
"--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM");
javax.security.auth.login.Configuration configuration = javax.security.auth.login.Configuration.getConfiguration();
final AppConfigurationEntry[] kafkaConfiguration = configuration.getAppConfigurationEntry(JaasUtils.LOGIN_CONTEXT_CLIENT);
assertThat(kafkaConfiguration).hasSize(1);
assertThat(kafkaConfiguration[0].getOptions()).isEqualTo(kafkaConfigurationArray[0].getOptions());
context.close();
}
@SpringBootApplication
public static class SimpleApplication {
}
}


@@ -16,11 +16,12 @@
package org.springframework.cloud.stream.binder.kafka;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node;
@@ -31,7 +32,6 @@ import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.TopicInformation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@@ -61,6 +61,8 @@ public class KafkaBinderMetricsTest {
@Mock
private KafkaMessageChannelBinder binder;
private MeterRegistry meterRegistry = new SimpleMeterRegistry();
private Map<String, TopicInformation> topicsInUse = new HashMap<>();
@Mock
@@ -82,11 +84,10 @@ public class KafkaBinderMetricsTest {
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
metrics.bindTo(meterRegistry);
assertThat(meterRegistry.getMeters()).hasSize(1);
MeterRegistry.Search group = meterRegistry.find(String.format("%s.%s.%s.lag", METRIC_PREFIX, "group", TEST_TOPIC));
assertThat(group.gauge().get().value()).isEqualTo(500.0);
}
@Test
@@ -99,11 +100,10 @@ public class KafkaBinderMetricsTest {
List<PartitionInfo> partitions = partitions(new Node(0, null, 0), new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
metrics.bindTo(meterRegistry);
assertThat(meterRegistry.getMeters()).hasSize(1);
MeterRegistry.Search group = meterRegistry.find(String.format("%s.%s.%s.lag", METRIC_PREFIX, "group", TEST_TOPIC));
assertThat(group.gauge().get().value()).isEqualTo(1000.0);
}
@Test
@@ -111,19 +111,18 @@ public class KafkaBinderMetricsTest {
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
metrics.bindTo(meterRegistry);
assertThat(meterRegistry.getMeters()).hasSize(1);
MeterRegistry.Search group = meterRegistry.find(String.format("%s.%s.%s.lag", METRIC_PREFIX, "group", TEST_TOPIC));
assertThat(group.gauge().get().value()).isEqualTo(1000.0);
}
@Test
public void shouldNotCalculateLagForProducerTopics() {
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation(null, partitions));
metrics.bindTo(meterRegistry);
assertThat(meterRegistry.getMeters()).isEmpty();
}
private List<PartitionInfo> partitions(Node... nodes) {

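These tests now exercise the Micrometer MeterBinder contract instead of Boot 1.x PublicMetrics: the binder registers its gauges only when bindTo is invoked with the application's registry. A minimal standalone sketch of that contract (the meter name and lag value are illustrative):

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

public class LagMeterBinderSketch implements MeterBinder {

    private volatile long lag = 500L; // stand-in for a computed consumer lag

    @Override
    public void bindTo(MeterRegistry registry) {
        // Gauges are registered lazily against whatever registry the app supplies.
        Gauge.builder("spring.cloud.stream.binder.kafka.group.topic.lag", this, b -> b.lag)
                .register(registry);
    }

    public static void main(String[] args) {
        MeterRegistry registry = new SimpleMeterRegistry();
        new LagMeterBinderSketch().bindTo(registry);
        registry.getMeters().forEach(m -> System.out.println(m.getId().getName()));
    }
}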

@@ -4,6 +4,7 @@
<pattern>%d{ISO8601} %5p %t %c{2}:%L - %m%n</pattern>
</encoder>
</appender>
<logger name="org.apache.kafka" level="DEBUG"/>
<logger name="org.springframework.integration.kafka" level="INFO"/>
<logger name="org.springframework.kafka" level="INFO"/>
<logger name="org.springframework.cloud.stream" level="INFO" />


@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
</parent>
<dependencies>
@@ -23,10 +23,6 @@
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
@@ -52,13 +48,26 @@
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<classifier>test</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>


@@ -23,18 +23,13 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.binder.AbstractBinder;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.DefaultBinding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
@@ -43,8 +38,6 @@ import org.springframework.cloud.stream.binder.kstream.config.KStreamConsumerPro
import org.springframework.cloud.stream.binder.kstream.config.KStreamExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kstream.config.KStreamProducerProperties;
import org.springframework.messaging.Message;
import org.springframework.util.StringUtils;
/**
@@ -54,8 +47,6 @@ public class KStreamBinder extends
AbstractBinder<KStream<Object, Object>, ExtendedConsumerProperties<KStreamConsumerProperties>, ExtendedProducerProperties<KStreamProducerProperties>>
implements ExtendedPropertiesBinder<KStream<Object, Object>, KStreamConsumerProperties, KStreamProducerProperties> {
private final KafkaTopicProvisioner kafkaTopicProvisioner;
private final KStreamExtendedBindingProperties kStreamExtendedBindingProperties;
@@ -67,7 +58,6 @@ public class KStreamBinder extends
public KStreamBinder(KafkaBinderConfigurationProperties binderConfigurationProperties, KafkaTopicProvisioner kafkaTopicProvisioner,
KStreamExtendedBindingProperties kStreamExtendedBindingProperties, StreamsConfig streamsConfig) {
this.binderConfigurationProperties = binderConfigurationProperties;
this.kafkaTopicProvisioner = kafkaTopicProvisioner;
this.kStreamExtendedBindingProperties = kStreamExtendedBindingProperties;
this.streamsConfig = streamsConfig;
@@ -90,43 +80,13 @@ public class KStreamBinder extends
ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties = new ExtendedProducerProperties<KafkaProducerProperties>(
new KafkaProducerProperties());
this.kafkaTopicProvisioner.provisionProducerDestination(name, extendedProducerProperties);
if (!properties.isUseNativeEncoding()) {
outboundBindTarget = outboundBindTarget
.map((k, v) -> KeyValue.pair(k, (Object) KStreamBinder.this.serializePayloadIfNecessary((Message<?>) v)));
}
else {
outboundBindTarget = outboundBindTarget
.map((k, v) -> KeyValue.pair(k, ((Message<Object>) v).getPayload()));
}
if (!properties.isUseNativeEncoding() || StringUtils.hasText(properties.getExtension().getKeySerde()) || StringUtils.hasText(properties.getExtension().getValueSerde())) {
try {
@@ -167,24 +127,6 @@ public class KStreamBinder extends
return new DefaultBinding<>(name, null, outboundBindTarget, null);
}
@Override
public KStreamConsumerProperties getExtendedConsumerProperties(String channelName) {
return this.kStreamExtendedBindingProperties.getExtendedConsumerProperties(channelName);

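The serde handling above ultimately instantiates the configured keySerde/valueSerde class names reflectively, falling back to byte arrays when nothing is configured. A sketch of that resolution (the resolve helper is illustrative; Utils.newInstance is the Kafka utility this file already imports):

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;

public class SerdeResolutionSketch {

    static Serde<?> resolve(String serdeClassName) {
        if (serdeClassName == null || serdeClassName.isEmpty()) {
            return Serdes.ByteArray(); // binder default: byte-array in, byte-array out
        }
        try {
            return Utils.newInstance(serdeClassName, Serde.class);
        }
        catch (ClassNotFoundException e) {
            throw new IllegalStateException("Serde class not found: " + serdeClassName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(null).getClass().getSimpleName());
        System.out.println(resolve("org.apache.kafka.common.serialization.Serdes$LongSerde")
                .getClass().getSimpleName());
    }
}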

@@ -16,9 +16,6 @@
package org.springframework.cloud.stream.binder.kstream;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.kafka.streams.KeyValue;
@@ -27,19 +24,13 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binding.AbstractBindingTargetFactory;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.integration.support.MutableMessageHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
@@ -55,47 +46,31 @@ public class KStreamBoundElementFactory extends AbstractBindingTargetFactory<KSt
private final BindingServiceProperties bindingServiceProperties;
private CompositeMessageConverterFactory compositeMessageConverterFactory;
public KStreamBoundElementFactory(KStreamBuilder streamBuilder, BindingServiceProperties bindingServiceProperties,
CompositeMessageConverterFactory compositeMessageConverterFactory) {
super(KStream.class);
this.bindingServiceProperties = bindingServiceProperties;
this.kStreamBuilder = streamBuilder;
this.compositeMessageConverterFactory = compositeMessageConverterFactory;
}
@Override
public KStream createInput(String name) {
KStream<Object, Object> stream = kStreamBuilder.stream(bindingServiceProperties.getBindingDestination(name));
stream = stream.map((key, value) -> {
BindingProperties bindingProperties = bindingServiceProperties.getBindingProperties(name);
String contentType = bindingProperties.getContentType();
if (!StringUtils.isEmpty(contentType)) {
Message<Object> message = MessageBuilder.withPayload(value)
.setHeader(MessageHeaders.CONTENT_TYPE, contentType).build();
return new KeyValue<>(key, message);
}
return new KeyValue<>(key, value);
});
return stream;
}
@@ -106,16 +81,12 @@ public class KStreamBoundElementFactory extends AbstractBindingTargetFactory<KSt
String contentType = bindingProperties.getContentType();
MessageConverter messageConverter = StringUtils.hasText(contentType) ? compositeMessageConverterFactory
.getMessageConverterForType(MimeType.valueOf(contentType)) : null;
KStreamWrapperHandler handler = new KStreamWrapperHandler(messageConverter, bindingServiceProperties, name);
ProxyFactory proxyFactory = new ProxyFactory(KStreamWrapper.class, KStream.class);
proxyFactory.addAdvice(handler);
return (KStream) proxyFactory.getProxy();
}
interface KStreamWrapper {
void wrap(KStream<Object, Object> delegate);
@@ -126,23 +97,32 @@ public class KStreamBoundElementFactory extends AbstractBindingTargetFactory<KSt
private KStream<Object, Object> delegate;
private final MessageConverter messageConverter;
private final BindingServiceProperties bindingServiceProperties;
private String name;
public KStreamWrapperHandler(MessageConverter messageConverter,
BindingServiceProperties bindingServiceProperties,
String name) {
this.messageConverter = messageConverter;
this.bindingServiceProperties = bindingServiceProperties;
this.name = name;
}
public void wrap(KStream<Object, Object> delegate) {
Assert.notNull(delegate, "delegate cannot be null");
Assert.isNull(this.delegate, "delegate already set to " + this.delegate);
if (messageConverter != null) {
KeyValueMapper<Object, Object, KeyValue<Object, Object>> keyValueMapper = (k, v) -> {
Message<?> message = (Message<?>) v;
BindingProperties bindingProperties = bindingServiceProperties.getBindingProperties(name);
String contentType = bindingProperties.getContentType();
MutableMessageHeaders messageHeaders = new MutableMessageHeaders(((Message<?>) v).getHeaders());
if (!StringUtils.isEmpty(contentType)) {
messageHeaders.put(MessageHeaders.CONTENT_TYPE, contentType);
}
return new KeyValue<>(k,
messageConverter.toMessage(message.getPayload(),
messageHeaders));
};
delegate = delegate.map(keyValueMapper);
}

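createInput above now stamps the binding's configured contentType onto each inbound value, so downstream message converters know how to interpret the payload. The core of that wrapping in isolation (the stamp helper is illustrative):

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;

public class ContentTypeStampSketch {

    static Message<Object> stamp(Object value, String contentType) {
        return MessageBuilder.withPayload(value)
                .setHeader(MessageHeaders.CONTENT_TYPE, contentType)
                .build();
    }

    public static void main(String[] args) {
        Message<Object> m = stamp("{\"id\":\"123\"}".getBytes(), "application/json");
        System.out.println(m.getHeaders().get(MessageHeaders.CONTENT_TYPE));
    }
}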

@@ -52,22 +52,22 @@ public class KStreamListenerParameterAdapter implements StreamListenerParameterA
final Class<?> valueClass = (resolvableType.getGeneric(1).getRawClass() != null)
? (resolvableType.getGeneric(1).getRawClass()) : Object.class;
return bindingTarget.map((KeyValueMapper) (o, o2) -> {
if (valueClass.isAssignableFrom(o2.getClass())) {
return new KeyValue<>(o, o2);
}
else if (o2 instanceof Message) {
if (valueClass.isAssignableFrom(((Message) o2).getPayload().getClass())) {
return new KeyValue<>(o, ((Message) o2).getPayload());
}
return new KeyValue<>(o, messageConverter.fromMessage((Message) o2, valueClass));
}
else if(o2 instanceof String || o2 instanceof byte[]) {
Message<Object> message = MessageBuilder.withPayload(o2).build();
return new KeyValue<>(o, messageConverter.fromMessage(message, valueClass));
}
else {
return new KeyValue<>(o, o2);
}
});
}


@@ -21,7 +21,6 @@ import java.io.IOException;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
import org.springframework.messaging.Message;
@@ -40,15 +39,12 @@ public class KStreamStreamListenerResultAdapter implements StreamListenerResultA
@Override
@SuppressWarnings("unchecked")
public Closeable adapt(KStream streamListenerResult, KStreamBoundElementFactory.KStreamWrapper boundElement) {
boundElement.wrap(streamListenerResult.map((k, v) -> {
if (v instanceof Message<?>) {
return new KeyValue<>(k, v);
}
else {
return new KeyValue<>(k, MessageBuilder.withPayload(v).build());
}
}));
return new NoOpCloseable();


@@ -18,27 +18,22 @@ package org.springframework.cloud.stream.binder.kstream.config;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.admin.KafkaAdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kstream.KStreamBinder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Marius Bogoevici
* @author Gary Russell
*/
@Configuration
@EnableConfigurationProperties(KStreamExtendedBindingProperties.class)
@@ -63,34 +58,10 @@ public class KStreamBinderConfiguration {
}
@Bean(name = "adminUtilsOperation")
@ConditionalOnClass(name = "kafka.admin.AdminUtils")
public AdminUtilsOperation kafka10AdminUtilsOperation() {
return new KafkaAdminUtilsOperation();
}
}

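With the version-sniffing Condition classes gone, @ConditionalOnClass(name = "kafka.admin.AdminUtils") is all that gates the bean; it reduces to a classpath presence check, roughly:

import org.springframework.util.ClassUtils;

public class AdminUtilsPresenceSketch {

    public static void main(String[] args) {
        // Equivalent question the condition asks at configuration time.
        boolean present = ClassUtils.isPresent("kafka.admin.AdminUtils",
                AdminUtilsPresenceSketch.class.getClassLoader());
        System.out.println("kafka.admin.AdminUtils on classpath: " + present);
    }
}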

@@ -33,7 +33,6 @@ import org.springframework.cloud.stream.binder.kstream.KStreamStreamListenerResu
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.core.KStreamBuilderFactoryBean;
import org.springframework.util.ObjectUtils;
@@ -70,10 +69,9 @@ public class KStreamBinderSupportAutoConfiguration {
public StreamsConfig streamsConfig(KafkaBinderConfigurationProperties binderConfigurationProperties) {
Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, binderConfigurationProperties.getKafkaConnectionString());
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName());
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "default");
if (!ObjectUtils.isEmpty(binderConfigurationProperties.getConfiguration())) {
props.putAll(binderConfigurationProperties.getConfiguration());
}
@@ -94,9 +92,9 @@ public class KStreamBinderSupportAutoConfiguration {
@Bean
public KStreamBoundElementFactory kStreamBindableTargetFactory(KStreamBuilder kStreamBuilder,
BindingServiceProperties bindingServiceProperties,
CompositeMessageConverterFactory compositeMessageConverterFactory) {
return new KStreamBoundElementFactory(kStreamBuilder, bindingServiceProperties,
compositeMessageConverterFactory);
}

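The renamed keys above (DEFAULT_KEY_SERDE_CLASS_CONFIG / DEFAULT_VALUE_SERDE_CLASS_CONFIG) replace the pre-1.0 KEY_SERDE/VALUE_SERDE names, and the ZooKeeper setting is no longer part of StreamsConfig. A sketch of the resulting minimal configuration (the application id "default" mirrors the bean above):

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsConfigSketch {

    public static StreamsConfig build(String bootstrapServers) {
        Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName());
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "default");
        return new StreamsConfig(props);
    }
}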

@@ -18,16 +18,13 @@ package org.springframework.cloud.stream.binder.kstream;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -52,20 +49,21 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
*
* @author Soby Chacko
* @author Gary Russell
*/
public class KStreamBinderPojoInputAndPrimitiveTypeOutputTests {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "counts-id");
private static Consumer<Integer, String> consumer;
@BeforeClass
public static void setUp() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group-id", "false", embeddedKafka);
//consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class.getName());
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
consumer = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "counts-id");
}
@@ -88,12 +86,16 @@ public class KStreamBinderPojoInputAndPrimitiveTypeOutputTests {
"--spring.cloud.stream.bindings.output.producer.headerMode=raw",
"--spring.cloud.stream.bindings.output.producer.useNativeEncoding=true",
"--spring.cloud.stream.kstream.bindings.output.producer.keySerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde",
"--spring.cloud.stream.kstream.bindings.output.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$LongSerde",
"--spring.cloud.stream.kstream.bindings.output.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde",
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"--spring.cloud.stream.kstream.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kstream.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
-receiveAndValidateFoo(context);
-context.close();
+try {
+receiveAndValidateFoo(context);
+}
+finally {
+context.close();
+}
}
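These tests previously leaked the application context (and its embedded-Kafka consumer) whenever the validation step threw, because context.close() was only reached on success; the try/finally guarantees cleanup either way. Since ConfigurableApplicationContext extends Closeable, try-with-resources is an equivalent shape, sketched here with a placeholder application class:

    // Equivalent cleanup via try-with-resources (MyTestApp is a placeholder).
    try (ConfigurableApplicationContext context = SpringApplication.run(MyTestApp.class)) {
        receiveAndValidateFoo(context); // may throw AssertionError; context still closes
    }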
private void receiveAndValidateFoo(ConfigurableApplicationContext context) throws Exception{
@@ -102,10 +104,12 @@ public class KStreamBinderPojoInputAndPrimitiveTypeOutputTests {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("foos");
template.sendDefault("{\"id\":\"123\"}");
-ConsumerRecord<Integer, Long> cr = KafkaTestUtils.getSingleRecord(consumer, "counts-id");
+ConsumerRecord<Integer, String> cr = KafkaTestUtils.getSingleRecord(consumer, "counts-id");
assertThat(cr.key().equals(123));
-assertThat(cr.value().equals(1L));
+ObjectMapper om = new ObjectMapper();
+Long aLong = om.readValue(cr.value(), Long.class);
+assertThat(aLong.equals(1L));
}
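With the output binding now on ByteArraySerde, the count arrives at the plain String test consumer as its JSON rendering, hence the Jackson round-trip before comparing. A caveat worth flagging: in AssertJ, assertThat(aLong.equals(1L)) only builds a BooleanAssert and never fails on its own; the comparison needs a terminal call, e.g. (aLong and cr as in the test above):

    assertThat(aLong).isEqualTo(1L);  // actually fails the test on mismatch
    assertThat(cr.key()).isEqualTo(123);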
@EnableBinding(KStreamProcessor.class)
@@ -116,30 +120,12 @@ public class KStreamBinderPojoInputAndPrimitiveTypeOutputTests {
@SendTo("output")
public KStream<Integer, Long> process(KStream<Object, Product> input) {
return input
-.filter(new Predicate<Object, Product>() {
-@Override
-public boolean test(Object key, Product product) {
-return product.getId() == 123;
-}
-})
-.map(new KeyValueMapper<Object, Product, KeyValue<Product, Product>>() {
-@Override
-public KeyValue<Product, Product> apply(Object key, Product value) {
-return new KeyValue<>(value, value);
-}
-})
+.filter((key, product) -> product.getId() == 123)
+.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class))
.count(TimeWindows.of(5000), "id-count-store")
.toStream()
-.map(new KeyValueMapper<Windowed<Product>, Long, KeyValue<Integer, Long>>() {
-@Override
-public KeyValue<Integer, Long> apply(Windowed<Product> key, Long value) {
-return new KeyValue<>(key.key().id, value);
-}
-});
+.map((key, value) -> new KeyValue<>(key.key().id, value));
}
}
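This rewrite, repeated in the tests below, replaces the anonymous Predicate/KeyValueMapper/ValueMapper classes with lambdas. That is behavior-preserving because each of those Kafka Streams interfaces has a single abstract method, though it does require compiling the tests as Java 8 sources. The pattern, side by side:

    // Before: anonymous inner class
    input.filter(new Predicate<Object, Product>() {
        @Override
        public boolean test(Object key, Product product) {
            return product.getId() == 123;
        }
    });

    // After: equivalent lambda
    input.filter((key, product) -> product.getId() == 123);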

View File

@@ -18,7 +18,6 @@ package org.springframework.cloud.stream.binder.kstream;
import java.util.Arrays;
import java.util.Date;
-import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
@@ -27,10 +26,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.ValueMapper;
-import org.apache.kafka.streams.kstream.Windowed;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -39,9 +35,11 @@ import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor;
+import org.springframework.cloud.stream.binder.kstream.config.KStreamApplicationSupportProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -56,6 +54,7 @@ import static org.assertj.core.api.Assertions.assertThat;
*
* @author Marius Bogoevici
* @author Soby Chacko
+ * @author Gary Russell
*/
public class KStreamBinderWordCountIntegrationTests {
@@ -92,13 +91,18 @@ public class KStreamBinderWordCountIntegrationTests {
"--spring.cloud.stream.kstream.binder.configuration.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.bindings.output.producer.headerMode=raw",
"--spring.cloud.stream.bindings.output.producer.useNativeEncoding=true",
"--spring.cloud.stream.kstream.bindings.output.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde",
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"--spring.cloud.stream.kstream.timeWindow.length=5000",
"--spring.cloud.stream.kstream.timeWindow.advanceBy=0",
"--spring.cloud.stream.kstream.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kstream.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
-receiveAndValidate(context);
-context.close();
+try {
+receiveAndValidate(context);
+}
+finally {
+context.close();
+}
}
private void receiveAndValidate(ConfigurableApplicationContext context) throws Exception{
@@ -113,6 +117,7 @@ public class KStreamBinderWordCountIntegrationTests {
@EnableBinding(KStreamProcessor.class)
@EnableAutoConfiguration
+@EnableConfigurationProperties(KStreamApplicationSupportProperties.class)
public static class WordCountProcessorApplication {
@Autowired
@@ -123,30 +128,12 @@ public class KStreamBinderWordCountIntegrationTests {
public KStream<?, WordCount> process(KStream<Object, String> input) {
return input
-.flatMapValues(new ValueMapper<String, Iterable<String>>() {
-@Override
-public List<String> apply(String value) {
-return Arrays.asList(value.toLowerCase().split("\\W+"));
-}
-})
-.map(new KeyValueMapper<Object, String, KeyValue<String, String>>() {
-@Override
-public KeyValue<String, String> apply(Object key, String value) {
-return new KeyValue<>(value, value);
-}
-})
+.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
+.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serdes.String(), Serdes.String())
.count(timeWindows, "WordCounts")
.count(timeWindows, "foo-WordCounts")
.toStream()
-.map(new KeyValueMapper<Windowed<String>, Long, KeyValue<Object, WordCount>>() {
-@Override
-public KeyValue<Object, WordCount> apply(Windowed<String> key, Long value) {
-return new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end())));
-}
-});
+.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
}
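The two new timeWindow.* properties and the @EnableConfigurationProperties registration feed the TimeWindows object that this processor has autowired. A hedged sketch of how such a bean could be built from those properties; the accessor names below are assumptions about KStreamApplicationSupportProperties, not its actual shape:

    // Sketch only: getTimeWindow()/getLength()/getAdvanceBy() are guessed accessors.
    @Bean
    public TimeWindows timeWindows(KStreamApplicationSupportProperties props) {
        TimeWindows windows = TimeWindows.of(props.getTimeWindow().getLength()); // size in ms
        long advanceBy = props.getTimeWindow().getAdvanceBy();
        // TimeWindows.advanceBy requires a positive advance; 0 keeps tumbling windows
        return advanceBy > 0 ? windows.advanceBy(advanceBy) : windows;
    }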

View File

@@ -24,8 +24,6 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.junit.AfterClass;
@@ -54,6 +52,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Soby Chacko
+ * @author Gary Russell
*/
public class KStreamInteractiveQueryIntegrationTests {
@@ -86,13 +85,18 @@ public class KStreamInteractiveQueryIntegrationTests {
"--spring.cloud.stream.kstream.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kstream.binder.configuration.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kstream.binder.configuration.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kstream.bindings.output.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde",
"--spring.cloud.stream.bindings.output.producer.headerMode=raw",
"--spring.cloud.stream.bindings.output.producer.useNativeEncoding=true",
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"--spring.cloud.stream.kstream.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kstream.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
-receiveAndValidateFoo(context);
-context.close();
+try {
+receiveAndValidateFoo(context);
+}
+finally {
+context.close();
+}
}
private void receiveAndValidateFoo(ConfigurableApplicationContext context) throws Exception{
@@ -120,30 +124,12 @@ public class KStreamInteractiveQueryIntegrationTests {
public KStream<?, String> process(KStream<Object, Product> input) {
return input
-.filter(new Predicate<Object, Product>() {
-@Override
-public boolean test(Object key, Product product) {
-return product.getId() == 123;
-}
-})
-.map(new KeyValueMapper<Object, Product, KeyValue<Integer, Product>>() {
-@Override
-public KeyValue<Integer, Product> apply(Object key, Product value) {
-return new KeyValue<>(value.id, value);
-}
-})
+.filter((key, product) -> product.getId() == 123)
+.map((key, value) -> new KeyValue<>(value.id, value))
.groupByKey(new Serdes.IntegerSerde(), new JsonSerde<>(Product.class))
.count("prod-id-count-store")
.toStream()
-.map(new KeyValueMapper<Integer, Long, KeyValue<Object, String>>() {
-@Override
-public KeyValue<Object, String> apply(Integer key, Long value) {
-return new KeyValue<>(null, "Count for product with ID 123: " + value);
-}
-});
+.map((key, value) -> new KeyValue<>(null, "Count for product with ID 123: " + value));
}
@Bean
@@ -151,7 +137,6 @@ public class KStreamInteractiveQueryIntegrationTests {
return new Foo(kStreamBuilderFactoryBean);
}
static class Foo {
KStreamBuilderFactoryBean kStreamBuilderFactoryBean;
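The Foo holder exists so the test can reach the running KafkaStreams instance behind the binder and interactively query the prod-id-count-store created by .count(...) above. Assuming the factory bean exposes the started instance (spring-kafka's streams factory beans do), the lookup would look roughly like:

    // Sketch: interactive query against the count store named in the topology above.
    KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
    ReadOnlyKeyValueStore<Integer, Long> store =
            streams.store("prod-id-count-store", QueryableStoreTypes.<Integer, Long>keyValueStore());
    Long count = store.get(123); // keyed by the Integer product id from groupByKey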

View File

@@ -23,10 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.Windowed;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -51,6 +48,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Marius Bogoevici
* @author Soby Chacko
* @author Gary Russell
*/
public class KstreamBinderPojoInputStringOutputIntegrationTests {
@@ -86,11 +84,16 @@ public class KstreamBinderPojoInputStringOutputIntegrationTests {
"--spring.cloud.stream.bindings.output.producer.headerMode=raw",
"--spring.cloud.stream.bindings.output.producer.useNativeEncoding=true",
"--spring.cloud.stream.kstream.bindings.output.producer.keySerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde",
"--spring.cloud.stream.kstream.bindings.output.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde",
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"--spring.cloud.stream.kstream.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kstream.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
-receiveAndValidateFoo(context);
-context.close();
+try {
+receiveAndValidateFoo(context);
+}
+finally {
+context.close();
+}
}
private void receiveAndValidateFoo(ConfigurableApplicationContext context) throws Exception {
@@ -112,30 +115,12 @@ public class KstreamBinderPojoInputStringOutputIntegrationTests {
public KStream<Integer, String> process(KStream<Object, Product> input) {
return input
-.filter(new Predicate<Object, Product>() {
-@Override
-public boolean test(Object key, Product product) {
-return product.getId() == 123;
-}
-})
-.map(new KeyValueMapper<Object, Product, KeyValue<Product, Product>>() {
-@Override
-public KeyValue<Product, Product> apply(Object key, Product value) {
-return new KeyValue<>(value, value);
-}
-})
+.filter((key, product) -> product.getId() == 123)
+.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class))
.count(TimeWindows.of(5000), "id-count-store")
.toStream()
-.map(new KeyValueMapper<Windowed<Product>, Long, KeyValue<Integer, String>>() {
-@Override
-public KeyValue<Integer, String> apply(Windowed<Product> key, Long value) {
-return new KeyValue<>(key.key().id, "Count for product with ID 123: " + value);
-}
-});
+.map((key, value) -> new KeyValue<>(key.key().id, "Count for product with ID 123: " + value));
}
}
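All four test classes converge on the same outbound pattern: useNativeEncoding=true plus per-binding keySerde/valueSerde properties, delegating serialization to Kafka Streams rather than the binder's message converters. Plausibly that reduces to handing the configured serdes to KStream.to(...) when the stream is written out; a sketch against the 0.11 API:

    // Sketch: binding-level serdes override the byte-array defaults on the way out
    // (outboundStream is the KStream<Integer, String> returned by the processor).
    outboundStream.to(new Serdes.IntegerSerde(), Serdes.String(), "counts-id");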

View File

@@ -7,7 +7,7 @@
lines=$(find . -name 'pom.xml' | xargs egrep "SNAPSHOT" | grep -v regex | wc -l)
lines=$(find . -name 'pom.xml' | xargs egrep "SNAPSHOT|M[0-9]|RC[0-9]" | grep -v regex | wc -l)
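# Widened check: milestone (M1..M9) and release-candidate (RC1..) tags now count
# as pre-release too, e.g. 1.3.0.M1 or 2.0.0.RC2, not only *-SNAPSHOT versions.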
if [ $lines -eq 0 ]; then
echo "No snapshots found"
else