Compare commits


9 Commits

Author SHA1 Message Date
bamboo
b944b594b5 [artifactory-release] Release version 1.1.0.RC2 2016-09-07 23:38:15 +00:00
Marius Bogoevici
4fa567fec7 Fix documentation issue 2016-09-07 19:14:31 -04:00
bamboo
16d6584774 [artifactory-release] Next development version 2016-09-07 22:38:01 +00:00
bamboo
ea514bd72d [artifactory-release] Release version 1.1.0.RC1 2016-09-07 22:38:01 +00:00
Marius Bogoevici
79feac15d0 Upgrade to Spring Cloud Stream 1.1.0.RC1 2016-09-07 18:25:49 -04:00
Marius Bogoevici
3a27a5ec75 Remove redundant spring-kafka declaration 2016-09-07 18:25:22 -04:00
Marius Bogoevici
7308bd4991 Polishing
Removed optional flag
2016-09-07 17:53:49 -04:00
Soby Chacko
866eaf4a25 Add drop-in support for Kafka 0.10
Reflectively detect AdminUtils from Kafka 0.9 and 0.10
Introduce Kafka 10 conditionals
2016-09-07 17:52:53 -04:00
bamboo
b37e7b37d2 [artifactory-release] Next development version 2016-08-26 03:30:26 +00:00
20 changed files with 1341 additions and 523 deletions

pom.xml
View File

@@ -2,12 +2,12 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.1.0.M1</version>
<version>1.1.0.RC2</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-parent</artifactId>
<version>1.1.0.M1</version>
<version>1.1.0.RC1</version>
<relativePath />
</parent>
<properties>
@@ -19,8 +19,8 @@
<modules>
<module>spring-cloud-stream-binder-kafka</module>
<module>spring-cloud-starter-stream-kafka</module>
<module>spring-cloud-stream-binder-kafka-test-support</module>
<module>spring-cloud-stream-binder-kafka-docs</module>
<module>spring-cloud-stream-binder-kafka-0.10-test</module>
</modules>
<build>
<pluginManagement>
@@ -59,6 +59,53 @@
</plugins>
</pluginManagement>
</build>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>${spring-integration-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
<version>${spring-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<classifier>test</classifier>
<version>${kafka.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<profiles>
<profile>
<id>spring</id>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.1.0.M1</version>
<version>1.1.0.RC2</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>
@@ -20,7 +20,7 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>1.1.0.M1</version>
<version>1.1.0.RC2</version>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,79 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.1.0.RC2</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-0.10-test</artifactId>
<description>Spring Cloud Stream Kafka Binder 0.10 Tests</description>
<url>http://projects.spring.io/spring-cloud</url>
<organization>
<name>Pivotal Software, Inc.</name>
<url>http://www.spring.io</url>
</organization>
<properties>
<main.basedir>${basedir}/../..</main.basedir>
<!--
Override Kafka dependencies to Kafka 0.10 and supporting Spring Kafka and
Spring Integration Kafka versions
-->
<kafka.version>0.10.0.0</kafka.version>
<spring-kafka.version>1.1.0.M1</spring-kafka.version>
<spring-integration-kafka.version>2.0.1.RELEASE</spring-integration-kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>1.1.0.RC2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>1.1.0.RC2</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,179 @@
/*
* Copyright 2014-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.junit.Before;
import org.junit.ClassRule;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.Spy;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.retry.RetryOperations;
/**
* Integration tests for the {@link KafkaMessageChannelBinder}.
*
* @author Eric Bottard
* @author Marius Bogoevici
* @author Mark Fisher
* @author Ilayaperumal Gopinathan
*/
public class Kafka10BinderTests extends KafkaBinderTests {
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
private Kafka10TestBinder binder;
private Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
@Override
protected void binderBindUnbindLatency() throws InterruptedException {
Thread.sleep(500);
}
@Override
protected Kafka10TestBinder getBinder() {
if (binder == null) {
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binder = new Kafka10TestBinder(binderConfiguration);
}
return binder;
}
protected KafkaBinderConfigurationProperties createConfigurationProperties() {
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
List<String> bAddresses = new ArrayList<>();
for (BrokerAddress bAddress : brokerAddresses) {
bAddresses.add(bAddress.toString());
}
String[] foo = new String[bAddresses.size()];
binderConfiguration.setBrokers(bAddresses.toArray(foo));
binderConfiguration.setZkNodes(embeddedKafka.getZookeeperConnectionString());
return binderConfiguration;
}
@Override
protected int partitionSize(String topic) {
return consumerFactory().createConsumer().partitionsFor(topic).size();
}
@Override
@SuppressWarnings("unchecked")
protected void setMetadataRetryOperations(Binder binder, RetryOperations retryOperations) {
((Kafka10TestBinder) binder).getBinder().setMetadataRetryOperations(retryOperations);
}
@Override
protected ZkUtils getZkUtils(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
final ZkClient zkClient = new ZkClient(kafkaBinderConfigurationProperties.getZkConnectionString(),
kafkaBinderConfigurationProperties.getZkSessionTimeout(), kafkaBinderConfigurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
return new ZkUtils(zkClient, null, false);
}
@Override
protected void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions, int replicationFactor, Properties topicConfig) {
adminUtilsOperation.invokeCreateTopic(zkUtils, topic, partitions, replicationFactor, new Properties());
}
@Override
protected int invokePartitionSize(String topic, ZkUtils zkUtils) {
return adminUtilsOperation.partitionSize(topic, zkUtils);
}
@Override
protected ExtendedConsumerProperties<KafkaConsumerProperties> createConsumerProperties() {
return new ExtendedConsumerProperties<>(new KafkaConsumerProperties());
}
@Override
protected ExtendedProducerProperties<KafkaProducerProperties> createProducerProperties() {
return new ExtendedProducerProperties<>(new KafkaProducerProperties());
}
@Override
public String getKafkaOffsetHeaderKey() {
return KafkaHeaders.OFFSET;
}
@Override
protected Binder getBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
return new Kafka10TestBinder(kafkaBinderConfigurationProperties);
}
@Before
public void init() {
String multiplier = System.getenv("KAFKA_TIMEOUT_MULTIPLIER");
if (multiplier != null) {
timeoutMultiplier = Double.parseDouble(multiplier);
}
}
@Override
protected boolean usesExplicitRouting() {
return false;
}
@Override
protected String getClassUnderTestName() {
return CLASS_UNDER_TEST_NAME;
}
@Override
public Spy spyOn(final String name) {
throw new UnsupportedOperationException("'spyOn' is not used by Kafka tests");
}
private ConsumerFactory<byte[], byte[]> consumerFactory() {
Map<String, Object> props = new HashMap<>();
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "TEST-CONSUMER-GROUP");
Deserializer<byte[]> valueDecoder = new ByteArrayDeserializer();
Deserializer<byte[]> keyDecoder = new ByteArrayDeserializer();
return new DefaultKafkaConsumerFactory<>(props, keyDecoder, valueDecoder);
}
}

View File

@@ -16,20 +16,9 @@
package org.springframework.cloud.stream.binder.kafka;
import java.util.List;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Registration;
import org.springframework.cloud.stream.binder.AbstractTestBinder;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.codec.Codec;
import org.springframework.integration.codec.kryo.KryoRegistrar;
import org.springframework.integration.codec.kryo.PojoCodec;
import org.springframework.integration.tuple.TupleKryoRegistrar;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
@@ -41,10 +30,9 @@ import org.springframework.kafka.support.ProducerListener;
* @author Gary Russell
* @author Soby Chacko
*/
public class KafkaTestBinder extends
AbstractTestBinder<KafkaMessageChannelBinder, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>> {
public class Kafka10TestBinder extends AbstractKafkaTestBinder {
public KafkaTestBinder(KafkaBinderConfigurationProperties binderConfiguration) {
public Kafka10TestBinder(KafkaBinderConfigurationProperties binderConfiguration) {
try {
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfiguration);
binder.setCodec(getCodec());
@@ -53,6 +41,7 @@ public class KafkaTestBinder extends
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
binder.setApplicationContext(context);
binder.setAdminUtilsOperation(new Kafka10AdminUtilsOperation());
binder.afterPropertiesSet();
this.setBinder(binder);
}
@@ -61,27 +50,4 @@ public class KafkaTestBinder extends
}
}
@Override
public void cleanup() {
// do nothing - the rule will take care of that
}
private static Codec getCodec() {
return new PojoCodec(new TupleRegistrar());
}
private static class TupleRegistrar implements KryoRegistrar {
private final TupleKryoRegistrar delegate = new TupleKryoRegistrar();
@Override
public void registerTypes(Kryo kryo) {
this.delegate.registerTypes(kryo);
}
@Override
public List<Registration> getRegistrations() {
return this.delegate.getRegistrations();
}
}
}

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.1.0.M1</version>
<version>1.1.0.RC2</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>
@@ -18,7 +18,7 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>1.1.0.M1</version>
<version>1.1.0.RC2</version>
</dependency>
</dependencies>
<profiles>

View File

@@ -120,6 +120,13 @@ Default: `2097152`.
The following properties are available for Kafka consumers only and
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.consumer.`.
autoRebalanceEnabled::
When `true`, topic partitions are rebalanced automatically between the members of a consumer group.
When `false`, each consumer is assigned a fixed set of partitions based on `spring.cloud.stream.instanceCount` and `spring.cloud.stream.instanceIndex`.
This requires both properties to be set appropriately on each launched instance, and `spring.cloud.stream.instanceCount` must typically be greater than 1 in this case (see the configuration sketch below).
+
Default: `true`.
autoCommitOffset::
Whether to auto-commit offsets when a message has been processed.
If set to `false`, an `Acknowledgment` header will be available in the message headers for late acknowledgment.
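For illustration, a static partition assignment with manual offset commits could be configured as follows. This is a sketch only: the channel name `input` and the property values are placeholders, and the binding-level consumer group is omitted for brevity.

[source]
----
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0
spring.cloud.stream.kafka.bindings.input.consumer.autoRebalanceEnabled=false
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
----

With `autoCommitOffset` set to `false`, the application acknowledges each message itself. A minimal sketch, assuming a standard `Sink` binding and the `Acknowledgment` header exposed under `KafkaHeaders.ACKNOWLEDGMENT`:

[source,java]
----
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

public class ManualAckHandler {

	@StreamListener(Sink.INPUT)
	public void handle(Message<?> message) {
		// ... process the payload ...
		// Commit the offset only after processing has succeeded.
		Acknowledgment acknowledgment = message.getHeaders()
				.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
		if (acknowledgment != null) {
			acknowledgment.acknowledge();
		}
	}
}
----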
@@ -226,4 +233,41 @@ Here is an example of launching a Spring Cloud Stream application with SASL and
Exercise caution when using the `autoCreateTopics` and `autoAddPartitions` options with Kerberos.
Applications typically use principals that do not have administrative rights in Kafka and ZooKeeper, so relying on Spring Cloud Stream to create or modify topics may fail.
In secure environments, we strongly recommend creating topics and managing ACLs administratively using Kafka tooling.
====
==== Using the binder with Apache Kafka 0.10
The binder also supports connecting to Kafka 0.10 brokers.
To do so, when you create the project that contains your application, include `spring-cloud-starter-stream-kafka` as you would for 0.9-based applications.
Then add the following dependencies at the top of the `<dependencies>` section in the pom.xml file to override the Apache Kafka, Spring Kafka, and Spring Integration Kafka dependencies with 0.10-compatible versions:
[source,xml]
----
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.0.M1</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
----
[NOTE]
====
The versions above are provided only for the sake of the example.
For best results, we recommend using the most recent 0.10-compatible versions of the projects.
====
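As a side note, the new `spring-cloud-stream-binder-kafka-0.10-test` module in this changeset achieves the same override through Maven properties rather than explicit dependency declarations; this works because the parent `pom.xml` (shown above) resolves these artifacts through version properties in its `<dependencyManagement>` section:

[source,xml]
----
<properties>
	<kafka.version>0.10.0.0</kafka.version>
	<spring-kafka.version>1.1.0.M1</spring-kafka.version>
	<spring-integration-kafka.version>2.0.1.RELEASE</spring-integration-kafka.version>
</properties>
----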

View File

@@ -1,24 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.1.0.M1</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-test-support</artifactId>
<description>Kafka related test classes</description>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
</dependencies>
</project>

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.1.0.M1</version>
<version>1.1.0.RC2</version>
</parent>
<dependencies>
@@ -37,12 +37,6 @@
<artifactId>spring-cloud-stream-binder-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-test-support</artifactId>
<scope>test</scope>
<version>1.1.0.M1</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
@@ -54,17 +48,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
<version>${spring-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
@@ -73,6 +56,21 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>${spring-integration-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
@@ -81,34 +79,20 @@
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<classifier>test</classifier>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -33,6 +33,7 @@ import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigura
/**
* Health indicator for Kafka.
*
* @author Ilayaperumal Gopinathan
* @author Marius Bogoevici
*/
@@ -43,7 +44,7 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
private final KafkaBinderConfigurationProperties configurationProperties;
public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder,
KafkaBinderConfigurationProperties configurationProperties) {
this.binder = binder;
this.configurationProperties = configurationProperties;

View File

@@ -25,8 +25,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import kafka.admin.AdminUtils;
import kafka.api.TopicMetadata;
import kafka.common.ErrorMapping;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -42,7 +40,6 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Utils;
import scala.collection.Seq;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
@@ -52,6 +49,7 @@ import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.context.Lifecycle;
import org.springframework.expression.common.LiteralExpression;
@@ -83,6 +81,7 @@ import org.springframework.util.StringUtils;
/**
* A {@link Binder} that uses Kafka as the underlying middleware.
*
* @author Eric Bottard
* @author Marius Bogoevici
* @author Ilayaperumal Gopinathan
@@ -109,6 +108,8 @@ public class KafkaMessageChannelBinder extends
private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
private AdminUtilsOperation adminUtilsOperation;
public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties) {
super(false, headersToMap(configurationProperties));
this.configurationProperties = configurationProperties;
@@ -130,8 +131,13 @@ public class KafkaMessageChannelBinder extends
return headersToMap;
}
public void setAdminUtilsOperation(AdminUtilsOperation adminUtilsOperation) {
this.adminUtilsOperation = adminUtilsOperation;
}
/**
* Retry configuration for operations such as validating topic creation
*
* @param metadataRetryOperations the retry configuration
*/
public void setMetadataRetryOperations(RetryOperations metadataRetryOperations) {
@@ -189,7 +195,7 @@ public class KafkaMessageChannelBinder extends
@Override
protected MessageHandler createProducerMessageHandler(final String destination,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception {
KafkaTopicUtils.validateTopicName(destination);
@@ -215,11 +221,12 @@ public class KafkaMessageChannelBinder extends
@Override
protected String createProducerDestinationIfNecessary(String name,
ExtendedProducerProperties<KafkaProducerProperties> properties) {
if (this.logger.isInfoEnabled()) {
this.logger.info("Using kafka topic for outbound: " + name);
}
KafkaTopicUtils.validateTopicName(name);
Collection<PartitionInfo> partitions = ensureTopicCreated(name, properties.getPartitionCount());
if (properties.getPartitionCount() < partitions.size()) {
if (this.logger.isInfoEnabled()) {
@@ -259,17 +266,19 @@ public class KafkaMessageChannelBinder extends
@Override
protected Collection<PartitionInfo> createConsumerDestinationIfNecessary(String name, String group,
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
KafkaTopicUtils.validateTopicName(name);
if (properties.getInstanceCount() == 0) {
throw new IllegalArgumentException("Instance count cannot be zero");
}
Collection<PartitionInfo> allPartitions = ensureTopicCreated(name,
properties.getInstanceCount() * properties.getConcurrency());
Collection<PartitionInfo> listenedPartitions;
if (properties.getInstanceCount() == 1) {
if (properties.getExtension().isAutoRebalanceEnabled() ||
properties.getInstanceCount() == 1) {
listenedPartitions = allPartitions;
}
else {
@@ -288,7 +297,7 @@ public class KafkaMessageChannelBinder extends
@Override
@SuppressWarnings("unchecked")
protected MessageProducer createConsumerEndpoint(String name, String group, Collection<PartitionInfo> destination,
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
boolean anonymous = !StringUtils.hasText(group);
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
"DLQ support is not available for anonymous subscriptions");
@@ -429,38 +438,38 @@ public class KafkaMessageChannelBinder extends
this.configurationProperties.getZkConnectionTimeout(),
JaasUtils.isZkSecurityEnabled());
try {
final Properties topicConfig = new Properties();
TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils);
if (topicMetadata.errorCode() == ErrorMapping.NoError()) {
// only consider minPartitionCount for resizing if autoAddPartitions is
// true
short errorCode = adminUtilsOperation.errorCodeFromTopicMetadata(topicName, zkUtils);
if (errorCode == ErrorMapping.NoError()) {
// only consider minPartitionCount for resizing if autoAddPartitions is true
int effectivePartitionCount = this.configurationProperties.isAutoAddPartitions()
? Math.max(this.configurationProperties.getMinPartitionCount(), partitionCount)
: partitionCount;
if (topicMetadata.partitionsMetadata().size() < effectivePartitionCount) {
int partitionSize = adminUtilsOperation.partitionSize(topicName, zkUtils);
if (partitionSize < effectivePartitionCount) {
if (this.configurationProperties.isAutoAddPartitions()) {
AdminUtils.addPartitions(zkUtils, topicName, effectivePartitionCount, null, false);
adminUtilsOperation.invokeAddPartitions(zkUtils, topicName, effectivePartitionCount, null, false);
}
else {
int topicSize = topicMetadata.partitionsMetadata().size();
throw new BinderException("The number of expected partitions was: " + partitionCount + ", but "
+ topicSize + (topicSize > 1 ? " have " : " has ") + "been found instead."
+ partitionSize + (partitionSize > 1 ? " have " : " has ") + "been found instead."
+ "Consider either increasing the partition count of the topic or enabling " +
"`autoAddPartitions`");
}
}
}
else if (topicMetadata.errorCode() == ErrorMapping.UnknownTopicOrPartitionCode()) {
else if (errorCode == ErrorMapping.UnknownTopicOrPartitionCode()) {
if (this.configurationProperties.isAutoCreateTopics()) {
Seq<Object> brokerList = zkUtils.getSortedBrokerList();
// always consider minPartitionCount for topic creation
final int effectivePartitionCount = Math.max(this.configurationProperties.getMinPartitionCount(),
partitionCount);
this.metadataRetryOperations.execute(new RetryCallback<Object, RuntimeException>() {
@Override
public Object doWithRetry(RetryContext context) throws RuntimeException {
AdminUtils.createTopic(zkUtils, topicName, effectivePartitionCount,
adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount,
configurationProperties.getReplicationFactor(), new Properties());
return null;
}
@@ -472,7 +481,7 @@ public class KafkaMessageChannelBinder extends
}
else {
throw new BinderException("Error fetching Kafka topic metadata: ",
ErrorMapping.exceptionFor(topicMetadata.errorCode()));
ErrorMapping.exceptionFor(errorCode));
}
try {
return this.metadataRetryOperations
@@ -549,8 +558,8 @@ public class KafkaMessageChannelBinder extends
private final DefaultKafkaProducerFactory<byte[], byte[]> producerFactory;
private ProducerConfigurationMessageHandler(KafkaTemplate<byte[], byte[]> kafkaTemplate, String topic,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory) {
super(kafkaTemplate);
setTopicExpression(new LiteralExpression(topic));
setBeanFactory(KafkaMessageChannelBinder.this.getBeanFactory());
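As documented on `setMetadataRetryOperations` above, the binder accepts any Spring Retry `RetryOperations` for metadata operations such as topic validation. A minimal sketch of wiring it up, assuming a `KafkaMessageChannelBinder` instance is at hand (the retry classes are the same ones the tests in this changeset already import):

import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

public class MetadataRetrySketch {

	static void configure(KafkaMessageChannelBinder binder) {
		// Retry topic-metadata operations up to 10 times,
		// pausing 100 ms between attempts.
		RetryTemplate retryTemplate = new RetryTemplate();
		retryTemplate.setRetryPolicy(new SimpleRetryPolicy(10));
		FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
		backOffPolicy.setBackOffPeriod(100);
		retryTemplate.setBackOffPolicy(backOffPolicy);
		binder.setMetadataRetryOperations(retryTemplate);
	}
}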

View File

@@ -0,0 +1,75 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.admin;
import java.util.Properties;
import kafka.utils.ZkUtils;
/**
* API around {@link kafka.admin.AdminUtils} to support
* various versions of Kafka brokers.
*
* Note: implementations that support Kafka brokers other than 0.9 may need to
* rely on reflection around {@link kafka.admin.AdminUtils}.
*
* @author Soby Chacko
*/
public interface AdminUtilsOperation {
/**
* Invoke {@link kafka.admin.AdminUtils#addPartitions}
*
* @param zkUtils ZooKeeper utils
* @param topic name of the topic
* @param numPartitions the new total number of partitions
* @param replicaAssignmentStr explicit replica assignment, or empty for automatic assignment
* @param checkBrokerAvailable whether to verify that brokers are available
*/
void invokeAddPartitions(ZkUtils zkUtils, String topic, int numPartitions,
String replicaAssignmentStr, boolean checkBrokerAvailable);
/**
* Invoke {@link kafka.admin.AdminUtils#fetchTopicMetadataFromZk}
*
* @param topic the topic name
* @param zkUtils ZooKeeper utils
* @return the error code reported in the topic metadata
*/
short errorCodeFromTopicMetadata(String topic, ZkUtils zkUtils);
/**
* Find the number of partitions for a topic using {@link kafka.admin.AdminUtils}
*
* @param topic the topic name
* @param zkUtils ZooKeeper utils
* @return the number of partitions
*/
int partitionSize(String topic, ZkUtils zkUtils);
/**
* Invoke {@link kafka.admin.AdminUtils#createTopic}
*
* @param zkUtils ZooKeeper utils
* @param topic the topic name
* @param partitions the number of partitions
* @param replicationFactor the replication factor
* @param topicConfig additional topic configuration
*/
void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions,
int replicationFactor, Properties topicConfig);
}
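To make the contract concrete, a hypothetical caller could create a topic through this abstraction as follows. The `ZkUtils` wiring mirrors the test classes in this changeset; the implementation choice, connection string, and topic name are placeholders (the binder configuration later in this diff selects an implementation automatically):

import java.util.Properties;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;

public class AdminUtilsOperationSketch {

	public static void main(String[] args) {
		// Version-specific implementation; Kafka09AdminUtilsOperation here.
		AdminUtilsOperation adminUtils = new Kafka09AdminUtilsOperation();
		ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000,
				ZKStringSerializer$.MODULE$);
		ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
		// Create a topic with 1 partition and a replication factor of 1.
		adminUtils.invokeCreateTopic(zkUtils, "my-topic", 1, 1, new Properties());
	}
}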

View File

@@ -0,0 +1,51 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.admin;
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.api.TopicMetadata;
import kafka.utils.ZkUtils;
/**
* @author Soby Chacko
*/
public class Kafka09AdminUtilsOperation implements AdminUtilsOperation {
public void invokeAddPartitions(ZkUtils zkUtils, String topic, int numPartitions,
String replicaAssignmentStr, boolean checkBrokerAvailable) {
AdminUtils.addPartitions(zkUtils, topic, numPartitions,
replicaAssignmentStr, checkBrokerAvailable);
}
public short errorCodeFromTopicMetadata(String topic, ZkUtils zkUtils) {
TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
return topicMetadata.errorCode();
}
public int partitionSize(String topic, ZkUtils zkUtils) {
TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
return topicMetadata.partitionsMetadata().size();
}
public void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions,
int replicationFactor, Properties topicConfig) {
AdminUtils.createTopic(zkUtils, topic, partitions, replicationFactor,
topicConfig);
}
}

View File

@@ -0,0 +1,151 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.admin;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Properties;
import kafka.api.PartitionMetadata;
import kafka.utils.ZkUtils;
import org.springframework.util.ReflectionUtils;
/**
* @author Soby Chacko
*/
public class Kafka10AdminUtilsOperation implements AdminUtilsOperation {
private static ClassLoader CLASS_LOADER = Kafka10AdminUtilsOperation.class.getClassLoader();
private static Class<?> ADMIN_UTIL_CLASS;
static {
try {
ADMIN_UTIL_CLASS = CLASS_LOADER.loadClass("kafka.admin.AdminUtils");
}
catch (ClassNotFoundException e) {
throw new IllegalStateException("AdminUtils class not found", e);
}
}
public void invokeAddPartitions(ZkUtils zkUtils, String topic, int numPartitions,
String replicaAssignmentStr, boolean checkBrokerAvailable) {
try {
Method[] declaredMethods = ADMIN_UTIL_CLASS.getDeclaredMethods();
Method addPartitions = null;
for (Method m : declaredMethods) {
if (m.getName().equals("addPartitions")) {
addPartitions = m;
}
}
if (addPartitions != null) {
addPartitions.invoke(null, zkUtils, topic, numPartitions,
replicaAssignmentStr, checkBrokerAvailable, null);
}
else {
throw new InvocationTargetException(
new RuntimeException("method not found"));
}
}
catch (InvocationTargetException e) {
ReflectionUtils.handleInvocationTargetException(e);
}
catch (IllegalAccessException e) {
ReflectionUtils.handleReflectionException(e);
}
}
public short errorCodeFromTopicMetadata(String topic, ZkUtils zkUtils) {
try {
Method fetchTopicMetadataFromZk = ReflectionUtils.findMethod(ADMIN_UTIL_CLASS, "fetchTopicMetadataFromZk", String.class, ZkUtils.class);
Object result = fetchTopicMetadataFromZk.invoke(null, topic, zkUtils);
Class<?> topicMetadataClass = CLASS_LOADER.loadClass("org.apache.kafka.common.requests.MetadataResponse$TopicMetadata");
Method errorCodeMethod = ReflectionUtils.findMethod(topicMetadataClass, "error");
Object obj = errorCodeMethod.invoke(result);
Method code = ReflectionUtils.findMethod(obj.getClass(), "code");
return (short) code.invoke(obj);
}
catch (ClassNotFoundException e) {
throw new IllegalStateException("AdminUtils class not found", e);
}
catch (InvocationTargetException e) {
ReflectionUtils.handleInvocationTargetException(e);
}
catch (IllegalAccessException e) {
ReflectionUtils.handleReflectionException(e);
}
return 0;
}
@SuppressWarnings("unchecked")
public int partitionSize(String topic, ZkUtils zkUtils) {
try {
Method fetchTopicMetadataFromZk = ReflectionUtils.findMethod(ADMIN_UTIL_CLASS, "fetchTopicMetadataFromZk", String.class, ZkUtils.class);
Object result = fetchTopicMetadataFromZk.invoke(null, topic, zkUtils);
Class<?> topicMetadataClass = CLASS_LOADER.loadClass("org.apache.kafka.common.requests.MetadataResponse$TopicMetadata");
Method partitionsMetadata = ReflectionUtils.findMethod(topicMetadataClass, "partitionMetadata");
List<PartitionMetadata> foo = (List<PartitionMetadata>) partitionsMetadata.invoke(result);
return foo.size();
}
catch (ClassNotFoundException e) {
throw new IllegalStateException("AdminUtils class not found", e);
}
catch (InvocationTargetException e) {
ReflectionUtils.handleInvocationTargetException(e);
}
catch (IllegalAccessException e) {
ReflectionUtils.handleReflectionException(e);
}
return 0;
}
public void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions,
int replicationFactor, Properties topicConfig) {
try {
Method[] declaredMethods = ADMIN_UTIL_CLASS.getDeclaredMethods();
Method createTopic = null;
for (Method m : declaredMethods) {
if (m.getName().equals("createTopic") && m.getParameterTypes()[m.getParameterTypes().length - 1].getName().endsWith("RackAwareMode")) {
createTopic = m;
break;
}
}
if (createTopic != null) {
createTopic.invoke(null, zkUtils, topic, partitions,
replicationFactor, topicConfig, null);
}
else {
throw new InvocationTargetException(
new RuntimeException("method not found"));
}
}
catch (InvocationTargetException e) {
ReflectionUtils.handleInvocationTargetException(e);
}
catch (IllegalAccessException e) {
ReflectionUtils.handleReflectionException(e);
}
}
}

View File

@@ -16,6 +16,11 @@
package org.springframework.cloud.stream.binder.kafka.config;
import java.lang.reflect.Method;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@@ -24,10 +29,18 @@ import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
import org.springframework.cloud.stream.binder.kafka.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
import org.springframework.cloud.stream.config.codec.kryo.KryoCodecAutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.integration.codec.Codec;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
@@ -45,6 +58,8 @@ import org.springframework.kafka.support.ProducerListener;
@EnableConfigurationProperties({KafkaBinderConfigurationProperties.class, KafkaExtendedBindingProperties.class})
public class KafkaBinderConfiguration {
protected final Log logger = LogFactory.getLog(getClass());
@Autowired
private Codec codec;
@@ -57,6 +72,9 @@ public class KafkaBinderConfiguration {
@Autowired
private ProducerListener producerListener;
@Autowired
private ApplicationContext context;
@Bean
KafkaMessageChannelBinder kafkaMessageChannelBinder() {
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
@@ -64,6 +82,8 @@ public class KafkaBinderConfiguration {
kafkaMessageChannelBinder.setCodec(this.codec);
//kafkaMessageChannelBinder.setProducerListener(producerListener);
kafkaMessageChannelBinder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
AdminUtilsOperation adminUtilsOperation = context.getBean(AdminUtilsOperation.class);
kafkaMessageChannelBinder.setAdminUtilsOperation(adminUtilsOperation);
return kafkaMessageChannelBinder;
}
@@ -77,4 +97,69 @@ public class KafkaBinderConfiguration {
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder) {
return new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, this.configurationProperties);
}
@Bean(name = "adminUtilsOperation")
@Conditional(Kafka09Condition.class)
public AdminUtilsOperation kafka09AdminUtilsOperation() {
logger.info("AdminUtils selected: Kafka 0.9 AdminUtils");
return new Kafka09AdminUtilsOperation();
}
@Bean(name = "adminUtilsOperation")
@Conditional(Kafka10Condition.class)
public AdminUtilsOperation kafka10AdminUtilsOperation() {
logger.info("AdminUtils selected: Kafka 0.10 AdminUtils");
return new Kafka10AdminUtilsOperation();
}
private static Method getMethod(ClassLoader classLoader, String methodName) {
try {
Class<?> adminUtilClass = classLoader.loadClass("kafka.admin.AdminUtils");
Method[] declaredMethods = adminUtilClass.getDeclaredMethods();
for (Method m : declaredMethods) {
if (m.getName().equals(methodName)) {
return m;
}
}
}
catch (ClassNotFoundException e) {
throw new IllegalStateException("AdminUtils not found", e);
}
return null;
}
static class Kafka10Condition implements Condition {
@Override
public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata annotatedTypeMetadata) {
ClassLoader classLoader = Kafka10Condition.class.getClassLoader();
Method addPartitions = getMethod(classLoader, "addPartitions");
if (addPartitions != null) {
Class<?>[] parameterTypes = addPartitions.getParameterTypes();
Class clazz = parameterTypes[parameterTypes.length - 1];
if (clazz.getName().equals("kafka.admin.RackAwareMode")) {
return true;
}
}
return false;
}
}
static class Kafka09Condition implements Condition {
@Override
public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata annotatedTypeMetadata) {
ClassLoader classLoader = Kafka09Condition.class.getClassLoader();
Method addPartitions = getMethod(classLoader, "addPartitions");
if (addPartitions != null) {
Class<?>[] parameterTypes = addPartitions.getParameterTypes();
Class clazz = parameterTypes[parameterTypes.length - 1];
if (!clazz.getName().equals("kafka.admin.RackAwareMode")) {
return true;
}
}
return false;
}
}
}

View File

@@ -0,0 +1,46 @@
package org.springframework.cloud.stream.binder.kafka;
import java.util.List;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Registration;
import org.springframework.cloud.stream.binder.AbstractTestBinder;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.integration.codec.Codec;
import org.springframework.integration.codec.kryo.KryoRegistrar;
import org.springframework.integration.codec.kryo.PojoCodec;
import org.springframework.integration.tuple.TupleKryoRegistrar;
/**
* @author Soby Chacko
*/
public abstract class AbstractKafkaTestBinder extends
AbstractTestBinder<KafkaMessageChannelBinder, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>> {
@Override
public void cleanup() {
// do nothing - the rule will take care of that
}
protected static Codec getCodec() {
return new PojoCodec(new TupleRegistrar());
}
private static class TupleRegistrar implements KryoRegistrar {
private final TupleKryoRegistrar delegate = new TupleKryoRegistrar();
@Override
public void registerTypes(Kryo kryo) {
this.delegate.registerTypes(kryo);
}
@Override
public List<Registration> getRegistrations() {
return this.delegate.getRegistrations();
}
}
}

View File

@@ -0,0 +1,176 @@
/*
* Copyright 2014-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.junit.Before;
import org.junit.ClassRule;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.Spy;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.retry.RetryOperations;
/**
* Integration tests for the {@link KafkaMessageChannelBinder}.
*
* @author Eric Bottard
* @author Marius Bogoevici
* @author Mark Fisher
* @author Ilayaperumal Gopinathan
*/
public class Kafka09BinderTests extends KafkaBinderTests {
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
private Kafka09TestBinder binder;
private Kafka09AdminUtilsOperation adminUtilsOperation = new Kafka09AdminUtilsOperation();
@Override
protected void binderBindUnbindLatency() throws InterruptedException {
Thread.sleep(500);
}
@Override
protected Kafka09TestBinder getBinder() {
if (binder == null) {
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binder = new Kafka09TestBinder(binderConfiguration);
}
return binder;
}
protected KafkaBinderConfigurationProperties createConfigurationProperties() {
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
List<String> bAddresses = new ArrayList<>();
for (BrokerAddress bAddress : brokerAddresses) {
bAddresses.add(bAddress.toString());
}
String[] foo = new String[bAddresses.size()];
binderConfiguration.setBrokers(bAddresses.toArray(foo));
binderConfiguration.setZkNodes(embeddedKafka.getZookeeperConnectionString());
return binderConfiguration;
}
@Override
protected int partitionSize(String topic) {
return consumerFactory().createConsumer().partitionsFor(topic).size();
}
@Override
protected void setMetadataRetryOperations(Binder binder, RetryOperations retryOperations) {
((Kafka09TestBinder) binder).getBinder().setMetadataRetryOperations(retryOperations);
}
@Override
protected ZkUtils getZkUtils(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
final ZkClient zkClient = new ZkClient(kafkaBinderConfigurationProperties.getZkConnectionString(),
kafkaBinderConfigurationProperties.getZkSessionTimeout(), kafkaBinderConfigurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
return new ZkUtils(zkClient, null, false);
}
@Override
protected void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions, int replicationFactor, Properties topicConfig) {
adminUtilsOperation.invokeCreateTopic(zkUtils, topic, partitions, replicationFactor, new Properties());
}
@Override
protected int invokePartitionSize(String topic, ZkUtils zkUtils) {
return adminUtilsOperation.partitionSize(topic, zkUtils);
}
@Override
protected ExtendedConsumerProperties<KafkaConsumerProperties> createConsumerProperties() {
return new ExtendedConsumerProperties<>(new KafkaConsumerProperties());
}
@Override
protected ExtendedProducerProperties<KafkaProducerProperties> createProducerProperties() {
return new ExtendedProducerProperties<>(new KafkaProducerProperties());
}
@Override
public String getKafkaOffsetHeaderKey() {
return KafkaHeaders.OFFSET;
}
@Override
protected Binder getBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
return new Kafka09TestBinder(kafkaBinderConfigurationProperties);
}
@Before
public void init() {
String multiplier = System.getenv("KAFKA_TIMEOUT_MULTIPLIER");
if (multiplier != null) {
timeoutMultiplier = Double.parseDouble(multiplier);
}
}
@Override
protected boolean usesExplicitRouting() {
return false;
}
@Override
protected String getClassUnderTestName() {
return CLASS_UNDER_TEST_NAME;
}
@Override
public Spy spyOn(final String name) {
throw new UnsupportedOperationException("'spyOn' is not used by Kafka tests");
}
private ConsumerFactory<byte[], byte[]> consumerFactory() {
Map<String, Object> props = new HashMap<>();
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Deserializer<byte[]> valueDecoder = new ByteArrayDeserializer();
Deserializer<byte[]> keyDecoder = new ByteArrayDeserializer();
return new DefaultKafkaConsumerFactory<>(props, keyDecoder, valueDecoder);
}
}

View File

@@ -0,0 +1,54 @@
/*
* Copyright 2015-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
/**
* Test support class for {@link KafkaMessageChannelBinder}. Creates a binder that uses
* an embedded Kafka cluster.
* @author Eric Bottard
* @author Marius Bogoevici
* @author David Turanski
* @author Gary Russell
* @author Soby Chacko
*/
public class Kafka09TestBinder extends AbstractKafkaTestBinder {
public Kafka09TestBinder(KafkaBinderConfigurationProperties binderConfiguration) {
try {
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfiguration);
binder.setCodec(getCodec());
ProducerListener producerListener = new LoggingProducerListener();
binder.setProducerListener(producerListener);
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
binder.setApplicationContext(context);
binder.setAdminUtilsOperation(new Kafka09AdminUtilsOperation());
binder.afterPropertiesSet();
this.setBinder(binder);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2014-2016 the original author or authors.
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,30 +16,18 @@
package org.springframework.cloud.stream.binder.kafka;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import kafka.admin.AdminUtils;
import kafka.api.TopicMetadata;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.assertj.core.api.Condition;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
@@ -51,7 +39,6 @@ import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.PartitionCapableBinderTests;
import org.springframework.cloud.stream.binder.PartitionTestSupport;
import org.springframework.cloud.stream.binder.Spy;
import org.springframework.cloud.stream.binder.TestUtils;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.config.BindingProperties;
@@ -60,18 +47,14 @@ import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
@@ -80,101 +63,37 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;
/**
* Integration tests for the {@link KafkaMessageChannelBinder}.
* @author Eric Bottard
* @author Marius Bogoevici
* @author Mark Fisher
* @author Ilayaperumal Gopinathan
* @author Soby Chacko
*/
public class KafkaBinderTests
extends
PartitionCapableBinderTests<KafkaTestBinder, ExtendedConsumerProperties<KafkaConsumerProperties>,
ExtendedProducerProperties<KafkaProducerProperties>> {
public abstract class KafkaBinderTests extends PartitionCapableBinderTests<AbstractKafkaTestBinder, ExtendedConsumerProperties<KafkaConsumerProperties>,
ExtendedProducerProperties<KafkaProducerProperties>> {
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();
protected abstract ExtendedConsumerProperties<KafkaConsumerProperties> createConsumerProperties();
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
protected abstract ExtendedProducerProperties<KafkaProducerProperties> createProducerProperties();
private KafkaTestBinder binder;
public abstract String getKafkaOffsetHeaderKey();
@Override
protected void binderBindUnbindLatency() throws InterruptedException {
Thread.sleep(500);
}
protected abstract Binder getBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties);
@Override
protected KafkaTestBinder getBinder() {
if (binder == null) {
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binder = new KafkaTestBinder(binderConfiguration);
}
return binder;
}
protected abstract KafkaBinderConfigurationProperties createConfigurationProperties();
protected KafkaBinderConfigurationProperties createConfigurationProperties() {
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
List<String> bAddresses = new ArrayList<>();
for (BrokerAddress bAddress : brokerAddresses) {
bAddresses.add(bAddress.toString());
}
binderConfiguration.setBrokers(bAddresses.toArray(new String[bAddresses.size()]));
binderConfiguration.setZkNodes(embeddedKafka.getZookeeperConnectionString());
return binderConfiguration;
}
protected abstract int partitionSize(String topic);
@Override
protected ExtendedConsumerProperties<KafkaConsumerProperties> createConsumerProperties() {
return new ExtendedConsumerProperties<>(new KafkaConsumerProperties());
}
protected abstract void setMetadataRetryOperations(Binder binder, RetryOperations retryOperations);
@Override
protected ExtendedProducerProperties<KafkaProducerProperties> createProducerProperties() {
return new ExtendedProducerProperties<>(new KafkaProducerProperties());
}
protected abstract ZkUtils getZkUtils(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties);
@Before
public void init() {
String multiplier = System.getenv("KAFKA_TIMEOUT_MULTIPLIER");
if (multiplier != null) {
timeoutMultiplier = Double.parseDouble(multiplier);
}
}
protected abstract void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions,
int replicationFactor, Properties topicConfig);
@Override
protected boolean usesExplicitRouting() {
return false;
}
@Override
protected String getClassUnderTestName() {
return CLASS_UNDER_TEST_NAME;
}
@Override
public Spy spyOn(final String name) {
throw new UnsupportedOperationException("'spyOn' is not used by Kafka tests");
}
private ConsumerFactory<byte[], byte[]> consumerFactory() {
Map<String, Object> props = new HashMap<>();
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "TEST-CONSUMER-GROUP");
Deserializer<byte[]> valueDecoder = new ByteArrayDeserializer();
Deserializer<byte[]> keyDecoder = new ByteArrayDeserializer();
return new DefaultKafkaConsumerFactory<>(props, keyDecoder, valueDecoder);
}
protected abstract int invokePartitionSize(String topic,
ZkUtils zkUtils);
@Test
@SuppressWarnings("unchecked")
public void testDlqAndRetry() throws Exception {
KafkaTestBinder binder = getBinder();
Binder binder = getBinder();
DirectChannel moduleOutputChannel = new DirectChannel();
DirectChannel moduleInputChannel = new DirectChannel();
QueueChannel dlqChannel = new QueueChannel();
@@ -216,8 +135,9 @@ public class KafkaBinderTests
}
@Test
@SuppressWarnings("unchecked")
public void testDefaultAutoCommitOnErrorWithoutDlq() throws Exception {
KafkaTestBinder binder = getBinder();
Binder binder = getBinder();
DirectChannel moduleOutputChannel = new DirectChannel();
DirectChannel moduleInputChannel = new DirectChannel();
FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
@@ -266,8 +186,9 @@ public class KafkaBinderTests
}
@Test
@SuppressWarnings("unchecked")
public void testDefaultAutoCommitOnErrorWithDlq() throws Exception {
KafkaTestBinder binder = getBinder();
Binder binder = getBinder();
DirectChannel moduleOutputChannel = new DirectChannel();
DirectChannel moduleInputChannel = new DirectChannel();
FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
@@ -325,19 +246,39 @@ public class KafkaBinderTests
producerBinding.unbind();
}
@Test
@SuppressWarnings("unchecked")
public void testAutoCreateTopicsEnabledSucceeds() throws Exception {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
configurationProperties.setAutoCreateTopics(true);
Binder binder = getBinder(configurationProperties);
RetryTemplate metadataRetrievalRetryOperations = new RetryTemplate();
metadataRetrievalRetryOperations.setRetryPolicy(new SimpleRetryPolicy());
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
metadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
setMetadataRetryOperations(binder, metadataRetrievalRetryOperations);
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
String testTopicName = "nonexisting" + System.currentTimeMillis();
Binding<?> binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
binding.unbind();
}
@Test(expected = IllegalArgumentException.class)
public void testValidateKafkaTopicName() {
KafkaTopicUtils.validateTopicName("foo:bar");
}
@Test
@SuppressWarnings("unchecked")
public void testCompression() throws Exception {
final KafkaProducerProperties.CompressionType[] codecs = new KafkaProducerProperties.CompressionType[] {
final KafkaProducerProperties.CompressionType[] codecs = new KafkaProducerProperties.CompressionType[]{
KafkaProducerProperties.CompressionType.none, KafkaProducerProperties.CompressionType.gzip,
KafkaProducerProperties.CompressionType.snappy};
byte[] testPayload = new byte[2048];
Arrays.fill(testPayload, (byte) 65);
KafkaTestBinder binder = getBinder();
Binder binder = getBinder();
for (KafkaProducerProperties.CompressionType codec : codecs) {
DirectChannel moduleOutputChannel = new DirectChannel();
QueueChannel moduleInputChannel = new QueueChannel();
@@ -364,13 +305,14 @@ public class KafkaBinderTests
}
@Test
@SuppressWarnings("unchecked")
public void testCustomPartitionCountOverridesDefaultIfLarger() throws Exception {
byte[] testPayload = new byte[2048];
Arrays.fill(testPayload, (byte) 65);
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binderConfiguration.setMinPartitionCount(10);
KafkaTestBinder binder = new KafkaTestBinder(binderConfiguration);
Binder binder = getBinder(binderConfiguration);
QueueChannel moduleInputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionCount(10);
@@ -393,23 +335,20 @@ public class KafkaBinderTests
assertThat(inbound).isNotNull();
assertThat((byte[]) inbound.getPayload()).containsExactly(testPayload);
Collection<PartitionInfo> partitions =
consumerFactory().createConsumer().partitionsFor("foo" + uniqueBindingId + ".0");
assertThat(partitions).hasSize(10);
assertThat(partitionSize("foo" + uniqueBindingId + ".0")).isEqualTo(10);
producerBinding.unbind();
consumerBinding.unbind();
}
@Test
@SuppressWarnings("unchecked")
public void testCustomPartitionCountDoesNotOverridePartitioningIfSmaller() throws Exception {
byte[] testPayload = new byte[2048];
Arrays.fill(testPayload, (byte) 65);
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binderConfiguration.setMinPartitionCount(6);
KafkaTestBinder binder = new KafkaTestBinder(binderConfiguration);
Binder binder = getBinder(binderConfiguration);
QueueChannel moduleInputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionCount(5);
@@ -431,22 +370,21 @@ public class KafkaBinderTests
Message<?> inbound = receive(moduleInputChannel);
assertThat(inbound).isNotNull();
assertThat((byte[]) inbound.getPayload()).containsExactly(testPayload);
Collection<PartitionInfo> partitions =
consumerFactory().createConsumer().partitionsFor("foo" + uniqueBindingId + ".0");
assertThat(partitions).hasSize(6);
assertThat(partitionSize("foo" + uniqueBindingId + ".0")).isEqualTo(6);
producerBinding.unbind();
consumerBinding.unbind();
}
@Test
@SuppressWarnings("unchecked")
public void testCustomPartitionCountOverridesPartitioningIfLarger() throws Exception {
byte[] testPayload = new byte[2048];
Arrays.fill(testPayload, (byte) 65);
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binderConfiguration.setMinPartitionCount(4);
KafkaTestBinder binder = new KafkaTestBinder(binderConfiguration);
Binder binder = getBinder(binderConfiguration);
QueueChannel moduleInputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
@@ -468,9 +406,7 @@ public class KafkaBinderTests
Message<?> inbound = receive(moduleInputChannel);
assertThat(inbound).isNotNull();
assertThat((byte[]) inbound.getPayload()).containsExactly(testPayload);
Collection<PartitionInfo> partitions =
consumerFactory().createConsumer().partitionsFor("foo" + uniqueBindingId + ".0");
assertThat(partitions).hasSize(5);
assertThat(partitionSize("foo" + uniqueBindingId + ".0")).isEqualTo(5);
producerBinding.unbind();
consumerBinding.unbind();
}
@@ -478,11 +414,11 @@ public class KafkaBinderTests
@Test
@SuppressWarnings("unchecked")
public void testDefaultConsumerStartsAtEarliest() throws Exception {
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(createConfigurationProperties());
Binder binder = getBinder(createConfigurationProperties());
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
binder.setApplicationContext(context);
binder.afterPropertiesSet();
DirectChannel output = new DirectChannel();
QueueChannel input1 = new QueueChannel();
@@ -515,7 +451,7 @@ public class KafkaBinderTests
Binding<MessageChannel> consumerBinding = null;
try {
KafkaTestBinder binder = getBinder();
Binder binder = getBinder();
DirectChannel output = new DirectChannel();
QueueChannel input1 = new QueueChannel();
@@ -552,7 +488,7 @@ public class KafkaBinderTests
@Ignore("Needs further discussion")
@SuppressWarnings("unchecked")
public void testReset() throws Exception {
KafkaTestBinder binder = getBinder();
Binder binder = getBinder();
DirectChannel output = new DirectChannel();
QueueChannel input1 = new QueueChannel();
@@ -605,11 +541,7 @@ public class KafkaBinderTests
try {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
binder.setApplicationContext(context);
binder.afterPropertiesSet();
Binder binder = getBinder(configurationProperties);
DirectChannel output = new DirectChannel();
QueueChannel input1 = new QueueChannel();
@@ -653,251 +585,6 @@ public class KafkaBinderTests
}
}
@Test
public void testSyncProducerMetadata() throws Exception {
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(createConfigurationProperties());
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
binder.setApplicationContext(context);
binder.afterPropertiesSet();
DirectChannel output = new DirectChannel();
String testTopicName = UUID.randomUUID().toString();
ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
properties.getExtension().setSync(true);
Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName, output, properties);
DirectFieldAccessor accessor = new DirectFieldAccessor(extractEndpoint(producerBinding));
KafkaProducerMessageHandler wrappedInstance = (KafkaProducerMessageHandler) accessor.getWrappedInstance();
assertThat(new DirectFieldAccessor(wrappedInstance).getPropertyValue("sync"))
.withFailMessage("Kafka Sync Producer should have been enabled.")
.isEqualTo(Boolean.TRUE);
producerBinding.unbind();
}
@Test
public void testAutoCreateTopicsDisabledFailsIfTopicMissing() throws Exception {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
configurationProperties.setAutoCreateTopics(false);
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
binder.setApplicationContext(context);
binder.afterPropertiesSet();
RetryTemplate metadataRetrievalRetryOperations = new RetryTemplate();
metadataRetrievalRetryOperations.setRetryPolicy(new SimpleRetryPolicy());
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
metadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
binder.setMetadataRetryOperations(metadataRetrievalRetryOperations);
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
String testTopicName = "nonexisting" + System.currentTimeMillis();
try {
binder.doBindConsumer(testTopicName, "test", output, consumerProperties);
fail();
}
catch (Exception e) {
assertThat(e).isInstanceOf(BinderException.class);
assertThat(e).hasMessageContaining("Topic " + testTopicName + " does not exist");
}
}
@Test
public void testAutoConfigureTopicsDisabledSucceedsIfTopicExisting() throws Exception {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
String testTopicName = "existing" + System.currentTimeMillis();
AdminUtils.createTopic(zkUtils, testTopicName, 5, 1, new Properties());
configurationProperties.setAutoCreateTopics(false);
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
binder.setApplicationContext(context);
binder.afterPropertiesSet();
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
Binding<MessageChannel> binding = binder.doBindConsumer(testTopicName, "test", output, consumerProperties);
binding.unbind();
}
@Test
public void testAutoAddPartitionsDisabledFailsIfTopicUnderpartitioned() throws Exception {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
String testTopicName = "existing" + System.currentTimeMillis();
AdminUtils.createTopic(zkUtils, testTopicName, 1, 1, new Properties());
configurationProperties.setAutoAddPartitions(false);
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
binder.setApplicationContext(context);
binder.afterPropertiesSet();
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
// this consumer must consume from partition 2
consumerProperties.setInstanceCount(3);
consumerProperties.setInstanceIndex(2);
try {
binder.doBindConsumer(testTopicName, "test", output, consumerProperties);
}
catch (Exception e) {
assertThat(e).isInstanceOf(BinderException.class);
assertThat(e)
.hasMessageContaining("The number of expected partitions was: 3, but 1 has been found instead");
}
}
@Test
public void testAutoAddPartitionsDisabledSucceedsIfTopicPartitionedCorrectly() throws Exception {
Binding<?> binding = null;
try {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
String testTopicName = "existing" + System.currentTimeMillis();
AdminUtils.createTopic(zkUtils, testTopicName, 6, 1, new Properties());
configurationProperties.setAutoAddPartitions(false);
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
GenericApplicationContext context = new GenericApplicationContext();
RetryTemplate metadataRetrievalRetryOperations = new RetryTemplate();
metadataRetrievalRetryOperations.setRetryPolicy(new SimpleRetryPolicy());
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
metadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
binder.setMetadataRetryOperations(metadataRetrievalRetryOperations);
context.refresh();
binder.setApplicationContext(context);
binder.afterPropertiesSet();
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
// this consumer must consume from partition 2
consumerProperties.setInstanceCount(3);
consumerProperties.setInstanceIndex(2);
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
binding = binder.doBindConsumer(testTopicName, "test-x", output, consumerProperties);
TopicPartitionInitialOffset[] listenedPartitions = TestUtils.getPropertyValue(binding,
"endpoint.messageListenerContainer.containerProperties.topicPartitions",
TopicPartitionInitialOffset[].class);
assertThat(listenedPartitions).hasSize(2);
assertThat(listenedPartitions).contains(new TopicPartitionInitialOffset(testTopicName, 2),
new TopicPartitionInitialOffset(testTopicName, 5));
Collection<PartitionInfo> partitions =
consumerFactory().createConsumer().partitionsFor(testTopicName);
assertThat(partitions).hasSize(6);
}
finally {
binding.unbind();
}
}
@Test
public void testAutoCreateTopicsEnabledSucceeds() throws Exception {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
configurationProperties.setAutoCreateTopics(true);
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
binder.setApplicationContext(context);
binder.afterPropertiesSet();
RetryTemplate metadataRetrievalRetryOperations = new RetryTemplate();
metadataRetrievalRetryOperations.setRetryPolicy(new SimpleRetryPolicy());
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
metadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
binder.setMetadataRetryOperations(metadataRetrievalRetryOperations);
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
String testTopicName = "nonexisting" + System.currentTimeMillis();
Binding<?> binding = binder.doBindConsumer(testTopicName, "test", output, consumerProperties);
binding.unbind();
}
@Test
public void testPartitionCountNotReduced() throws Exception {
String testTopicName = "existing" + System.currentTimeMillis();
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
AdminUtils.createTopic(zkUtils, testTopicName, 6, 1, new Properties());
configurationProperties.setAutoAddPartitions(true);
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
binder.setApplicationContext(context);
binder.afterPropertiesSet();
RetryTemplate metadataRetrievalRetryOperations = new RetryTemplate();
metadataRetrievalRetryOperations.setRetryPolicy(new SimpleRetryPolicy());
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
metadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
binder.setMetadataRetryOperations(metadataRetrievalRetryOperations);
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
Binding<?> binding = binder.doBindConsumer(testTopicName, "test", output, consumerProperties);
binding.unbind();
TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(testTopicName,
zkUtils);
assertThat(topicMetadata.partitionsMetadata().size()).isEqualTo(6);
}
@Test
public void testPartitionCountIncreasedIfAutoAddPartitionsSet() throws Exception {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
String testTopicName = "existing" + System.currentTimeMillis();
AdminUtils.createTopic(zkUtils, testTopicName, 1, 1, new Properties());
configurationProperties.setMinPartitionCount(6);
configurationProperties.setAutoAddPartitions(true);
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
binder.setApplicationContext(context);
binder.afterPropertiesSet();
RetryTemplate metadataRetrievalRetryOperations = new RetryTemplate();
metadataRetrievalRetryOperations.setRetryPolicy(new SimpleRetryPolicy());
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
metadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
binder.setMetadataRetryOperations(metadataRetrievalRetryOperations);
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
Binding<?> binding = binder.doBindConsumer(testTopicName, "test", output, consumerProperties);
binding.unbind();
TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(testTopicName,
zkUtils);
assertThat(topicMetadata.partitionsMetadata().size()).isEqualTo(6);
}
@Test
@Override
@SuppressWarnings("unchecked")
@@ -1156,7 +843,6 @@ public class KafkaBinderTests
QueueChannel input1 = new QueueChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
//consumerProperties.getExtension().setAutoRebalanceEnabled(false);
Binding<MessageChannel> binding1 = binder.bindConsumer("defaultGroup.0", null, input1,
consumerProperties);
@@ -1203,7 +889,216 @@ public class KafkaBinderTests
binding2.unbind();
}
private static final class FailingInvocationCountingMessageHandler implements MessageHandler {
@Test
@SuppressWarnings("unchecked")
public void testSyncProducerMetadata() throws Exception {
Binder binder = getBinder(createConfigurationProperties());
DirectChannel output = new DirectChannel();
String testTopicName = UUID.randomUUID().toString();
ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
properties.getExtension().setSync(true);
Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName, output, properties);
DirectFieldAccessor accessor = new DirectFieldAccessor(extractEndpoint(producerBinding));
KafkaProducerMessageHandler wrappedInstance = (KafkaProducerMessageHandler) accessor.getWrappedInstance();
assertThat(new DirectFieldAccessor(wrappedInstance).getPropertyValue("sync"))
.withFailMessage("Kafka Sync Producer should have been enabled.")
.isEqualTo(Boolean.TRUE);
producerBinding.unbind();
}
@Test
@SuppressWarnings("unchecked")
public void testAutoCreateTopicsDisabledFailsIfTopicMissing() throws Exception {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
configurationProperties.setAutoCreateTopics(false);
Binder binder = getBinder(configurationProperties);
RetryTemplate metadataRetrievalRetryOperations = new RetryTemplate();
metadataRetrievalRetryOperations.setRetryPolicy(new SimpleRetryPolicy());
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
metadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
setMetadataRetryOperations(binder, metadataRetrievalRetryOperations);
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
String testTopicName = "nonexisting" + System.currentTimeMillis();
try {
binder.bindConsumer(testTopicName, "test", output, consumerProperties);
fail();
}
catch (Exception e) {
assertThat(e).isInstanceOf(BinderException.class);
assertThat(e).hasMessageContaining("Topic " + testTopicName + " does not exist");
}
}
@Test
@SuppressWarnings("unchecked")
public void testAutoConfigureTopicsDisabledSucceedsIfTopicExisting() throws Exception {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
final ZkUtils zkUtils = getZkUtils(configurationProperties);
String testTopicName = "existing" + System.currentTimeMillis();
invokeCreateTopic(zkUtils, testTopicName, 5, 1, new Properties());
configurationProperties.setAutoCreateTopics(false);
Binder binder = getBinder(configurationProperties);
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
Binding<MessageChannel> binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
binding.unbind();
}
@Test
@SuppressWarnings("unchecked")
public void testPartitionCountIncreasedIfAutoAddPartitionsSet() throws Exception {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
final ZkUtils zkUtils = getZkUtils(configurationProperties);
String testTopicName = "existing" + System.currentTimeMillis();
invokeCreateTopic(zkUtils, testTopicName, 6, 1, new Properties());
configurationProperties.setMinPartitionCount(6);
configurationProperties.setAutoAddPartitions(true);
Binder binder = getBinder(configurationProperties);
RetryTemplate metadataRetrievalRetryOperations = new RetryTemplate();
metadataRetrievalRetryOperations.setRetryPolicy(new SimpleRetryPolicy());
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
metadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
setMetadataRetryOperations(binder, metadataRetrievalRetryOperations);
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
Binding<?> binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
binding.unbind();
assertThat(invokePartitionSize(testTopicName, zkUtils)).isEqualTo(6);
}
@Test
@SuppressWarnings("unchecked")
public void testAutoAddPartitionsDisabledFailsIfTopicUnderpartitioned() throws Exception {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
String testTopicName = "existing" + System.currentTimeMillis();
invokeCreateTopic(zkUtils, testTopicName, 1, 1, new Properties());
configurationProperties.setAutoAddPartitions(false);
Binder binder = getBinder(configurationProperties);
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
// this consumer must consume from partition 2
consumerProperties.setInstanceCount(3);
consumerProperties.setInstanceIndex(2);
Binding binding = null;
try {
binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
}
catch (Exception e) {
assertThat(e).isInstanceOf(BinderException.class);
assertThat(e)
.hasMessageContaining("The number of expected partitions was: 3, but 1 has been found instead");
}
finally {
if (binding != null) {
binding.unbind();
}
}
}
@Test
@SuppressWarnings("unchecked")
public void testAutoAddPartitionsDisabledSucceedsIfTopicPartitionedCorrectly() throws Exception {
Binding<?> binding = null;
try {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
String testTopicName = "existing" + System.currentTimeMillis();
invokeCreateTopic(zkUtils, testTopicName, 6, 1, new Properties());
configurationProperties.setAutoAddPartitions(false);
Binder binder = getBinder(configurationProperties);
RetryTemplate metadataRetrievalRetryOperations = new RetryTemplate();
metadataRetrievalRetryOperations.setRetryPolicy(new SimpleRetryPolicy());
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
metadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
setMetadataRetryOperations(binder, metadataRetrievalRetryOperations);
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
// this consumer must consume from partition 2
consumerProperties.setInstanceCount(3);
consumerProperties.setInstanceIndex(2);
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
binding = binder.bindConsumer(testTopicName, "test-x", output, consumerProperties);
TopicPartitionInitialOffset[] listenedPartitions = TestUtils.getPropertyValue(binding,
"endpoint.messageListenerContainer.containerProperties.topicPartitions",
TopicPartitionInitialOffset[].class);
assertThat(listenedPartitions).hasSize(2);
assertThat(listenedPartitions).contains(new TopicPartitionInitialOffset(testTopicName, 2),
new TopicPartitionInitialOffset(testTopicName, 5));
int partitions = invokePartitionSize(testTopicName, zkUtils);
assertThat(partitions).isEqualTo(6);
}
finally {
if (binding != null) {
binding.unbind();
}
}
}
@Test
@SuppressWarnings("unchecked")
public void testPartitionCountNotReduced() throws Exception {
String testTopicName = "existing" + System.currentTimeMillis();
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
invokeCreateTopic(zkUtils, testTopicName, 6, 1, new Properties());
configurationProperties.setAutoAddPartitions(true);
Binder binder = getBinder(configurationProperties);
RetryTemplate metadataRetrievalRetryOperations = new RetryTemplate();
metadataRetrievalRetryOperations.setRetryPolicy(new SimpleRetryPolicy());
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
metadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
setMetadataRetryOperations(binder, metadataRetrievalRetryOperations);
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
Binding<?> binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
binding.unbind();
assertThat(partitionSize(testTopicName)).isEqualTo(6);
}
@Override
protected void binderBindUnbindLatency() throws InterruptedException {
Thread.sleep(500);
}
private final class FailingInvocationCountingMessageHandler implements MessageHandler {
private int invocationCount;
@@ -1222,7 +1117,7 @@ public class KafkaBinderTests
@Override
public void handleMessage(Message<?> message) throws MessagingException {
invocationCount++;
Long offset = message.getHeaders().get(KafkaHeaders.OFFSET, Long.class);
Long offset = message.getHeaders().get(KafkaBinderTests.this.getKafkaOffsetHeaderKey(), Long.class);
// using the offset as the key ensures that we don't store duplicate
// messages on retry
if (!receivedMessages.containsKey(offset)) {
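Taken together, the changes above turn KafkaBinderTests into an abstract template: each Kafka-version module supplies the embedded broker, the offset header key, and the version-specific admin operations through the new abstract hooks. Below is a rough sketch of how the Kafka09BinderTests subclass referenced in the next file might satisfy those hooks; only a subset is shown, the bodies are illustrative guesses based on the code removed above, and getBinder, getZkUtils, partitionSize and setMetadataRetryOperations would follow the same pattern.

public class Kafka09BinderTests extends KafkaBinderTests {

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);

    @Override
    protected ExtendedConsumerProperties<KafkaConsumerProperties> createConsumerProperties() {
        return new ExtendedConsumerProperties<>(new KafkaConsumerProperties());
    }

    @Override
    protected ExtendedProducerProperties<KafkaProducerProperties> createProducerProperties() {
        return new ExtendedProducerProperties<>(new KafkaProducerProperties());
    }

    @Override
    public String getKafkaOffsetHeaderKey() {
        // the 0.9 binder surfaces the record offset under the Spring Kafka header
        return KafkaHeaders.OFFSET;
    }

    @Override
    protected void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions,
            int replicationFactor, Properties topicConfig) {
        AdminUtils.createTopic(zkUtils, topic, partitions, replicationFactor, topicConfig);
    }

    @Override
    protected int invokePartitionSize(String topic, ZkUtils zkUtils) {
        return AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionsMetadata().size();
    }
}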

View File

@@ -40,12 +40,12 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Gary Russell
* @author Mark Fisher
*/
public class RawModeKafkaBinderTests extends KafkaBinderTests {
public class RawModeKafka09BinderTests extends Kafka09BinderTests {
@Test
@Override
public void testPartitionedModuleJava() throws Exception {
KafkaTestBinder binder = getBinder();
Kafka09TestBinder binder = getBinder();
ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
properties.setHeaderMode(HeaderMode.raw);
properties.setPartitionKeyExtractorClass(RawKafkaPartitionTestSupport.class);
@@ -98,7 +98,7 @@ public class RawModeKafkaBinderTests extends KafkaBinderTests {
@Test
@Override
public void testPartitionedModuleSpEL() throws Exception {
KafkaTestBinder binder = getBinder();
Kafka09TestBinder binder = getBinder();
ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
properties.setPartitionKeyExpression(spelExpressionParser.parseExpression("payload[0]"));
properties.setPartitionSelectorExpression(spelExpressionParser.parseExpression("hashCode()"));
@@ -160,7 +160,7 @@ public class RawModeKafkaBinderTests extends KafkaBinderTests {
@Test
@Override
public void testSendAndReceive() throws Exception {
KafkaTestBinder binder = getBinder();
Kafka09TestBinder binder = getBinder();
DirectChannel moduleOutputChannel = new DirectChannel();
QueueChannel moduleInputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
@@ -184,7 +184,7 @@ public class RawModeKafkaBinderTests extends KafkaBinderTests {
@Test
public void testSendAndReceiveWithExplicitConsumerGroup() {
KafkaTestBinder binder = getBinder();
Kafka09TestBinder binder = getBinder();
DirectChannel moduleOutputChannel = new DirectChannel();
// Test pub/sub by emulating how StreamPlugin handles taps
QueueChannel module1InputChannel = new QueueChannel();
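A note on the partitioning tests above: RawKafkaPartitionTestSupport is registered as the partition key extractor, but its implementation is not part of this diff. Under the Spring Cloud Stream 1.x SPI, a class in that role implements PartitionKeyExtractorStrategy (and, when it also drives partition selection, PartitionSelectorStrategy). The body below is an illustrative guess keyed off the payload[0] expressions used in the SpEL variant of the same tests, not the actual test-support source.

import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy;
import org.springframework.cloud.stream.binder.PartitionSelectorStrategy;
import org.springframework.messaging.Message;

public class RawKafkaPartitionTestSupport implements PartitionKeyExtractorStrategy, PartitionSelectorStrategy {

    @Override
    public Object extractKey(Message<?> message) {
        // raw header mode ships plain byte[] payloads; key off the first byte
        return ((byte[]) message.getPayload())[0];
    }

    @Override
    public int selectPartition(Object key, int partitionCount) {
        // keep the partition stable and in range for the small test payloads
        return ((Byte) key) % partitionCount;
    }
}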