Compare commits
17 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e1906711a8 | ||
|
|
78213c98e8 | ||
|
|
c10206d41b | ||
|
|
73eda0ddd0 | ||
|
|
5b51d7cce3 | ||
|
|
2530229fb5 | ||
|
|
70cba0ae03 | ||
|
|
3eb9493cba | ||
|
|
89b75b4734 | ||
|
|
dc0bc18f37 | ||
|
|
8fafc2605e | ||
|
|
83eca9b734 | ||
|
|
b2d579b5b6 | ||
|
|
c389fa3fa4 | ||
|
|
50e0a8b30e | ||
|
|
7daa0f1b72 | ||
|
|
f3e38961c5 |
18
pom.xml
18
pom.xml
@@ -2,7 +2,7 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.2.0.M1</version>
|
||||
<version>1.2.0.M2</version>
|
||||
<packaging>pom</packaging>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
@@ -12,17 +12,21 @@
|
||||
</parent>
|
||||
<properties>
|
||||
<java.version>1.7</java.version>
|
||||
<kafka.version>0.9.0.1</kafka.version>
|
||||
<spring-kafka.version>1.0.5.RELEASE</spring-kafka.version>
|
||||
<spring-integration-kafka.version>2.0.1.RELEASE</spring-integration-kafka.version>
|
||||
<spring-cloud-stream.version>1.2.0.M1</spring-cloud-stream.version>
|
||||
<kafka.version>0.10.1.1</kafka.version>
|
||||
<spring-kafka.version>1.1.2.RELEASE</spring-kafka.version>
|
||||
<spring-integration-kafka.version>2.1.0.RELEASE</spring-integration-kafka.version>
|
||||
<spring-cloud-stream.version>1.2.0.M2</spring-cloud-stream.version>
|
||||
<spring-boot.version>1.5.1.RELEASE</spring-boot.version>
|
||||
</properties>
|
||||
<modules>
|
||||
<module>spring-cloud-stream-binder-kafka</module>
|
||||
<module>spring-cloud-starter-stream-kafka</module>
|
||||
<module>spring-cloud-stream-binder-kafka-docs</module>
|
||||
<module>spring-cloud-stream-binder-kafka-0.10-test</module>
|
||||
</modules>
|
||||
<module>spring-cloud-stream-binder-kafka-0.9-test</module>
|
||||
<module>spring-cloud-stream-binder-kafka-0.10.0-test</module>
|
||||
<module>spring-cloud-stream-binder-kafka-core</module>
|
||||
<module>spring-cloud-stream-binder-kafka-test-support</module>
|
||||
</modules>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.2.0.M1</version>
|
||||
<version>1.2.0.M2</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
<description>Spring Cloud Starter Stream Kafka</description>
|
||||
|
||||
@@ -4,10 +4,10 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.2.0.M1</version>
|
||||
<version>1.2.0.M2</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-0.10-test</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder 0.10 Tests</description>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-0.10.0-test</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder 0.10.0 Tests</description>
|
||||
<url>http://projects.spring.io/spring-cloud</url>
|
||||
<organization>
|
||||
<name>Pivotal Software, Inc.</name>
|
||||
@@ -16,15 +16,20 @@
|
||||
<properties>
|
||||
<main.basedir>${basedir}/../..</main.basedir>
|
||||
<!--
|
||||
Override Kafka dependencies to Kafka 0.10 and supporting Spring Kafka and
|
||||
Override Kafka dependencies to Kafka 0.9.0.1 and supporting Spring Kafka and
|
||||
Spring Integration Kafka versions
|
||||
-->
|
||||
<kafka.version>0.10.0.0</kafka.version>
|
||||
<spring-kafka.version>1.1.1.RELEASE</spring-kafka.version>
|
||||
<kafka.version>0.10.0.1</kafka.version>
|
||||
<spring-kafka.version>1.1.2.RELEASE</spring-kafka.version>
|
||||
<spring-integration-kafka.version>2.1.0.RELEASE</spring-integration-kafka.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
<version>1.2.0.M2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
@@ -100,5 +105,4 @@
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
|
||||
</project>
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2015 the original author or authors.
|
||||
* Copyright 2014-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -14,7 +14,15 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* This package contains an implementation of the {@link org.springframework.cloud.stream.binder.Binder} for Kafka.
|
||||
*/
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
/**
|
||||
* Integration tests for the {@link KafkaMessageChannelBinder}.
|
||||
*
|
||||
* This test specifically tests for the 0.10.0.1 version of Kafka.
|
||||
*
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class Kafka_0_10_0_BinderTests extends Kafka10BinderTests {
|
||||
|
||||
}
|
||||
78
spring-cloud-stream-binder-kafka-0.9-test/pom.xml
Normal file
78
spring-cloud-stream-binder-kafka-0.9-test/pom.xml
Normal file
@@ -0,0 +1,78 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.2.0.M2</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-0.9</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder 0.9 Tests</description>
|
||||
<url>http://projects.spring.io/spring-cloud</url>
|
||||
<organization>
|
||||
<name>Pivotal Software, Inc.</name>
|
||||
<url>http://www.spring.io</url>
|
||||
</organization>
|
||||
<properties>
|
||||
<main.basedir>${basedir}/../..</main.basedir>
|
||||
<!--
|
||||
Override Kafka dependencies to Kafka 0.9.0.1 and supporting Spring Kafka and
|
||||
Spring Integration Kafka versions
|
||||
-->
|
||||
<kafka.version>0.9.0.1</kafka.version>
|
||||
<spring-kafka.version>1.0.5.RELEASE</spring-kafka.version>
|
||||
<spring-integration-kafka.version>2.0.1.RELEASE</spring-integration-kafka.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
@@ -16,8 +16,10 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
|
||||
import org.springframework.context.support.GenericApplicationContext;
|
||||
import org.springframework.kafka.support.LoggingProducerListener;
|
||||
import org.springframework.kafka.support.ProducerListener;
|
||||
@@ -35,14 +37,18 @@ public class Kafka09TestBinder extends AbstractKafkaTestBinder {
|
||||
|
||||
public Kafka09TestBinder(KafkaBinderConfigurationProperties binderConfiguration) {
|
||||
try {
|
||||
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfiguration);
|
||||
AdminUtilsOperation adminUtilsOperation = new Kafka09AdminUtilsOperation();
|
||||
KafkaTopicProvisioner provisioningProvider =
|
||||
new KafkaTopicProvisioner(binderConfiguration, adminUtilsOperation);
|
||||
provisioningProvider.afterPropertiesSet();
|
||||
|
||||
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfiguration, provisioningProvider);
|
||||
binder.setCodec(getCodec());
|
||||
ProducerListener producerListener = new LoggingProducerListener();
|
||||
binder.setProducerListener(producerListener);
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
binder.setApplicationContext(context);
|
||||
binder.setAdminUtilsOperation(new Kafka09AdminUtilsOperation());
|
||||
binder.afterPropertiesSet();
|
||||
this.setBinder(binder);
|
||||
}
|
||||
@@ -34,13 +34,12 @@ import org.junit.ClassRule;
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.binder.Spy;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.kafka.test.core.BrokerAddress;
|
||||
import org.springframework.kafka.test.rule.KafkaEmbedded;
|
||||
import org.springframework.retry.RetryOperations;
|
||||
|
||||
/**
|
||||
* Integration tests for the {@link KafkaMessageChannelBinder}.
|
||||
@@ -50,7 +49,7 @@ import org.springframework.retry.RetryOperations;
|
||||
* @author Mark Fisher
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
public class Kafka09BinderTests extends KafkaBinderTests {
|
||||
public class Kafka_09_BinderTests extends KafkaBinderTests {
|
||||
|
||||
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();
|
||||
|
||||
@@ -93,11 +92,6 @@ public class Kafka09BinderTests extends KafkaBinderTests {
|
||||
return consumerFactory().createConsumer().partitionsFor(topic).size();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setMetadataRetryOperations(Binder binder, RetryOperations retryOperations) {
|
||||
((Kafka09TestBinder) binder).getBinder().setMetadataRetryOperations(retryOperations);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ZkUtils getZkUtils(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
|
||||
final ZkClient zkClient = new ZkClient(kafkaBinderConfigurationProperties.getZkConnectionString(),
|
||||
46
spring-cloud-stream-binder-kafka-core/pom.xml
Normal file
46
spring-cloud-stream-binder-kafka-core/pom.xml
Normal file
@@ -0,0 +1,46 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.2.0.M2</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder Core</description>
|
||||
<url>http://projects.spring.io/spring-cloud</url>
|
||||
<organization>
|
||||
<name>Pivotal Software, Inc.</name>
|
||||
<url>http://www.spring.io</url>
|
||||
</organization>
|
||||
<properties>
|
||||
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
<version>${spring-integration-kafka.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.avro</groupId>
|
||||
<artifactId>avro-compiler</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
@@ -24,7 +24,7 @@ import kafka.utils.ZkUtils;
|
||||
* API around {@link kafka.admin.AdminUtils} to support
|
||||
* various versions of Kafka brokers.
|
||||
*
|
||||
* Note: Implementations that support Kafka brokers other than 0.9, need to use
|
||||
* Note: Implementations that support Kafka brokers other than 0.10, need to use
|
||||
* a possible strategy that involves reflection around {@link kafka.admin.AdminUtils}.
|
||||
*
|
||||
* @author Soby Chacko
|
||||
@@ -18,10 +18,8 @@ package org.springframework.cloud.stream.binder.kafka.admin;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import kafka.api.PartitionMetadata;
|
||||
import kafka.utils.ZkUtils;
|
||||
|
||||
import org.springframework.util.ClassUtils;
|
||||
@@ -30,7 +28,7 @@ import org.springframework.util.ReflectionUtils;
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class Kafka10AdminUtilsOperation implements AdminUtilsOperation {
|
||||
public class Kafka09AdminUtilsOperation implements AdminUtilsOperation {
|
||||
|
||||
private static Class<?> ADMIN_UTIL_CLASS;
|
||||
|
||||
@@ -53,10 +51,9 @@ public class Kafka10AdminUtilsOperation implements AdminUtilsOperation {
|
||||
addPartitions = m;
|
||||
}
|
||||
}
|
||||
|
||||
if (addPartitions != null) {
|
||||
addPartitions.invoke(null, zkUtils, topic, numPartitions,
|
||||
replicaAssignmentStr, checkBrokerAvailable, null);
|
||||
replicaAssignmentStr, checkBrokerAvailable);
|
||||
}
|
||||
else {
|
||||
throw new InvocationTargetException(
|
||||
@@ -69,20 +66,16 @@ public class Kafka10AdminUtilsOperation implements AdminUtilsOperation {
|
||||
catch (IllegalAccessException e) {
|
||||
ReflectionUtils.handleReflectionException(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public short errorCodeFromTopicMetadata(String topic, ZkUtils zkUtils) {
|
||||
try {
|
||||
Method fetchTopicMetadataFromZk = ReflectionUtils.findMethod(ADMIN_UTIL_CLASS, "fetchTopicMetadataFromZk", String.class, ZkUtils.class);
|
||||
|
||||
Object result = fetchTopicMetadataFromZk.invoke(null, topic, zkUtils);
|
||||
Class<?> topicMetadataClass = ClassUtils.forName("org.apache.kafka.common.requests.MetadataResponse$TopicMetadata", null);
|
||||
|
||||
Method errorCodeMethod = ReflectionUtils.findMethod(topicMetadataClass, "error");
|
||||
Object obj = errorCodeMethod.invoke(result);
|
||||
Method code = ReflectionUtils.findMethod(obj.getClass(), "code");
|
||||
|
||||
return (short) code.invoke(obj);
|
||||
Class<?> topicMetadataClass = ClassUtils.forName("kafka.api.TopicMetadata", null);
|
||||
Method errorCodeMethod = ReflectionUtils.findMethod(topicMetadataClass, "errorCode");
|
||||
return (short) errorCodeMethod.invoke(result);
|
||||
}
|
||||
catch (ClassNotFoundException e) {
|
||||
throw new IllegalStateException("AdminUtils class not found", e);
|
||||
@@ -94,7 +87,6 @@ public class Kafka10AdminUtilsOperation implements AdminUtilsOperation {
|
||||
ReflectionUtils.handleReflectionException(e);
|
||||
}
|
||||
return 0;
|
||||
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -102,11 +94,13 @@ public class Kafka10AdminUtilsOperation implements AdminUtilsOperation {
|
||||
try {
|
||||
Method fetchTopicMetadataFromZk = ReflectionUtils.findMethod(ADMIN_UTIL_CLASS, "fetchTopicMetadataFromZk", String.class, ZkUtils.class);
|
||||
Object result = fetchTopicMetadataFromZk.invoke(null, topic, zkUtils);
|
||||
Class<?> topicMetadataClass = ClassUtils.forName("org.apache.kafka.common.requests.MetadataResponse$TopicMetadata", null);
|
||||
Class<?> topicMetadataClass = ClassUtils.forName("kafka.api.TopicMetadata", null);
|
||||
|
||||
Method partitionsMetadata = ReflectionUtils.findMethod(topicMetadataClass, "partitionMetadata");
|
||||
List<PartitionMetadata> foo = (List<PartitionMetadata>) partitionsMetadata.invoke(result);
|
||||
return foo.size();
|
||||
Method partitionsMetadata = ReflectionUtils.findMethod(topicMetadataClass, "partitionsMetadata");
|
||||
scala.collection.Seq<kafka.api.PartitionMetadata> partitionSize =
|
||||
(scala.collection.Seq<kafka.api.PartitionMetadata>)partitionsMetadata.invoke(result);
|
||||
|
||||
return partitionSize.size();
|
||||
}
|
||||
catch (ClassNotFoundException e) {
|
||||
throw new IllegalStateException("AdminUtils class not found", e);
|
||||
@@ -118,22 +112,23 @@ public class Kafka10AdminUtilsOperation implements AdminUtilsOperation {
|
||||
ReflectionUtils.handleReflectionException(e);
|
||||
}
|
||||
return 0;
|
||||
|
||||
}
|
||||
|
||||
public void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions,
|
||||
int replicationFactor, Properties topicConfig) {
|
||||
int replicationFactor, Properties topicConfig) {
|
||||
try {
|
||||
Method[] declaredMethods = ADMIN_UTIL_CLASS.getDeclaredMethods();
|
||||
Method createTopic = null;
|
||||
for (Method m : declaredMethods) {
|
||||
if (m.getName().equals("createTopic") && m.getParameterTypes()[m.getParameterTypes().length - 1].getName().endsWith("RackAwareMode")) {
|
||||
if (m.getName().equals("createTopic")) {
|
||||
createTopic = m;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (createTopic != null) {
|
||||
createTopic.invoke(null, zkUtils, topic, partitions,
|
||||
replicationFactor, topicConfig, null);
|
||||
replicationFactor, topicConfig);
|
||||
}
|
||||
else {
|
||||
throw new InvocationTargetException(
|
||||
@@ -147,4 +142,4 @@ public class Kafka10AdminUtilsOperation implements AdminUtilsOperation {
|
||||
ReflectionUtils.handleReflectionException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -19,33 +19,36 @@ package org.springframework.cloud.stream.binder.kafka.admin;
|
||||
import java.util.Properties;
|
||||
|
||||
import kafka.admin.AdminUtils;
|
||||
import kafka.api.TopicMetadata;
|
||||
import kafka.utils.ZkUtils;
|
||||
import org.apache.kafka.common.requests.MetadataResponse;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class Kafka09AdminUtilsOperation implements AdminUtilsOperation {
|
||||
public class Kafka10AdminUtilsOperation implements AdminUtilsOperation {
|
||||
|
||||
public void invokeAddPartitions(ZkUtils zkUtils, String topic, int numPartitions,
|
||||
String replicaAssignmentStr, boolean checkBrokerAvailable) {
|
||||
AdminUtils.addPartitions(zkUtils, topic, numPartitions,
|
||||
replicaAssignmentStr, checkBrokerAvailable);
|
||||
AdminUtils.addPartitions(zkUtils, topic, numPartitions, replicaAssignmentStr, checkBrokerAvailable, null);
|
||||
}
|
||||
|
||||
public short errorCodeFromTopicMetadata(String topic, ZkUtils zkUtils) {
|
||||
TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
|
||||
return topicMetadata.errorCode();
|
||||
|
||||
MetadataResponse.TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
|
||||
return topicMetadata.error().code();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public int partitionSize(String topic, ZkUtils zkUtils) {
|
||||
TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
|
||||
return topicMetadata.partitionsMetadata().size();
|
||||
|
||||
MetadataResponse.TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
|
||||
return topicMetadata.partitionMetadata().size();
|
||||
}
|
||||
|
||||
public void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions,
|
||||
int replicationFactor, Properties topicConfig) {
|
||||
int replicationFactor, Properties topicConfig) {
|
||||
|
||||
AdminUtils.createTopic(zkUtils, topic, partitions, replicationFactor,
|
||||
topicConfig);
|
||||
topicConfig, null);
|
||||
}
|
||||
}
|
||||
@@ -14,7 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.config;
|
||||
package org.springframework.cloud.stream.binder.kafka.properties;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
@@ -14,7 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.config;
|
||||
package org.springframework.cloud.stream.binder.kafka.properties;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
@@ -14,7 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
package org.springframework.cloud.stream.binder.kafka.properties;
|
||||
|
||||
/**
|
||||
* @author Marius Bogoevici
|
||||
@@ -14,7 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
package org.springframework.cloud.stream.binder.kafka.properties;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
@@ -14,7 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
package org.springframework.cloud.stream.binder.kafka.properties;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
@@ -14,7 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
package org.springframework.cloud.stream.binder.kafka.properties;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
@@ -0,0 +1,314 @@
|
||||
/*
|
||||
* Copyright 2014-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.provisioning;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.kafka.common.PartitionInfo;
|
||||
import org.apache.kafka.common.security.JaasUtils;
|
||||
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.cloud.stream.binder.BinderException;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.utils.KafkaTopicUtils;
|
||||
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
|
||||
import org.springframework.cloud.stream.provisioning.ProducerDestination;
|
||||
import org.springframework.cloud.stream.provisioning.ProvisioningException;
|
||||
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
|
||||
import org.springframework.retry.RetryCallback;
|
||||
import org.springframework.retry.RetryContext;
|
||||
import org.springframework.retry.RetryOperations;
|
||||
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
|
||||
import org.springframework.retry.policy.SimpleRetryPolicy;
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import kafka.common.ErrorMapping;
|
||||
import kafka.utils.ZkUtils;
|
||||
|
||||
/**
|
||||
* Kafka implementation for {@link ProvisioningProvider}
|
||||
*
|
||||
* @author Soby Chacko
|
||||
* @author Gary Russell
|
||||
*/
|
||||
public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsumerProperties<KafkaConsumerProperties>,
|
||||
ExtendedProducerProperties<KafkaProducerProperties>>, InitializingBean {
|
||||
|
||||
private final Log logger = LogFactory.getLog(getClass());
|
||||
|
||||
private final KafkaBinderConfigurationProperties configurationProperties;
|
||||
|
||||
private final AdminUtilsOperation adminUtilsOperation;
|
||||
|
||||
private RetryOperations metadataRetryOperations;
|
||||
|
||||
public KafkaTopicProvisioner(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
|
||||
AdminUtilsOperation adminUtilsOperation) {
|
||||
this.configurationProperties = kafkaBinderConfigurationProperties;
|
||||
this.adminUtilsOperation = adminUtilsOperation;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param metadataRetryOperations the retry configuration
|
||||
*/
|
||||
public void setMetadataRetryOperations(RetryOperations metadataRetryOperations) {
|
||||
this.metadataRetryOperations = metadataRetryOperations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
if (this.metadataRetryOperations == null) {
|
||||
RetryTemplate retryTemplate = new RetryTemplate();
|
||||
|
||||
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
|
||||
simpleRetryPolicy.setMaxAttempts(10);
|
||||
retryTemplate.setRetryPolicy(simpleRetryPolicy);
|
||||
|
||||
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
|
||||
backOffPolicy.setInitialInterval(100);
|
||||
backOffPolicy.setMultiplier(2);
|
||||
backOffPolicy.setMaxInterval(1000);
|
||||
retryTemplate.setBackOffPolicy(backOffPolicy);
|
||||
this.metadataRetryOperations = retryTemplate;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProducerDestination provisionProducerDestination(final String name, ExtendedProducerProperties<KafkaProducerProperties> properties) {
|
||||
if (this.logger.isInfoEnabled()) {
|
||||
this.logger.info("Using kafka topic for outbound: " + name);
|
||||
}
|
||||
KafkaTopicUtils.validateTopicName(name);
|
||||
createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(name, properties.getPartitionCount());
|
||||
if (this.configurationProperties.isAutoCreateTopics() && adminUtilsOperation != null) {
|
||||
final ZkUtils zkUtils = ZkUtils.apply(this.configurationProperties.getZkConnectionString(),
|
||||
this.configurationProperties.getZkSessionTimeout(),
|
||||
this.configurationProperties.getZkConnectionTimeout(),
|
||||
JaasUtils.isZkSecurityEnabled());
|
||||
int partitions = adminUtilsOperation.partitionSize(name, zkUtils);
|
||||
return new KafkaProducerDestination(name, partitions);
|
||||
}
|
||||
else {
|
||||
return new KafkaProducerDestination(name);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConsumerDestination provisionConsumerDestination(final String name, final String group, ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
|
||||
KafkaTopicUtils.validateTopicName(name);
|
||||
boolean anonymous = !StringUtils.hasText(group);
|
||||
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
|
||||
"DLQ support is not available for anonymous subscriptions");
|
||||
if (properties.getInstanceCount() == 0) {
|
||||
throw new IllegalArgumentException("Instance count cannot be zero");
|
||||
}
|
||||
int partitionCount = properties.getInstanceCount() * properties.getConcurrency();
|
||||
createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(name, partitionCount);
|
||||
if (this.configurationProperties.isAutoCreateTopics() && adminUtilsOperation != null) {
|
||||
final ZkUtils zkUtils = ZkUtils.apply(this.configurationProperties.getZkConnectionString(),
|
||||
this.configurationProperties.getZkSessionTimeout(),
|
||||
this.configurationProperties.getZkConnectionTimeout(),
|
||||
JaasUtils.isZkSecurityEnabled());
|
||||
int partitions = adminUtilsOperation.partitionSize(name, zkUtils);
|
||||
if (properties.getExtension().isEnableDlq() && !anonymous) {
|
||||
String dlqTopic = "error." + name + "." + group;
|
||||
createTopicAndPartitions(dlqTopic, partitions);
|
||||
return new KafkaConsumerDestination(name, partitions, dlqTopic);
|
||||
}
|
||||
return new KafkaConsumerDestination(name, partitions);
|
||||
}
|
||||
return new KafkaConsumerDestination(name);
|
||||
}
|
||||
|
||||
private void createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(final String topicName, final int partitionCount) {
|
||||
if (this.configurationProperties.isAutoCreateTopics() && adminUtilsOperation != null) {
|
||||
createTopicAndPartitions(topicName, partitionCount);
|
||||
}
|
||||
else if (this.configurationProperties.isAutoCreateTopics() && adminUtilsOperation == null) {
|
||||
this.logger.warn("Auto creation of topics is enabled, but Kafka AdminUtils class is not present on the classpath. " +
|
||||
"No topic will be created by the binder");
|
||||
}
|
||||
else if (!this.configurationProperties.isAutoCreateTopics()) {
|
||||
this.logger.info("Auto creation of topics is disabled.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a Kafka topic if needed, or try to increase its partition count to the
|
||||
* desired number.
|
||||
*/
|
||||
private void createTopicAndPartitions(final String topicName, final int partitionCount) {
|
||||
final ZkUtils zkUtils = ZkUtils.apply(this.configurationProperties.getZkConnectionString(),
|
||||
this.configurationProperties.getZkSessionTimeout(),
|
||||
this.configurationProperties.getZkConnectionTimeout(),
|
||||
JaasUtils.isZkSecurityEnabled());
|
||||
try {
|
||||
short errorCode = adminUtilsOperation.errorCodeFromTopicMetadata(topicName, zkUtils);
|
||||
if (errorCode == ErrorMapping.NoError()) {
|
||||
// only consider minPartitionCount for resizing if autoAddPartitions is true
|
||||
int effectivePartitionCount = this.configurationProperties.isAutoAddPartitions()
|
||||
? Math.max(this.configurationProperties.getMinPartitionCount(), partitionCount)
|
||||
: partitionCount;
|
||||
int partitionSize = adminUtilsOperation.partitionSize(topicName, zkUtils);
|
||||
|
||||
if (partitionSize < effectivePartitionCount) {
|
||||
if (this.configurationProperties.isAutoAddPartitions()) {
|
||||
adminUtilsOperation.invokeAddPartitions(zkUtils, topicName, effectivePartitionCount, null, false);
|
||||
}
|
||||
else {
|
||||
throw new ProvisioningException("The number of expected partitions was: " + partitionCount + ", but "
|
||||
+ partitionSize + (partitionSize > 1 ? " have " : " has ") + "been found instead."
|
||||
+ "Consider either increasing the partition count of the topic or enabling " +
|
||||
"`autoAddPartitions`");
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (errorCode == ErrorMapping.UnknownTopicOrPartitionCode()) {
|
||||
// always consider minPartitionCount for topic creation
|
||||
final int effectivePartitionCount = Math.max(this.configurationProperties.getMinPartitionCount(),
|
||||
partitionCount);
|
||||
|
||||
this.metadataRetryOperations.execute(new RetryCallback<Object, RuntimeException>() {
|
||||
|
||||
@Override
|
||||
public Object doWithRetry(RetryContext context) throws RuntimeException {
|
||||
|
||||
adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount,
|
||||
configurationProperties.getReplicationFactor(), new Properties());
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
else {
|
||||
throw new ProvisioningException("Error fetching Kafka topic metadata: ",
|
||||
ErrorMapping.exceptionFor(errorCode));
|
||||
}
|
||||
}
|
||||
finally {
|
||||
zkUtils.close();
|
||||
}
|
||||
}
|
||||
|
||||
public Collection<PartitionInfo> getPartitionsForTopic(final int partitionCount, final Callable<Collection<PartitionInfo>> callable) {
|
||||
try {
|
||||
return this.metadataRetryOperations
|
||||
.execute(new RetryCallback<Collection<PartitionInfo>, Exception>() {
|
||||
|
||||
@Override
|
||||
public Collection<PartitionInfo> doWithRetry(RetryContext context) throws Exception {
|
||||
Collection<PartitionInfo> partitions = callable.call();
|
||||
// do a sanity check on the partition set
|
||||
if (partitions.size() < partitionCount) {
|
||||
throw new IllegalStateException("The number of expected partitions was: "
|
||||
+ partitionCount + ", but " + partitions.size()
|
||||
+ (partitions.size() > 1 ? " have " : " has ") + "been found instead");
|
||||
}
|
||||
return partitions;
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (Exception e) {
|
||||
this.logger.error("Cannot initialize Binder", e);
|
||||
throw new BinderException("Cannot initialize binder:", e);
|
||||
}
|
||||
}
|
||||
|
||||
private static final class KafkaProducerDestination implements ProducerDestination {
|
||||
|
||||
private final String producerDestinationName;
|
||||
|
||||
private final int partitions;
|
||||
|
||||
KafkaProducerDestination(String destinationName) {
|
||||
this(destinationName, 0);
|
||||
}
|
||||
|
||||
KafkaProducerDestination(String destinationName, Integer partitions) {
|
||||
this.producerDestinationName = destinationName;
|
||||
this.partitions = partitions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return producerDestinationName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getNameForPartition(int partition) {
|
||||
return producerDestinationName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "KafkaProducerDestination{" +
|
||||
"producerDestinationName='" + producerDestinationName + '\'' +
|
||||
", partitions=" + partitions +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
||||
private static final class KafkaConsumerDestination implements ConsumerDestination {
|
||||
|
||||
private final String consumerDestinationName;
|
||||
|
||||
private final int partitions;
|
||||
|
||||
private final String dlqName;
|
||||
|
||||
KafkaConsumerDestination(String consumerDestinationName) {
|
||||
this(consumerDestinationName, 0, null);
|
||||
}
|
||||
|
||||
KafkaConsumerDestination(String consumerDestinationName, int partitions) {
|
||||
this(consumerDestinationName, partitions, null);
|
||||
}
|
||||
|
||||
KafkaConsumerDestination(String consumerDestinationName, Integer partitions, String dlqName) {
|
||||
this.consumerDestinationName = consumerDestinationName;
|
||||
this.partitions = partitions;
|
||||
this.dlqName = dlqName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return this.consumerDestinationName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "KafkaConsumerDestination{" +
|
||||
"consumerDestinationName='" + consumerDestinationName + '\'' +
|
||||
", partitions=" + partitions +
|
||||
", dlqName='" + dlqName + '\'' +
|
||||
'}';
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -14,7 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
package org.springframework.cloud.stream.binder.kafka.utils;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.2.0.M1</version>
|
||||
<version>1.2.0.M2</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>
|
||||
|
||||
@@ -153,6 +153,7 @@ Default: `false`.
|
||||
startOffset::
|
||||
The starting offset for new groups, or when `resetOffsets` is `true`.
|
||||
Allowed values: `earliest`, `latest`.
|
||||
If the consumer group is set explicitly for the consumer 'binding' (via `spring.cloud.stream.bindings.<channelName>.group`), then 'startOffset' is set to `earliest`; otherwise it is set to `latest` for the `anonymous` consumer group.
|
||||
+
|
||||
Default: null (equivalent to `earliest`).
|
||||
enableDlq::
|
||||
@@ -328,26 +329,18 @@ In secure environments, we strongly recommend creating topics and managing ACLs
|
||||
|
||||
==== Using the binder with Apache Kafka 0.10
|
||||
|
||||
The binder also supports connecting to Kafka 0.10 brokers.
|
||||
In order to support this, when you create the project that contains your application, include `spring-cloud-starter-stream-kafka` as you normally would do for 0.9 based applications.
|
||||
Then add these dependencies at the top of the `<dependencies>` section in the pom.xml file to override the Apache Kafka, Spring Kafka, and Spring Integration Kafka with 0.10-compatible versions as in the following example:
|
||||
The default Kafka support in Spring Cloud Stream Kafka binder is for Kafka version 0.10.1.1. The binder also supports connecting to other 0.10 based versions and 0.9 clients.
|
||||
In order to do this, when you create the project that contains your application, include `spring-cloud-starter-stream-kafka` as you normally would do for the default binder.
|
||||
Then add these dependencies at the top of the `<dependencies>` section in the pom.xml file to override the dependencies.
|
||||
|
||||
Here is an example for downgrading your application to 0.10.0.1. Since it is still on the 0.10 line, the default `spring-kafka` and `spring-integration-kafka` versions can be retained.
|
||||
|
||||
[source,xml]
|
||||
----
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<version>1.1.1.RELEASE</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
<version>2.1.0.RELEASE</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
<version>0.10.0.0</version>
|
||||
<version>0.10.0.1</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
@@ -355,6 +348,44 @@ Then add these dependencies at the top of the `<dependencies>` section in the po
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>0.10.0.1</version>
|
||||
</dependency>
|
||||
----
|
||||
|
||||
Here is another example of using 0.9.0.1 version.
|
||||
|
||||
[source,xml]
|
||||
----
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<version>1.0.5.RELEASE</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
<version>2.0.1.RELEASE</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
<version>0.9.0.1</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>0.9.0.1</version>
|
||||
</dependency>
|
||||
|
||||
----
|
||||
|
||||
[NOTE]
|
||||
@@ -369,8 +400,8 @@ For best results, we recommend using the most recent 0.10-compatible versions of
|
||||
The Apache Kafka Binder uses the administrative utilities which are part of the Apache Kafka server library to create and reconfigure topics.
|
||||
If the inclusion of the Apache Kafka server library and its dependencies is not necessary at runtime because the application will rely on the topics being configured administratively, the Kafka binder allows for Apache Kafka server dependency to be excluded from the application.
|
||||
|
||||
If you use Kafka 10 dependencies as advised above, all you have to do is not to include the kafka broker dependency.
|
||||
If you use Kafka 0.9, then ensure that you exclude the kafka broker jar from the `spring-cloud-starter-stream-kafka` dependency as following.
|
||||
If you use non default versions for Kafka dependencies as advised above, all you have to do is not to include the kafka broker dependency.
|
||||
If you use the default Kafka version, then ensure that you exclude the kafka broker jar from the `spring-cloud-starter-stream-kafka` dependency as following.
|
||||
|
||||
[source,xml]
|
||||
----
|
||||
|
||||
24
spring-cloud-stream-binder-kafka-test-support/pom.xml
Normal file
24
spring-cloud-stream-binder-kafka-test-support/pom.xml
Normal file
@@ -0,0 +1,24 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.2.0.M2</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-test-support</artifactId>
|
||||
<description>Kafka related test classes</description>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<version>${spring-kafka.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
@@ -10,10 +10,15 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.2.0.M1</version>
|
||||
<version>1.2.0.M2</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
<version>1.2.0.M2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-configuration-processor</artifactId>
|
||||
@@ -37,20 +42,15 @@
|
||||
<artifactId>spring-cloud-stream-binder-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
<version>${spring-integration-kafka.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.avro</groupId>
|
||||
<artifactId>avro-compiler</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>jline</groupId>
|
||||
<artifactId>jline</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
@@ -82,8 +82,38 @@
|
||||
<classifier>test</classifier>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-schema</artifactId>
|
||||
<version>${spring-cloud-stream.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.confluent</groupId>
|
||||
<artifactId>kafka-avro-serializer</artifactId>
|
||||
<version>3.1.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.confluent</groupId>
|
||||
<artifactId>kafka-schema-registry</artifactId>
|
||||
<version>3.1.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>confluent</id>
|
||||
<url>http://packages.confluent.io/maven/</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
/*
|
||||
* Copyright 2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.env.EnvironmentPostProcessor;
|
||||
import org.springframework.core.env.ConfigurableEnvironment;
|
||||
import org.springframework.core.env.MapPropertySource;
|
||||
|
||||
/**
|
||||
* An {@link EnvironmentPostProcessor} that sets some common configuration properties (log config etc.,) for Kafka
|
||||
* binder.
|
||||
*
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
public class KafkaBinderEnvironmentPostProcessor implements EnvironmentPostProcessor {
|
||||
|
||||
@Override
|
||||
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
|
||||
Map<String, Object> propertiesToAdd = new HashMap<>();
|
||||
propertiesToAdd.put("logging.pattern.console", "%d{ISO8601} %5p %t %c{2}:%L - %m%n");
|
||||
propertiesToAdd.put("logging.level.org.I0Itec.zkclient", "ERROR");
|
||||
propertiesToAdd.put("logging.level.kafka.server.KafkaConfig", "ERROR");
|
||||
propertiesToAdd.put("logging.level.kafka.admin.AdminClient.AdminConfig", "ERROR");
|
||||
environment.getPropertySources().addLast(new MapPropertySource("kafkaBinderLogConfig", propertiesToAdd));
|
||||
}
|
||||
}
|
||||
@@ -29,7 +29,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
|
||||
import org.springframework.boot.actuate.health.Health;
|
||||
import org.springframework.boot.actuate.health.HealthIndicator;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
|
||||
/**
|
||||
* Health indicator for Kafka.
|
||||
|
||||
@@ -28,7 +28,7 @@ import org.apache.kafka.common.security.JaasUtils;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
|
||||
@@ -22,34 +22,30 @@ import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import kafka.common.ErrorMapping;
|
||||
import kafka.utils.ZkUtils;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.producer.Callback;
|
||||
import org.apache.kafka.clients.producer.Producer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||
import org.apache.kafka.common.PartitionInfo;
|
||||
import org.apache.kafka.common.security.JaasUtils;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.serialization.ByteArraySerializer;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.binder.BinderException;
|
||||
import org.springframework.cloud.stream.binder.BinderHeaders;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
|
||||
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
|
||||
import org.springframework.cloud.stream.provisioning.ProducerDestination;
|
||||
import org.springframework.context.Lifecycle;
|
||||
import org.springframework.expression.common.LiteralExpression;
|
||||
import org.springframework.expression.spel.standard.SpelExpressionParser;
|
||||
@@ -65,19 +61,17 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
|
||||
import org.springframework.kafka.listener.ErrorHandler;
|
||||
import org.springframework.kafka.listener.config.ContainerProperties;
|
||||
import org.springframework.kafka.support.ProducerListener;
|
||||
import org.springframework.kafka.support.SendResult;
|
||||
import org.springframework.kafka.support.TopicPartitionInitialOffset;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.retry.RetryCallback;
|
||||
import org.springframework.retry.RetryContext;
|
||||
import org.springframework.retry.RetryOperations;
|
||||
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
|
||||
import org.springframework.retry.policy.SimpleRetryPolicy;
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
import org.springframework.util.concurrent.ListenableFutureCallback;
|
||||
|
||||
/**
|
||||
* A {@link Binder} that uses Kafka as the underlying middleware.
|
||||
@@ -92,26 +86,20 @@ import org.springframework.util.StringUtils;
|
||||
*/
|
||||
public class KafkaMessageChannelBinder extends
|
||||
AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>,
|
||||
ExtendedProducerProperties<KafkaProducerProperties>, Collection<PartitionInfo>, String>
|
||||
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties>,
|
||||
DisposableBean {
|
||||
ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
|
||||
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {
|
||||
|
||||
private final KafkaBinderConfigurationProperties configurationProperties;
|
||||
|
||||
private RetryOperations metadataRetryOperations;
|
||||
|
||||
private final Map<String, Collection<PartitionInfo>> topicsInUse = new HashMap<>();
|
||||
|
||||
private ProducerListener<byte[], byte[]> producerListener;
|
||||
|
||||
private volatile Producer<byte[], byte[]> dlqProducer;
|
||||
|
||||
private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
|
||||
|
||||
private AdminUtilsOperation adminUtilsOperation;
|
||||
private final Map<String, Collection<PartitionInfo>> topicsInUse = new HashMap<>();
|
||||
|
||||
public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties) {
|
||||
super(false, headersToMap(configurationProperties));
|
||||
public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
|
||||
KafkaTopicProvisioner provisioningProvider) {
|
||||
super(false, headersToMap(configurationProperties), provisioningProvider);
|
||||
this.configurationProperties = configurationProperties;
|
||||
}
|
||||
|
||||
@@ -131,50 +119,10 @@ public class KafkaMessageChannelBinder extends
|
||||
return headersToMap;
|
||||
}
|
||||
|
||||
public void setAdminUtilsOperation(AdminUtilsOperation adminUtilsOperation) {
|
||||
this.adminUtilsOperation = adminUtilsOperation;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retry configuration for operations such as validating topic creation
|
||||
*
|
||||
* @param metadataRetryOperations the retry configuration
|
||||
*/
|
||||
public void setMetadataRetryOperations(RetryOperations metadataRetryOperations) {
|
||||
this.metadataRetryOperations = metadataRetryOperations;
|
||||
}
|
||||
|
||||
public void setExtendedBindingProperties(KafkaExtendedBindingProperties extendedBindingProperties) {
|
||||
this.extendedBindingProperties = extendedBindingProperties;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInit() throws Exception {
|
||||
|
||||
if (this.metadataRetryOperations == null) {
|
||||
RetryTemplate retryTemplate = new RetryTemplate();
|
||||
|
||||
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
|
||||
simpleRetryPolicy.setMaxAttempts(10);
|
||||
retryTemplate.setRetryPolicy(simpleRetryPolicy);
|
||||
|
||||
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
|
||||
backOffPolicy.setInitialInterval(100);
|
||||
backOffPolicy.setMultiplier(2);
|
||||
backOffPolicy.setMaxInterval(1000);
|
||||
retryTemplate.setBackOffPolicy(backOffPolicy);
|
||||
this.metadataRetryOperations = retryTemplate;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() throws Exception {
|
||||
if (this.dlqProducer != null) {
|
||||
this.dlqProducer.close();
|
||||
this.dlqProducer = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void setProducerListener(ProducerListener<byte[], byte[]> producerListener) {
|
||||
this.producerListener = producerListener;
|
||||
}
|
||||
@@ -194,49 +142,30 @@ public class KafkaMessageChannelBinder extends
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MessageHandler createProducerMessageHandler(final String destination,
|
||||
protected MessageHandler createProducerMessageHandler(final ProducerDestination destination,
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception {
|
||||
|
||||
KafkaTopicUtils.validateTopicName(destination);
|
||||
createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(destination, producerProperties.getPartitionCount());
|
||||
Collection<PartitionInfo> partitions = getPartitionsForTopic(destination, producerProperties.getPartitionCount());
|
||||
|
||||
final DefaultKafkaProducerFactory<byte[], byte[]> producerFB = getProducerFactory(producerProperties);
|
||||
Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(producerProperties.getPartitionCount(),
|
||||
new Callable<Collection<PartitionInfo>>() {
|
||||
@Override
|
||||
public Collection<PartitionInfo> call() throws Exception {
|
||||
return producerFB.createProducer().partitionsFor(destination.getName());
|
||||
}
|
||||
});
|
||||
this.topicsInUse.put(destination.getName(), partitions);
|
||||
if (producerProperties.getPartitionCount() < partitions.size()) {
|
||||
if (this.logger.isInfoEnabled()) {
|
||||
this.logger.info("The `partitionCount` of the producer for topic " + destination + " is "
|
||||
this.logger.info("The `partitionCount` of the producer for topic " + destination.getName() + " is "
|
||||
+ producerProperties.getPartitionCount() + ", smaller than the actual partition count of "
|
||||
+ partitions.size() + " of the topic. The larger number will be used instead.");
|
||||
}
|
||||
}
|
||||
|
||||
this.topicsInUse.put(destination, partitions);
|
||||
|
||||
DefaultKafkaProducerFactory<byte[], byte[]> producerFB = getProducerFactory(producerProperties);
|
||||
KafkaTemplate<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFB);
|
||||
if (this.producerListener != null) {
|
||||
kafkaTemplate.setProducerListener(this.producerListener);
|
||||
}
|
||||
return new ProducerConfigurationMessageHandler(kafkaTemplate, destination, producerProperties, producerFB);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String createProducerDestinationIfNecessary(String name,
|
||||
ExtendedProducerProperties<KafkaProducerProperties> properties) {
|
||||
if (this.logger.isInfoEnabled()) {
|
||||
this.logger.info("Using kafka topic for outbound: " + name);
|
||||
}
|
||||
KafkaTopicUtils.validateTopicName(name);
|
||||
createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(name, properties.getPartitionCount());
|
||||
Collection<PartitionInfo> partitions = getPartitionsForTopic(name, properties.getPartitionCount());
|
||||
if (properties.getPartitionCount() < partitions.size()) {
|
||||
if (this.logger.isInfoEnabled()) {
|
||||
this.logger.info("The `partitionCount` of the producer for topic " + name + " is "
|
||||
+ properties.getPartitionCount() + ", smaller than the actual partition count of "
|
||||
+ partitions.size() + " of the topic. The larger number will be used instead.");
|
||||
}
|
||||
}
|
||||
this.topicsInUse.put(name, partitions);
|
||||
return name;
|
||||
return new ProducerConfigurationMessageHandler(kafkaTemplate, destination.getName(), producerProperties, producerFB);
|
||||
}
|
||||
|
||||
private DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(
|
||||
@@ -263,15 +192,28 @@ public class KafkaMessageChannelBinder extends
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<PartitionInfo> createConsumerDestinationIfNecessary(String name, String group,
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
|
||||
KafkaTopicUtils.validateTopicName(name);
|
||||
if (properties.getInstanceCount() == 0) {
|
||||
throw new IllegalArgumentException("Instance count cannot be zero");
|
||||
@SuppressWarnings("unchecked")
|
||||
protected MessageProducer createConsumerEndpoint(final ConsumerDestination destination, final String group,
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
|
||||
|
||||
boolean anonymous = !StringUtils.hasText(group);
|
||||
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
|
||||
"DLQ support is not available for anonymous subscriptions");
|
||||
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
|
||||
Map<String, Object> props = getConsumerConfig(anonymous, consumerGroup);
|
||||
if (!ObjectUtils.isEmpty(properties.getExtension().getConfiguration())) {
|
||||
props.putAll(properties.getExtension().getConfiguration());
|
||||
}
|
||||
final ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
|
||||
int partitionCount = properties.getInstanceCount() * properties.getConcurrency();
|
||||
createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(name, partitionCount);
|
||||
Collection<PartitionInfo> allPartitions = getPartitionsForTopic(name, partitionCount);
|
||||
|
||||
Collection<PartitionInfo> allPartitions = provisioningProvider.getPartitionsForTopic(partitionCount,
|
||||
new Callable<Collection<PartitionInfo>>() {
|
||||
@Override
|
||||
public Collection<PartitionInfo> call() throws Exception {
|
||||
return consumerFactory.createConsumer().partitionsFor(destination.getName());
|
||||
}
|
||||
});
|
||||
|
||||
Collection<PartitionInfo> listenedPartitions;
|
||||
|
||||
@@ -288,29 +230,13 @@ public class KafkaMessageChannelBinder extends
|
||||
}
|
||||
}
|
||||
}
|
||||
this.topicsInUse.put(name, listenedPartitions);
|
||||
return listenedPartitions;
|
||||
}
|
||||
this.topicsInUse.put(destination.getName(), listenedPartitions);
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
protected MessageProducer createConsumerEndpoint(String name, String group, Collection<PartitionInfo> destination,
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
|
||||
boolean anonymous = !StringUtils.hasText(group);
|
||||
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
|
||||
"DLQ support is not available for anonymous subscriptions");
|
||||
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
|
||||
Map<String, Object> props = getConsumerConfig(anonymous, consumerGroup);
|
||||
if (!ObjectUtils.isEmpty(properties.getExtension().getConfiguration())) {
|
||||
props.putAll(properties.getExtension().getConfiguration());
|
||||
}
|
||||
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
|
||||
Collection<PartitionInfo> listenedPartitions = destination;
|
||||
Assert.isTrue(!CollectionUtils.isEmpty(listenedPartitions), "A list of partitions must be provided");
|
||||
final TopicPartitionInitialOffset[] topicPartitionInitialOffsets = getTopicPartitionInitialOffsets(
|
||||
listenedPartitions);
|
||||
final ContainerProperties containerProperties =
|
||||
anonymous || properties.getExtension().isAutoRebalanceEnabled() ? new ContainerProperties(name)
|
||||
anonymous || properties.getExtension().isAutoRebalanceEnabled() ? new ContainerProperties(destination.getName())
|
||||
: new ContainerProperties(topicPartitionInitialOffsets);
|
||||
int concurrency = Math.min(properties.getConcurrency(), listenedPartitions.size());
|
||||
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer =
|
||||
@@ -342,8 +268,8 @@ public class KafkaMessageChannelBinder extends
|
||||
final RetryTemplate retryTemplate = buildRetryTemplate(properties);
|
||||
kafkaMessageDrivenChannelAdapter.setRetryTemplate(retryTemplate);
|
||||
if (properties.getExtension().isEnableDlq()) {
|
||||
final String dlqTopic = "error." + name + "." + group;
|
||||
initDlqProducer();
|
||||
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = getProducerFactory(new ExtendedProducerProperties<>(new KafkaProducerProperties()));
|
||||
final KafkaTemplate<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFactory);
|
||||
messageListenerContainer.getContainerProperties().setErrorHandler(new ErrorHandler() {
|
||||
|
||||
@Override
|
||||
@@ -352,29 +278,30 @@ public class KafkaMessageChannelBinder extends
|
||||
: null;
|
||||
final byte[] payload = message.value() != null
|
||||
? Utils.toArray(ByteBuffer.wrap((byte[]) message.value())) : null;
|
||||
KafkaMessageChannelBinder.this.dlqProducer.send(new ProducerRecord<>(dlqTopic, key, payload),
|
||||
new Callback() {
|
||||
ListenableFuture<SendResult<byte[], byte[]>> sentDlq = kafkaTemplate.send("error." + destination.getName() + "." + group,
|
||||
message.partition(), key, payload);
|
||||
sentDlq.addCallback(new ListenableFutureCallback<SendResult<byte[], byte[]>>() {
|
||||
StringBuilder sb = new StringBuilder().append(" a message with key='")
|
||||
.append(toDisplayString(ObjectUtils.nullSafeToString(key), 50)).append("'")
|
||||
.append(" and payload='")
|
||||
.append(toDisplayString(ObjectUtils.nullSafeToString(payload), 50))
|
||||
.append("'").append(" received from ")
|
||||
.append(message.partition());
|
||||
|
||||
@Override
|
||||
public void onCompletion(RecordMetadata metadata, Exception exception) {
|
||||
StringBuffer messageLog = new StringBuffer();
|
||||
messageLog.append(" a message with key='"
|
||||
+ toDisplayString(ObjectUtils.nullSafeToString(key), 50) + "'");
|
||||
messageLog.append(" and payload='"
|
||||
+ toDisplayString(ObjectUtils.nullSafeToString(payload), 50) + "'");
|
||||
messageLog.append(" received from " + message.partition());
|
||||
if (exception != null) {
|
||||
KafkaMessageChannelBinder.this.logger.error(
|
||||
"Error sending to DLQ" + messageLog.toString(), exception);
|
||||
}
|
||||
else {
|
||||
if (KafkaMessageChannelBinder.this.logger.isDebugEnabled()) {
|
||||
KafkaMessageChannelBinder.this.logger.debug(
|
||||
"Sent to DLQ " + messageLog.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void onFailure(Throwable ex) {
|
||||
KafkaMessageChannelBinder.this.logger.error(
|
||||
"Error sending to DLQ" + sb.toString(), ex);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSuccess(SendResult<byte[], byte[]> result) {
|
||||
if (KafkaMessageChannelBinder.this.logger.isDebugEnabled()) {
|
||||
KafkaMessageChannelBinder.this.logger.debug(
|
||||
"Sent to DLQ " + sb.toString());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -416,132 +343,6 @@ public class KafkaMessageChannelBinder extends
|
||||
return topicPartitionInitialOffsets;
|
||||
}
|
||||
|
||||
private void createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(final String topicName, final int partitionCount) {
|
||||
if (this.configurationProperties.isAutoCreateTopics() && adminUtilsOperation != null) {
|
||||
createTopicAndPartitions(topicName, partitionCount);
|
||||
}
|
||||
else if (this.configurationProperties.isAutoCreateTopics() && adminUtilsOperation == null) {
|
||||
this.logger.warn("Auto creation of topics is enabled, but Kafka AdminUtils class is not present on the classpath. " +
|
||||
"No topic will be created by the binder");
|
||||
}
|
||||
else if (!this.configurationProperties.isAutoCreateTopics()) {
|
||||
this.logger.info("Auto creation of topics is disabled.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a Kafka topic if needed, or try to increase its partition count to the
|
||||
* desired number.
|
||||
*/
|
||||
private void createTopicAndPartitions(final String topicName, final int partitionCount) {
|
||||
|
||||
final ZkUtils zkUtils = ZkUtils.apply(this.configurationProperties.getZkConnectionString(),
|
||||
this.configurationProperties.getZkSessionTimeout(),
|
||||
this.configurationProperties.getZkConnectionTimeout(),
|
||||
JaasUtils.isZkSecurityEnabled());
|
||||
try {
|
||||
short errorCode = adminUtilsOperation.errorCodeFromTopicMetadata(topicName, zkUtils);
|
||||
if (errorCode == ErrorMapping.NoError()) {
|
||||
// only consider minPartitionCount for resizing if autoAddPartitions is true
|
||||
int effectivePartitionCount = this.configurationProperties.isAutoAddPartitions()
|
||||
? Math.max(this.configurationProperties.getMinPartitionCount(), partitionCount)
|
||||
: partitionCount;
|
||||
int partitionSize = adminUtilsOperation.partitionSize(topicName, zkUtils);
|
||||
|
||||
if (partitionSize < effectivePartitionCount) {
|
||||
if (this.configurationProperties.isAutoAddPartitions()) {
|
||||
adminUtilsOperation.invokeAddPartitions(zkUtils, topicName, effectivePartitionCount, null, false);
|
||||
}
|
||||
else {
|
||||
throw new BinderException("The number of expected partitions was: " + partitionCount + ", but "
|
||||
+ partitionSize + (partitionSize > 1 ? " have " : " has ") + "been found instead."
|
||||
+ "Consider either increasing the partition count of the topic or enabling " +
|
||||
"`autoAddPartitions`");
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (errorCode == ErrorMapping.UnknownTopicOrPartitionCode()) {
|
||||
// always consider minPartitionCount for topic creation
|
||||
final int effectivePartitionCount = Math.max(this.configurationProperties.getMinPartitionCount(),
|
||||
partitionCount);
|
||||
|
||||
this.metadataRetryOperations.execute(new RetryCallback<Object, RuntimeException>() {
|
||||
|
||||
@Override
|
||||
public Object doWithRetry(RetryContext context) throws RuntimeException {
|
||||
|
||||
adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount,
|
||||
configurationProperties.getReplicationFactor(), new Properties());
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
else {
|
||||
throw new BinderException("Error fetching Kafka topic metadata: ",
|
||||
ErrorMapping.exceptionFor(errorCode));
|
||||
}
|
||||
}
|
||||
finally {
|
||||
zkUtils.close();
|
||||
}
|
||||
}
|
||||
|
||||
private Collection<PartitionInfo> getPartitionsForTopic(final String topicName, final int partitionCount) {
|
||||
try {
|
||||
return this.metadataRetryOperations
|
||||
.execute(new RetryCallback<Collection<PartitionInfo>, Exception>() {
|
||||
|
||||
@Override
|
||||
public Collection<PartitionInfo> doWithRetry(RetryContext context) throws Exception {
|
||||
Collection<PartitionInfo> partitions =
|
||||
getProducerFactory(
|
||||
new ExtendedProducerProperties<>(new KafkaProducerProperties()))
|
||||
.createProducer().partitionsFor(topicName);
|
||||
|
||||
// do a sanity check on the partition set
|
||||
if (partitions.size() < partitionCount) {
|
||||
throw new IllegalStateException("The number of expected partitions was: "
|
||||
+ partitionCount + ", but " + partitions.size()
|
||||
+ (partitions.size() > 1 ? " have " : " has ") + "been found instead");
|
||||
}
|
||||
return partitions;
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (Exception e) {
|
||||
this.logger.error("Cannot initialize Binder", e);
|
||||
throw new BinderException("Cannot initialize binder:", e);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void initDlqProducer() {
|
||||
try {
|
||||
if (this.dlqProducer == null) {
|
||||
synchronized (this) {
|
||||
if (this.dlqProducer == null) {
|
||||
// we can use the producer defaults as we do not need to tune
|
||||
// performance
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
|
||||
this.configurationProperties.getKafkaConnectionString());
|
||||
props.put(ProducerConfig.RETRIES_CONFIG, 0);
|
||||
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
|
||||
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
|
||||
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
|
||||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
|
||||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
|
||||
DefaultKafkaProducerFactory<byte[], byte[]> defaultKafkaProducerFactory =
|
||||
new DefaultKafkaProducerFactory<>(props);
|
||||
this.dlqProducer = defaultKafkaProducerFactory.createProducer();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException("Cannot initialize DLQ producer:", e);
|
||||
}
|
||||
}
|
||||
|
||||
private String toDisplayString(String original, int maxCharacters) {
|
||||
if (original.length() <= maxCharacters) {
|
||||
return original;
|
||||
@@ -564,7 +365,7 @@ public class KafkaMessageChannelBinder extends
|
||||
setBeanFactory(KafkaMessageChannelBinder.this.getBeanFactory());
|
||||
if (producerProperties.isPartitioned()) {
|
||||
SpelExpressionParser parser = new SpelExpressionParser();
|
||||
setPartitionIdExpression(parser.parseExpression("headers.partition"));
|
||||
setPartitionIdExpression(parser.parseExpression("headers." + BinderHeaders.PARTITION_HEADER));
|
||||
}
|
||||
if (producerProperties.getExtension().isSync()) {
|
||||
setSync(true);
|
||||
|
||||
@@ -30,11 +30,14 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaBinderJaasInitializerListener;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.JaasLoginModuleConfiguration;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
|
||||
import org.springframework.cloud.stream.config.codec.kryo.KryoCodecAutoConfiguration;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
@@ -82,14 +85,18 @@ public class KafkaBinderConfiguration {
|
||||
@Autowired (required = false)
|
||||
private AdminUtilsOperation adminUtilsOperation;
|
||||
|
||||
@Bean
|
||||
KafkaTopicProvisioner provisioningProvider() {
|
||||
return new KafkaTopicProvisioner(this.configurationProperties, this.adminUtilsOperation);
|
||||
}
|
||||
|
||||
@Bean
|
||||
KafkaMessageChannelBinder kafkaMessageChannelBinder() {
|
||||
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
|
||||
this.configurationProperties);
|
||||
this.configurationProperties, provisioningProvider());
|
||||
kafkaMessageChannelBinder.setCodec(this.codec);
|
||||
kafkaMessageChannelBinder.setProducerListener(producerListener);
|
||||
kafkaMessageChannelBinder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
|
||||
kafkaMessageChannelBinder.setAdminUtilsOperation(adminUtilsOperation);
|
||||
return kafkaMessageChannelBinder;
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,2 @@
|
||||
org.springframework.boot.env.EnvironmentPostProcessor=\
|
||||
org.springframework.cloud.stream.binder.kafka.KafkaBinderEnvironmentPostProcessor
|
||||
@@ -23,6 +23,8 @@ import com.esotericsoftware.kryo.Registration;
|
||||
import org.springframework.cloud.stream.binder.AbstractTestBinder;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
|
||||
import org.springframework.integration.codec.Codec;
|
||||
import org.springframework.integration.codec.kryo.KryoRegistrar;
|
||||
import org.springframework.integration.codec.kryo.PojoCodec;
|
||||
|
||||
@@ -31,6 +31,7 @@ import org.I0Itec.zkclient.ZkClient;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
@@ -42,7 +43,9 @@ import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.Spy;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
@@ -54,7 +57,6 @@ import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.retry.RetryOperations;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.fail;
|
||||
@@ -111,12 +113,6 @@ public class Kafka10BinderTests extends KafkaBinderTests {
|
||||
return consumerFactory().createConsumer().partitionsFor(topic).size();
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void setMetadataRetryOperations(Binder binder, RetryOperations retryOperations) {
|
||||
((Kafka10TestBinder) binder).getBinder().setMetadataRetryOperations(retryOperations);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ZkUtils getZkUtils(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
|
||||
final ZkClient zkClient = new ZkClient(kafkaBinderConfigurationProperties.getZkConnectionString(),
|
||||
@@ -237,8 +233,8 @@ public class Kafka10BinderTests extends KafkaBinderTests {
|
||||
assertThat(inbound).isNotNull();
|
||||
assertTrue(message.getPayload() instanceof User1);
|
||||
User1 receivedUser = (User1) message.getPayload();
|
||||
assertThat(receivedUser.getName()).isEqualTo(userName1);
|
||||
assertThat(receivedUser.getFavoriteColor()).isEqualTo(favColor1);
|
||||
Assertions.assertThat(receivedUser.getName()).isEqualTo(userName1);
|
||||
Assertions.assertThat(receivedUser.getFavoriteColor()).isEqualTo(favColor1);
|
||||
producerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
@@ -16,8 +16,10 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
|
||||
import org.springframework.context.support.GenericApplicationContext;
|
||||
import org.springframework.kafka.support.LoggingProducerListener;
|
||||
import org.springframework.kafka.support.ProducerListener;
|
||||
@@ -34,14 +36,19 @@ public class Kafka10TestBinder extends AbstractKafkaTestBinder {
|
||||
|
||||
public Kafka10TestBinder(KafkaBinderConfigurationProperties binderConfiguration) {
|
||||
try {
|
||||
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfiguration);
|
||||
AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
|
||||
KafkaTopicProvisioner provisioningProvider =
|
||||
new KafkaTopicProvisioner(binderConfiguration, adminUtilsOperation);
|
||||
provisioningProvider.afterPropertiesSet();
|
||||
|
||||
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfiguration, provisioningProvider);
|
||||
|
||||
binder.setCodec(getCodec());
|
||||
ProducerListener producerListener = new LoggingProducerListener();
|
||||
binder.setProducerListener(producerListener);
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
binder.setApplicationContext(context);
|
||||
binder.setAdminUtilsOperation(new Kafka10AdminUtilsOperation());
|
||||
binder.afterPropertiesSet();
|
||||
this.setBinder(binder);
|
||||
}
|
||||
@@ -38,16 +38,20 @@ import org.junit.Test;
|
||||
|
||||
import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.binder.BinderException;
|
||||
import org.springframework.cloud.stream.binder.Binding;
|
||||
import org.springframework.cloud.stream.binder.DefaultBinding;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.HeaderMode;
|
||||
import org.springframework.cloud.stream.binder.PartitionCapableBinderTests;
|
||||
import org.springframework.cloud.stream.binder.PartitionTestSupport;
|
||||
import org.springframework.cloud.stream.binder.TestUtils;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.utils.KafkaTopicUtils;
|
||||
import org.springframework.cloud.stream.config.BindingProperties;
|
||||
import org.springframework.cloud.stream.provisioning.ProvisioningException;
|
||||
import org.springframework.context.support.GenericApplicationContext;
|
||||
import org.springframework.integration.IntegrationMessageHeaderAccessor;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
@@ -66,7 +70,6 @@ import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
import org.springframework.messaging.support.GenericMessage;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.retry.RetryOperations;
|
||||
import org.springframework.retry.backoff.FixedBackOffPolicy;
|
||||
import org.springframework.retry.policy.SimpleRetryPolicy;
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
@@ -107,8 +110,6 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
|
||||
protected abstract int partitionSize(String topic);
|
||||
|
||||
protected abstract void setMetadataRetryOperations(Binder binder, RetryOperations retryOperations);
|
||||
|
||||
protected abstract ZkUtils getZkUtils(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties);
|
||||
|
||||
protected abstract void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions,
|
||||
@@ -284,7 +285,6 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
|
||||
backOffPolicy.setBackOffPeriod(1000);
|
||||
metatadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
|
||||
setMetadataRetryOperations(binder, metatadataRetrievalRetryOperations);
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
String testTopicName = "nonexisting" + System.currentTimeMillis();
|
||||
@@ -1013,7 +1013,6 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
|
||||
backOffPolicy.setBackOffPeriod(1000);
|
||||
metatadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
|
||||
setMetadataRetryOperations(binder, metatadataRetrievalRetryOperations);
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
String testTopicName = "createdByBroker-" + System.currentTimeMillis();
|
||||
@@ -1069,7 +1068,6 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
|
||||
backOffPolicy.setBackOffPeriod(1000);
|
||||
metatadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
|
||||
setMetadataRetryOperations(binder, metatadataRetrievalRetryOperations);
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
Binding<?> binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
@@ -1106,7 +1104,7 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
}
|
||||
catch (Exception e) {
|
||||
assertThat(e).isInstanceOf(BinderException.class);
|
||||
assertThat(e).isInstanceOf(ProvisioningException.class);
|
||||
assertThat(e)
|
||||
.hasMessageContaining("The number of expected partitions was: 3, but 1 has been found instead");
|
||||
}
|
||||
@@ -1139,7 +1137,6 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
|
||||
backOffPolicy.setBackOffPeriod(1000);
|
||||
metatadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
|
||||
setMetadataRetryOperations(binder, metatadataRetrievalRetryOperations);
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
// this consumer must consume from partition 2
|
||||
@@ -1188,7 +1185,6 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
|
||||
backOffPolicy.setBackOffPeriod(1000);
|
||||
metatadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
|
||||
setMetadataRetryOperations(binder, metatadataRetrievalRetryOperations);
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
Binding<?> binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
@@ -1350,6 +1346,221 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testPartitionedModuleJavaWithRawMode() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
|
||||
properties.setHeaderMode(HeaderMode.raw);
|
||||
properties.setPartitionKeyExtractorClass(RawKafkaPartitionTestSupport.class);
|
||||
properties.setPartitionSelectorClass(RawKafkaPartitionTestSupport.class);
|
||||
properties.setPartitionCount(6);
|
||||
|
||||
DirectChannel output = createBindableChannel("output", createProducerBindingProperties(properties));
|
||||
output.setBeanName("test.output");
|
||||
Binding<MessageChannel> outputBinding = binder.bindProducer("partJ.0", output, properties);
|
||||
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.setConcurrency(2);
|
||||
consumerProperties.setInstanceCount(3);
|
||||
consumerProperties.setInstanceIndex(0);
|
||||
consumerProperties.setPartitioned(true);
|
||||
consumerProperties.setHeaderMode(HeaderMode.raw);
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
QueueChannel input0 = new QueueChannel();
|
||||
input0.setBeanName("test.input0J");
|
||||
Binding<MessageChannel> input0Binding = binder.bindConsumer("partJ.0", "test", input0, consumerProperties);
|
||||
consumerProperties.setInstanceIndex(1);
|
||||
QueueChannel input1 = new QueueChannel();
|
||||
input1.setBeanName("test.input1J");
|
||||
Binding<MessageChannel> input1Binding = binder.bindConsumer("partJ.0", "test", input1, consumerProperties);
|
||||
consumerProperties.setInstanceIndex(2);
|
||||
QueueChannel input2 = new QueueChannel();
|
||||
input2.setBeanName("test.input2J");
|
||||
Binding<MessageChannel> input2Binding = binder.bindConsumer("partJ.0", "test", input2, consumerProperties);
|
||||
|
||||
output.send(new GenericMessage<>(new byte[] {(byte) 0}));
|
||||
output.send(new GenericMessage<>(new byte[] {(byte) 1}));
|
||||
output.send(new GenericMessage<>(new byte[] {(byte) 2}));
|
||||
|
||||
Message<?> receive0 = receive(input0);
|
||||
assertThat(receive0).isNotNull();
|
||||
Message<?> receive1 = receive(input1);
|
||||
assertThat(receive1).isNotNull();
|
||||
Message<?> receive2 = receive(input2);
|
||||
assertThat(receive2).isNotNull();
|
||||
|
||||
assertThat(Arrays.asList(((byte[]) receive0.getPayload())[0], ((byte[]) receive1.getPayload())[0],
|
||||
((byte[]) receive2.getPayload())[0])).containsExactlyInAnyOrder((byte) 0, (byte) 1, (byte) 2);
|
||||
|
||||
input0Binding.unbind();
|
||||
input1Binding.unbind();
|
||||
input2Binding.unbind();
|
||||
outputBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testPartitionedModuleSpELWithRawMode() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
|
||||
properties.setPartitionKeyExpression(spelExpressionParser.parseExpression("payload[0]"));
|
||||
properties.setPartitionSelectorExpression(spelExpressionParser.parseExpression("hashCode()"));
|
||||
properties.setPartitionCount(6);
|
||||
properties.setHeaderMode(HeaderMode.raw);
|
||||
|
||||
DirectChannel output = createBindableChannel("output", createProducerBindingProperties(properties));
|
||||
output.setBeanName("test.output");
|
||||
Binding<MessageChannel> outputBinding = binder.bindProducer("part.0", output, properties);
|
||||
try {
|
||||
Object endpoint = extractEndpoint(outputBinding);
|
||||
assertThat(getEndpointRouting(endpoint))
|
||||
.contains(getExpectedRoutingBaseDestination("part.0", "test") + "-' + headers['partition']");
|
||||
}
|
||||
catch (UnsupportedOperationException ignored) {
|
||||
}
|
||||
|
||||
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.setConcurrency(2);
|
||||
consumerProperties.setInstanceIndex(0);
|
||||
consumerProperties.setInstanceCount(3);
|
||||
consumerProperties.setPartitioned(true);
|
||||
consumerProperties.setHeaderMode(HeaderMode.raw);
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
QueueChannel input0 = new QueueChannel();
|
||||
input0.setBeanName("test.input0S");
|
||||
Binding<MessageChannel> input0Binding = binder.bindConsumer("part.0", "test", input0, consumerProperties);
|
||||
consumerProperties.setInstanceIndex(1);
|
||||
QueueChannel input1 = new QueueChannel();
|
||||
input1.setBeanName("test.input1S");
|
||||
Binding<MessageChannel> input1Binding = binder.bindConsumer("part.0", "test", input1, consumerProperties);
|
||||
consumerProperties.setInstanceIndex(2);
|
||||
QueueChannel input2 = new QueueChannel();
|
||||
input2.setBeanName("test.input2S");
|
||||
Binding<MessageChannel> input2Binding = binder.bindConsumer("part.0", "test", input2, consumerProperties);
|
||||
|
||||
Message<byte[]> message2 = org.springframework.integration.support.MessageBuilder.withPayload(new byte[] {2})
|
||||
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "kafkaBinderTestCommonsDelegate")
|
||||
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 42)
|
||||
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 43).build();
|
||||
output.send(message2);
|
||||
output.send(new GenericMessage<>(new byte[] {1}));
|
||||
output.send(new GenericMessage<>(new byte[] {0}));
|
||||
Message<?> receive0 = receive(input0);
|
||||
assertThat(receive0).isNotNull();
|
||||
Message<?> receive1 = receive(input1);
|
||||
assertThat(receive1).isNotNull();
|
||||
Message<?> receive2 = receive(input2);
|
||||
assertThat(receive2).isNotNull();
|
||||
assertThat(Arrays.asList(((byte[]) receive0.getPayload())[0], ((byte[]) receive1.getPayload())[0],
|
||||
((byte[]) receive2.getPayload())[0])).containsExactlyInAnyOrder((byte) 0, (byte) 1, (byte) 2);
|
||||
input0Binding.unbind();
|
||||
input1Binding.unbind();
|
||||
input2Binding.unbind();
|
||||
outputBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testSendAndReceiveWithRawMode() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
DirectChannel moduleOutputChannel = new DirectChannel();
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
producerProperties.setHeaderMode(HeaderMode.raw);
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer("0", moduleOutputChannel,
|
||||
producerProperties);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.setHeaderMode(HeaderMode.raw);
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer("0", "test", moduleInputChannel,
|
||||
consumerProperties);
|
||||
Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload("kafkaBinderTestCommonsDelegate".getBytes()).build();
|
||||
// Let the consumer actually bind to the producer before sending a msg
|
||||
binderBindUnbindLatency();
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(moduleInputChannel);
|
||||
assertThat(inbound).isNotNull();
|
||||
assertThat(new String((byte[]) inbound.getPayload())).isEqualTo("kafkaBinderTestCommonsDelegate");
|
||||
producerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testSendAndReceiveWithExplicitConsumerGroupWithRawMode() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
DirectChannel moduleOutputChannel = new DirectChannel();
|
||||
// Test pub/sub by emulating how StreamPlugin handles taps
|
||||
QueueChannel module1InputChannel = new QueueChannel();
|
||||
QueueChannel module2InputChannel = new QueueChannel();
|
||||
QueueChannel module3InputChannel = new QueueChannel();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
producerProperties.setHeaderMode(HeaderMode.raw);
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer("baz.0", moduleOutputChannel,
|
||||
producerProperties);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.setHeaderMode(HeaderMode.raw);
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
Binding<MessageChannel> input1Binding = binder.bindConsumer("baz.0", "test", module1InputChannel,
|
||||
consumerProperties);
|
||||
// A new module is using the tap as an input channel
|
||||
String fooTapName = "baz.0";
|
||||
Binding<MessageChannel> input2Binding = binder.bindConsumer(fooTapName, "tap1", module2InputChannel,
|
||||
consumerProperties);
|
||||
// Another new module is using tap as an input channel
|
||||
String barTapName = "baz.0";
|
||||
Binding<MessageChannel> input3Binding = binder.bindConsumer(barTapName, "tap2", module3InputChannel,
|
||||
consumerProperties);
|
||||
|
||||
Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload("kafkaBinderTestCommonsDelegate".getBytes()).build();
|
||||
boolean success = false;
|
||||
boolean retried = false;
|
||||
while (!success) {
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(module1InputChannel);
|
||||
assertThat(inbound).isNotNull();
|
||||
assertThat(new String((byte[]) inbound.getPayload())).isEqualTo("kafkaBinderTestCommonsDelegate");
|
||||
|
||||
Message<?> tapped1 = receive(module2InputChannel);
|
||||
Message<?> tapped2 = receive(module3InputChannel);
|
||||
if (tapped1 == null || tapped2 == null) {
|
||||
// listener may not have started
|
||||
assertThat(retried).isFalse().withFailMessage("Failed to receive tap after retry");
|
||||
retried = true;
|
||||
continue;
|
||||
}
|
||||
success = true;
|
||||
assertThat(new String((byte[]) tapped1.getPayload())).isEqualTo("kafkaBinderTestCommonsDelegate");
|
||||
assertThat(new String((byte[]) tapped2.getPayload())).isEqualTo("kafkaBinderTestCommonsDelegate");
|
||||
}
|
||||
// delete one tap stream is deleted
|
||||
input3Binding.unbind();
|
||||
Message<?> message2 = org.springframework.integration.support.MessageBuilder.withPayload("bar".getBytes()).build();
|
||||
moduleOutputChannel.send(message2);
|
||||
|
||||
// other tap still receives messages
|
||||
Message<?> tapped = receive(module2InputChannel);
|
||||
assertThat(tapped).isNotNull();
|
||||
|
||||
// removed tap does not
|
||||
assertThat(receive(module3InputChannel)).isNull();
|
||||
|
||||
// re-subscribed tap does receive the message
|
||||
input3Binding = binder.bindConsumer(barTapName, "tap2", module3InputChannel, createConsumerProperties());
|
||||
assertThat(receive(module3InputChannel)).isNotNull();
|
||||
|
||||
// clean up
|
||||
input1Binding.unbind();
|
||||
input2Binding.unbind();
|
||||
input3Binding.unbind();
|
||||
producerBinding.unbind();
|
||||
assertThat(extractEndpoint(input1Binding).isRunning()).isFalse();
|
||||
assertThat(extractEndpoint(input2Binding).isRunning()).isFalse();
|
||||
assertThat(extractEndpoint(input3Binding).isRunning()).isFalse();
|
||||
assertThat(extractEndpoint(producerBinding).isRunning()).isFalse();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void binderBindUnbindLatency() throws InterruptedException {
|
||||
Thread.sleep(500);
|
||||
|
||||
@@ -1,258 +0,0 @@
|
||||
/*
|
||||
* Copyright 2015-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.cloud.stream.binder.Binding;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.HeaderMode;
|
||||
import org.springframework.integration.IntegrationMessageHeaderAccessor;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.support.MessageBuilder;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.support.GenericMessage;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* @author Marius Bogoevici
|
||||
* @author David Turanski
|
||||
* @author Gary Russell
|
||||
* @author Mark Fisher
|
||||
*/
|
||||
public class RawModeKafka09BinderTests extends Kafka09BinderTests {
|
||||
|
||||
@Test
|
||||
@Override
|
||||
public void testPartitionedModuleJava() throws Exception {
|
||||
Kafka09TestBinder binder = getBinder();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
|
||||
properties.setHeaderMode(HeaderMode.raw);
|
||||
properties.setPartitionKeyExtractorClass(RawKafkaPartitionTestSupport.class);
|
||||
properties.setPartitionSelectorClass(RawKafkaPartitionTestSupport.class);
|
||||
properties.setPartitionCount(6);
|
||||
|
||||
DirectChannel output = createBindableChannel("output", createProducerBindingProperties(properties));
|
||||
output.setBeanName("test.output");
|
||||
Binding<MessageChannel> outputBinding = binder.bindProducer("partJ.0", output, properties);
|
||||
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.setConcurrency(2);
|
||||
consumerProperties.setInstanceCount(3);
|
||||
consumerProperties.setInstanceIndex(0);
|
||||
consumerProperties.setPartitioned(true);
|
||||
consumerProperties.setHeaderMode(HeaderMode.raw);
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
QueueChannel input0 = new QueueChannel();
|
||||
input0.setBeanName("test.input0J");
|
||||
Binding<MessageChannel> input0Binding = binder.bindConsumer("partJ.0", "test", input0, consumerProperties);
|
||||
consumerProperties.setInstanceIndex(1);
|
||||
QueueChannel input1 = new QueueChannel();
|
||||
input1.setBeanName("test.input1J");
|
||||
Binding<MessageChannel> input1Binding = binder.bindConsumer("partJ.0", "test", input1, consumerProperties);
|
||||
consumerProperties.setInstanceIndex(2);
|
||||
QueueChannel input2 = new QueueChannel();
|
||||
input2.setBeanName("test.input2J");
|
||||
Binding<MessageChannel> input2Binding = binder.bindConsumer("partJ.0", "test", input2, consumerProperties);
|
||||
|
||||
output.send(new GenericMessage<>(new byte[] {(byte) 0}));
|
||||
output.send(new GenericMessage<>(new byte[] {(byte) 1}));
|
||||
output.send(new GenericMessage<>(new byte[] {(byte) 2}));
|
||||
|
||||
Message<?> receive0 = receive(input0);
|
||||
assertThat(receive0).isNotNull();
|
||||
Message<?> receive1 = receive(input1);
|
||||
assertThat(receive1).isNotNull();
|
||||
Message<?> receive2 = receive(input2);
|
||||
assertThat(receive2).isNotNull();
|
||||
|
||||
assertThat(Arrays.asList(((byte[]) receive0.getPayload())[0], ((byte[]) receive1.getPayload())[0],
|
||||
((byte[]) receive2.getPayload())[0])).containsExactlyInAnyOrder((byte) 0, (byte) 1, (byte) 2);
|
||||
|
||||
input0Binding.unbind();
|
||||
input1Binding.unbind();
|
||||
input2Binding.unbind();
|
||||
outputBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Override
|
||||
public void testPartitionedModuleSpEL() throws Exception {
|
||||
Kafka09TestBinder binder = getBinder();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
|
||||
properties.setPartitionKeyExpression(spelExpressionParser.parseExpression("payload[0]"));
|
||||
properties.setPartitionSelectorExpression(spelExpressionParser.parseExpression("hashCode()"));
|
||||
properties.setPartitionCount(6);
|
||||
properties.setHeaderMode(HeaderMode.raw);
|
||||
|
||||
DirectChannel output = createBindableChannel("output", createProducerBindingProperties(properties));
|
||||
output.setBeanName("test.output");
|
||||
Binding<MessageChannel> outputBinding = binder.bindProducer("part.0", output, properties);
|
||||
try {
|
||||
Object endpoint = extractEndpoint(outputBinding);
|
||||
assertThat(getEndpointRouting(endpoint))
|
||||
.contains(getExpectedRoutingBaseDestination("part.0", "test") + "-' + headers['partition']");
|
||||
}
|
||||
catch (UnsupportedOperationException ignored) {
|
||||
}
|
||||
|
||||
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.setConcurrency(2);
|
||||
consumerProperties.setInstanceIndex(0);
|
||||
consumerProperties.setInstanceCount(3);
|
||||
consumerProperties.setPartitioned(true);
|
||||
consumerProperties.setHeaderMode(HeaderMode.raw);
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
QueueChannel input0 = new QueueChannel();
|
||||
input0.setBeanName("test.input0S");
|
||||
Binding<MessageChannel> input0Binding = binder.bindConsumer("part.0", "test", input0, consumerProperties);
|
||||
consumerProperties.setInstanceIndex(1);
|
||||
QueueChannel input1 = new QueueChannel();
|
||||
input1.setBeanName("test.input1S");
|
||||
Binding<MessageChannel> input1Binding = binder.bindConsumer("part.0", "test", input1, consumerProperties);
|
||||
consumerProperties.setInstanceIndex(2);
|
||||
QueueChannel input2 = new QueueChannel();
|
||||
input2.setBeanName("test.input2S");
|
||||
Binding<MessageChannel> input2Binding = binder.bindConsumer("part.0", "test", input2, consumerProperties);
|
||||
|
||||
Message<byte[]> message2 = MessageBuilder.withPayload(new byte[] {2})
|
||||
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "foo")
|
||||
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 42)
|
||||
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 43).build();
|
||||
output.send(message2);
|
||||
output.send(new GenericMessage<>(new byte[] {1}));
|
||||
output.send(new GenericMessage<>(new byte[] {0}));
|
||||
Message<?> receive0 = receive(input0);
|
||||
assertThat(receive0).isNotNull();
|
||||
Message<?> receive1 = receive(input1);
|
||||
assertThat(receive1).isNotNull();
|
||||
Message<?> receive2 = receive(input2);
|
||||
assertThat(receive2).isNotNull();
|
||||
assertThat(Arrays.asList(((byte[]) receive0.getPayload())[0], ((byte[]) receive1.getPayload())[0],
|
||||
((byte[]) receive2.getPayload())[0])).containsExactlyInAnyOrder((byte) 0, (byte) 1, (byte) 2);
|
||||
input0Binding.unbind();
|
||||
input1Binding.unbind();
|
||||
input2Binding.unbind();
|
||||
outputBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Override
|
||||
public void testSendAndReceive() throws Exception {
|
||||
Kafka09TestBinder binder = getBinder();
|
||||
DirectChannel moduleOutputChannel = new DirectChannel();
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
producerProperties.setHeaderMode(HeaderMode.raw);
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer("foo.0", moduleOutputChannel,
|
||||
producerProperties);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.setHeaderMode(HeaderMode.raw);
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo.0", "test", moduleInputChannel,
|
||||
consumerProperties);
|
||||
Message<?> message = MessageBuilder.withPayload("foo".getBytes()).build();
|
||||
// Let the consumer actually bind to the producer before sending a msg
|
||||
binderBindUnbindLatency();
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(moduleInputChannel);
|
||||
assertThat(inbound).isNotNull();
|
||||
assertThat(new String((byte[]) inbound.getPayload())).isEqualTo("foo");
|
||||
producerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendAndReceiveWithExplicitConsumerGroup() {
|
||||
Kafka09TestBinder binder = getBinder();
|
||||
DirectChannel moduleOutputChannel = new DirectChannel();
|
||||
// Test pub/sub by emulating how StreamPlugin handles taps
|
||||
QueueChannel module1InputChannel = new QueueChannel();
|
||||
QueueChannel module2InputChannel = new QueueChannel();
|
||||
QueueChannel module3InputChannel = new QueueChannel();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
producerProperties.setHeaderMode(HeaderMode.raw);
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer("baz.0", moduleOutputChannel,
|
||||
producerProperties);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.setHeaderMode(HeaderMode.raw);
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
Binding<MessageChannel> input1Binding = binder.bindConsumer("baz.0", "test", module1InputChannel,
|
||||
consumerProperties);
|
||||
// A new module is using the tap as an input channel
|
||||
String fooTapName = "baz.0";
|
||||
Binding<MessageChannel> input2Binding = binder.bindConsumer(fooTapName, "tap1", module2InputChannel,
|
||||
consumerProperties);
|
||||
// Another new module is using tap as an input channel
|
||||
String barTapName = "baz.0";
|
||||
Binding<MessageChannel> input3Binding = binder.bindConsumer(barTapName, "tap2", module3InputChannel,
|
||||
consumerProperties);
|
||||
|
||||
Message<?> message = MessageBuilder.withPayload("foo".getBytes()).build();
|
||||
boolean success = false;
|
||||
boolean retried = false;
|
||||
while (!success) {
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(module1InputChannel);
|
||||
assertThat(inbound).isNotNull();
|
||||
assertThat(new String((byte[]) inbound.getPayload())).isEqualTo("foo");
|
||||
|
||||
Message<?> tapped1 = receive(module2InputChannel);
|
||||
Message<?> tapped2 = receive(module3InputChannel);
|
||||
if (tapped1 == null || tapped2 == null) {
|
||||
// listener may not have started
|
||||
assertThat(retried).isFalse().withFailMessage("Failed to receive tap after retry");
|
||||
retried = true;
|
||||
continue;
|
||||
}
|
||||
success = true;
|
||||
assertThat(new String((byte[]) tapped1.getPayload())).isEqualTo("foo");
|
||||
assertThat(new String((byte[]) tapped2.getPayload())).isEqualTo("foo");
|
||||
}
|
||||
// delete one tap stream is deleted
|
||||
input3Binding.unbind();
|
||||
Message<?> message2 = MessageBuilder.withPayload("bar".getBytes()).build();
|
||||
moduleOutputChannel.send(message2);
|
||||
|
||||
// other tap still receives messages
|
||||
Message<?> tapped = receive(module2InputChannel);
|
||||
assertThat(tapped).isNotNull();
|
||||
|
||||
// removed tap does not
|
||||
assertThat(receive(module3InputChannel)).isNull();
|
||||
|
||||
// re-subscribed tap does receive the message
|
||||
input3Binding = binder.bindConsumer(barTapName, "tap2", module3InputChannel, createConsumerProperties());
|
||||
assertThat(receive(module3InputChannel)).isNotNull();
|
||||
|
||||
// clean up
|
||||
input1Binding.unbind();
|
||||
input2Binding.unbind();
|
||||
input3Binding.unbind();
|
||||
producerBinding.unbind();
|
||||
assertThat(extractEndpoint(input1Binding).isRunning()).isFalse();
|
||||
assertThat(extractEndpoint(input2Binding).isRunning()).isFalse();
|
||||
assertThat(extractEndpoint(input3Binding).isRunning()).isFalse();
|
||||
assertThat(extractEndpoint(producerBinding).isRunning()).isFalse();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user