Compare commits
9 Commits
v1.1.0.M1
...
v1.1.0.RC2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b944b594b5 | ||
|
|
4fa567fec7 | ||
|
|
16d6584774 | ||
|
|
ea514bd72d | ||
|
|
79feac15d0 | ||
|
|
3a27a5ec75 | ||
|
|
7308bd4991 | ||
|
|
866eaf4a25 | ||
|
|
b37e7b37d2 |
53
pom.xml
53
pom.xml
@@ -2,12 +2,12 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.1.0.M1</version>
|
||||
<version>1.1.0.RC2</version>
|
||||
<packaging>pom</packaging>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-parent</artifactId>
|
||||
<version>1.1.0.M1</version>
|
||||
<version>1.1.0.RC1</version>
|
||||
<relativePath />
|
||||
</parent>
|
||||
<properties>
|
||||
@@ -19,8 +19,8 @@
|
||||
<modules>
|
||||
<module>spring-cloud-stream-binder-kafka</module>
|
||||
<module>spring-cloud-starter-stream-kafka</module>
|
||||
<module>spring-cloud-stream-binder-kafka-test-support</module>
|
||||
<module>spring-cloud-stream-binder-kafka-docs</module>
|
||||
<module>spring-cloud-stream-binder-kafka-0.10-test</module>
|
||||
</modules>
|
||||
<build>
|
||||
<pluginManagement>
|
||||
@@ -59,6 +59,53 @@
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
</build>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
<version>${kafka.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>log4j</groupId>
|
||||
<artifactId>log4j</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>${kafka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<version>${spring-kafka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
<version>${spring-integration-kafka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<scope>test</scope>
|
||||
<version>${spring-kafka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
<classifier>test</classifier>
|
||||
<version>${kafka.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
<profiles>
|
||||
<profile>
|
||||
<id>spring</id>
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.1.0.M1</version>
|
||||
<version>1.1.0.RC2</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
<description>Spring Cloud Starter Stream Kafka</description>
|
||||
@@ -20,7 +20,7 @@
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
<version>1.1.0.M1</version>
|
||||
<version>1.1.0.RC2</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
||||
79
spring-cloud-stream-binder-kafka-0.10-test/pom.xml
Normal file
79
spring-cloud-stream-binder-kafka-0.10-test/pom.xml
Normal file
@@ -0,0 +1,79 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.1.0.RC2</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-0.10-test</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder 0.10 Tests</description>
|
||||
<url>http://projects.spring.io/spring-cloud</url>
|
||||
<organization>
|
||||
<name>Pivotal Software, Inc.</name>
|
||||
<url>http://www.spring.io</url>
|
||||
</organization>
|
||||
<properties>
|
||||
<main.basedir>${basedir}/../..</main.basedir>
|
||||
<!--
|
||||
Override Kafka dependencies to Kafka 0.10 and supporting Spring Kafka and
|
||||
Spring Integration Kafka versions
|
||||
-->
|
||||
<kafka.version>0.10.0.0</kafka.version>
|
||||
<spring-kafka.version>1.1.0.M1</spring-kafka.version>
|
||||
<spring-integration-kafka.version>2.0.1.RELEASE</spring-integration-kafka.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
<version>1.1.0.RC2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
<version>1.1.0.RC2</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,179 @@
|
||||
/*
|
||||
* Copyright 2014-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import kafka.utils.ZKStringSerializer$;
|
||||
import kafka.utils.ZkUtils;
|
||||
import org.I0Itec.zkclient.ZkClient;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.Spy;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.kafka.test.core.BrokerAddress;
|
||||
import org.springframework.kafka.test.rule.KafkaEmbedded;
|
||||
import org.springframework.retry.RetryOperations;
|
||||
|
||||
/**
|
||||
* Integration tests for the {@link KafkaMessageChannelBinder}.
|
||||
*
|
||||
* @author Eric Bottard
|
||||
* @author Marius Bogoevici
|
||||
* @author Mark Fisher
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
public class Kafka10BinderTests extends KafkaBinderTests {
|
||||
|
||||
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();
|
||||
|
||||
@ClassRule
|
||||
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
|
||||
|
||||
private Kafka10TestBinder binder;
|
||||
|
||||
private Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
|
||||
|
||||
@Override
|
||||
protected void binderBindUnbindLatency() throws InterruptedException {
|
||||
Thread.sleep(500);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Kafka10TestBinder getBinder() {
|
||||
if (binder == null) {
|
||||
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
|
||||
binder = new Kafka10TestBinder(binderConfiguration);
|
||||
}
|
||||
return binder;
|
||||
}
|
||||
|
||||
protected KafkaBinderConfigurationProperties createConfigurationProperties() {
|
||||
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
|
||||
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
|
||||
List<String> bAddresses = new ArrayList<>();
|
||||
for (BrokerAddress bAddress : brokerAddresses) {
|
||||
bAddresses.add(bAddress.toString());
|
||||
}
|
||||
String[] foo = new String[bAddresses.size()];
|
||||
binderConfiguration.setBrokers(bAddresses.toArray(foo));
|
||||
binderConfiguration.setZkNodes(embeddedKafka.getZookeeperConnectionString());
|
||||
return binderConfiguration;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int partitionSize(String topic) {
|
||||
return consumerFactory().createConsumer().partitionsFor(topic).size();
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void setMetadataRetryOperations(Binder binder, RetryOperations retryOperations) {
|
||||
((Kafka10TestBinder) binder).getBinder().setMetadataRetryOperations(retryOperations);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ZkUtils getZkUtils(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
|
||||
final ZkClient zkClient = new ZkClient(kafkaBinderConfigurationProperties.getZkConnectionString(),
|
||||
kafkaBinderConfigurationProperties.getZkSessionTimeout(), kafkaBinderConfigurationProperties.getZkConnectionTimeout(),
|
||||
ZKStringSerializer$.MODULE$);
|
||||
|
||||
return new ZkUtils(zkClient, null, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions, int replicationFactor, Properties topicConfig) {
|
||||
adminUtilsOperation.invokeCreateTopic(zkUtils, topic, partitions, replicationFactor, new Properties());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int invokePartitionSize(String topic, ZkUtils zkUtils) {
|
||||
return adminUtilsOperation.partitionSize(topic, zkUtils);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ExtendedConsumerProperties<KafkaConsumerProperties> createConsumerProperties() {
|
||||
return new ExtendedConsumerProperties<>(new KafkaConsumerProperties());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ExtendedProducerProperties<KafkaProducerProperties> createProducerProperties() {
|
||||
return new ExtendedProducerProperties<>(new KafkaProducerProperties());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKafkaOffsetHeaderKey() {
|
||||
return KafkaHeaders.OFFSET;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Binder getBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
|
||||
return new Kafka10TestBinder(kafkaBinderConfigurationProperties);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
String multiplier = System.getenv("KAFKA_TIMEOUT_MULTIPLIER");
|
||||
if (multiplier != null) {
|
||||
timeoutMultiplier = Double.parseDouble(multiplier);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean usesExplicitRouting() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getClassUnderTestName() {
|
||||
return CLASS_UNDER_TEST_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Spy spyOn(final String name) {
|
||||
throw new UnsupportedOperationException("'spyOn' is not used by Kafka tests");
|
||||
}
|
||||
|
||||
|
||||
private ConsumerFactory<byte[], byte[]> consumerFactory() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
|
||||
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, "TEST-CONSUMER-GROUP");
|
||||
Deserializer<byte[]> valueDecoder = new ByteArrayDeserializer();
|
||||
Deserializer<byte[]> keyDecoder = new ByteArrayDeserializer();
|
||||
|
||||
return new DefaultKafkaConsumerFactory<>(props, keyDecoder, valueDecoder);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -16,20 +16,9 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import com.esotericsoftware.kryo.Kryo;
|
||||
import com.esotericsoftware.kryo.Registration;
|
||||
|
||||
import org.springframework.cloud.stream.binder.AbstractTestBinder;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.context.support.GenericApplicationContext;
|
||||
import org.springframework.integration.codec.Codec;
|
||||
import org.springframework.integration.codec.kryo.KryoRegistrar;
|
||||
import org.springframework.integration.codec.kryo.PojoCodec;
|
||||
import org.springframework.integration.tuple.TupleKryoRegistrar;
|
||||
import org.springframework.kafka.support.LoggingProducerListener;
|
||||
import org.springframework.kafka.support.ProducerListener;
|
||||
|
||||
@@ -41,10 +30,9 @@ import org.springframework.kafka.support.ProducerListener;
|
||||
* @author Gary Russell
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class KafkaTestBinder extends
|
||||
AbstractTestBinder<KafkaMessageChannelBinder, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>> {
|
||||
public class Kafka10TestBinder extends AbstractKafkaTestBinder {
|
||||
|
||||
public KafkaTestBinder(KafkaBinderConfigurationProperties binderConfiguration) {
|
||||
public Kafka10TestBinder(KafkaBinderConfigurationProperties binderConfiguration) {
|
||||
try {
|
||||
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfiguration);
|
||||
binder.setCodec(getCodec());
|
||||
@@ -53,6 +41,7 @@ public class KafkaTestBinder extends
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
binder.setApplicationContext(context);
|
||||
binder.setAdminUtilsOperation(new Kafka10AdminUtilsOperation());
|
||||
binder.afterPropertiesSet();
|
||||
this.setBinder(binder);
|
||||
}
|
||||
@@ -61,27 +50,4 @@ public class KafkaTestBinder extends
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanup() {
|
||||
// do nothing - the rule will take care of that
|
||||
}
|
||||
|
||||
private static Codec getCodec() {
|
||||
return new PojoCodec(new TupleRegistrar());
|
||||
}
|
||||
|
||||
private static class TupleRegistrar implements KryoRegistrar {
|
||||
private final TupleKryoRegistrar delegate = new TupleKryoRegistrar();
|
||||
|
||||
@Override
|
||||
public void registerTypes(Kryo kryo) {
|
||||
this.delegate.registerTypes(kryo);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Registration> getRegistrations() {
|
||||
return this.delegate.getRegistrations();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.1.0.M1</version>
|
||||
<version>1.1.0.RC2</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>
|
||||
@@ -18,7 +18,7 @@
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
<version>1.1.0.M1</version>
|
||||
<version>1.1.0.RC2</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<profiles>
|
||||
|
||||
@@ -120,6 +120,13 @@ Default: `2097152`.
|
||||
The following properties are available for Kafka consumers only and
|
||||
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.consumer.`.
|
||||
|
||||
autoRebalanceEnabled::
|
||||
When `true`, topic partitions will be automatically rebalanced between the members of a consumer group.
|
||||
When `false`, each consumer will be assigned a fixed set of partitions based on `spring.cloud.stream.instanceCount` and `spring.cloud.stream.instanceIndex`.
|
||||
This requires both `spring.cloud.stream.instanceCount` and `spring.cloud.stream.instanceIndex` properties to be set appropriately on each launched instance.
|
||||
The property `spring.cloud.stream.instanceCount` must typically be greater than 1 in this case.
|
||||
+
|
||||
Default: `true`.
|
||||
autoCommitOffset::
|
||||
Whether to autocommit offsets when a message has been processed.
|
||||
If set to `false`, an `Acknowledgment` header will be available in the message headers for late acknowledgment.
|
||||
@@ -226,4 +233,41 @@ Here is an example of launching a Spring Cloud Stream application with SASL and
|
||||
Exercise caution when using the `autoCreateTopics` and `autoAddPartitions` if using Kerberos.
|
||||
Usually applications may use principals that do not have administrative rights in Kafka and Zookeeper, and relying on Spring Cloud Stream to create/modify topics may fail.
|
||||
In secure environments, we strongly recommend creating topics and managing ACLs administratively using Kafka tooling.
|
||||
====
|
||||
====
|
||||
|
||||
==== Using the binder with Apache Kafka 0.10
|
||||
|
||||
The binder also supports connecting to Kafka 0.10 brokers.
|
||||
In order to support this, when you create the project that contains your application, include `spring-cloud-starter-stream-kafka` as you normally would do for 0.9 based applications.
|
||||
Then add these dependencies at the top of the `<dependencies>` section in the pom.xml file to override the Apache Kafka, Spring Kafka, and Spring Integration Kafka with 0.10-compatible versions as in the following example:
|
||||
|
||||
[source,xml]
|
||||
----
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<version>1.1.0.M1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
<version>2.0.1.RELEASE</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
<version>0.10.0.0</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
----
|
||||
|
||||
[NOTE]
|
||||
====
|
||||
The versions above are provided only for the sake of the example.
|
||||
For best results, we recommend using the most recent 0.10-compatible versions of the projects.
|
||||
====
|
||||
|
||||
@@ -1,24 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.1.0.M1</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-test-support</artifactId>
|
||||
<description>Kafka related test classes</description>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<version>${spring-kafka.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.1.0.M1</version>
|
||||
<version>1.1.0.RC2</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
@@ -37,12 +37,6 @@
|
||||
<artifactId>spring-cloud-stream-binder-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-test-support</artifactId>
|
||||
<scope>test</scope>
|
||||
<version>1.1.0.M1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
@@ -54,17 +48,6 @@
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<version>${spring-kafka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<scope>test</scope>
|
||||
<version>${spring-kafka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
@@ -73,6 +56,21 @@
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<version>${spring-kafka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
<version>${spring-integration-kafka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
@@ -81,34 +79,20 @@
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
<version>${kafka.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>log4j</groupId>
|
||||
<artifactId>log4j</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
<classifier>test</classifier>
|
||||
<version>${kafka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>${kafka.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<version>3.0.2</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>test-jar</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
|
||||
@@ -33,6 +33,7 @@ import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigura
|
||||
|
||||
/**
|
||||
* Health indicator for Kafka.
|
||||
*
|
||||
* @author Ilayaperumal Gopinathan
|
||||
* @author Marius Bogoevici
|
||||
*/
|
||||
@@ -43,7 +44,7 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
|
||||
private final KafkaBinderConfigurationProperties configurationProperties;
|
||||
|
||||
public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder,
|
||||
KafkaBinderConfigurationProperties configurationProperties) {
|
||||
KafkaBinderConfigurationProperties configurationProperties) {
|
||||
this.binder = binder;
|
||||
this.configurationProperties = configurationProperties;
|
||||
|
||||
|
||||
@@ -25,8 +25,6 @@ import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
|
||||
import kafka.admin.AdminUtils;
|
||||
import kafka.api.TopicMetadata;
|
||||
import kafka.common.ErrorMapping;
|
||||
import kafka.utils.ZkUtils;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
@@ -42,7 +40,6 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.serialization.ByteArraySerializer;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import scala.collection.Seq;
|
||||
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
|
||||
@@ -52,6 +49,7 @@ import org.springframework.cloud.stream.binder.BinderHeaders;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.context.Lifecycle;
|
||||
import org.springframework.expression.common.LiteralExpression;
|
||||
@@ -83,6 +81,7 @@ import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* A {@link Binder} that uses Kafka as the underlying middleware.
|
||||
*
|
||||
* @author Eric Bottard
|
||||
* @author Marius Bogoevici
|
||||
* @author Ilayaperumal Gopinathan
|
||||
@@ -109,6 +108,8 @@ public class KafkaMessageChannelBinder extends
|
||||
|
||||
private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
|
||||
|
||||
private AdminUtilsOperation adminUtilsOperation;
|
||||
|
||||
public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties) {
|
||||
super(false, headersToMap(configurationProperties));
|
||||
this.configurationProperties = configurationProperties;
|
||||
@@ -130,8 +131,13 @@ public class KafkaMessageChannelBinder extends
|
||||
return headersToMap;
|
||||
}
|
||||
|
||||
public void setAdminUtilsOperation(AdminUtilsOperation adminUtilsOperation) {
|
||||
this.adminUtilsOperation = adminUtilsOperation;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retry configuration for operations such as validating topic creation
|
||||
*
|
||||
* @param metadataRetryOperations the retry configuration
|
||||
*/
|
||||
public void setMetadataRetryOperations(RetryOperations metadataRetryOperations) {
|
||||
@@ -189,7 +195,7 @@ public class KafkaMessageChannelBinder extends
|
||||
|
||||
@Override
|
||||
protected MessageHandler createProducerMessageHandler(final String destination,
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception {
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception {
|
||||
|
||||
KafkaTopicUtils.validateTopicName(destination);
|
||||
|
||||
@@ -215,11 +221,12 @@ public class KafkaMessageChannelBinder extends
|
||||
|
||||
@Override
|
||||
protected String createProducerDestinationIfNecessary(String name,
|
||||
ExtendedProducerProperties<KafkaProducerProperties> properties) {
|
||||
ExtendedProducerProperties<KafkaProducerProperties> properties) {
|
||||
if (this.logger.isInfoEnabled()) {
|
||||
this.logger.info("Using kafka topic for outbound: " + name);
|
||||
}
|
||||
KafkaTopicUtils.validateTopicName(name);
|
||||
|
||||
Collection<PartitionInfo> partitions = ensureTopicCreated(name, properties.getPartitionCount());
|
||||
if (properties.getPartitionCount() < partitions.size()) {
|
||||
if (this.logger.isInfoEnabled()) {
|
||||
@@ -259,17 +266,19 @@ public class KafkaMessageChannelBinder extends
|
||||
|
||||
@Override
|
||||
protected Collection<PartitionInfo> createConsumerDestinationIfNecessary(String name, String group,
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
|
||||
KafkaTopicUtils.validateTopicName(name);
|
||||
if (properties.getInstanceCount() == 0) {
|
||||
throw new IllegalArgumentException("Instance count cannot be zero");
|
||||
}
|
||||
|
||||
Collection<PartitionInfo> allPartitions = ensureTopicCreated(name,
|
||||
properties.getInstanceCount() * properties.getConcurrency());
|
||||
|
||||
Collection<PartitionInfo> listenedPartitions;
|
||||
|
||||
if (properties.getInstanceCount() == 1) {
|
||||
if (properties.getExtension().isAutoRebalanceEnabled() ||
|
||||
properties.getInstanceCount() == 1) {
|
||||
listenedPartitions = allPartitions;
|
||||
}
|
||||
else {
|
||||
@@ -288,7 +297,7 @@ public class KafkaMessageChannelBinder extends
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
protected MessageProducer createConsumerEndpoint(String name, String group, Collection<PartitionInfo> destination,
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
|
||||
boolean anonymous = !StringUtils.hasText(group);
|
||||
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
|
||||
"DLQ support is not available for anonymous subscriptions");
|
||||
@@ -429,38 +438,38 @@ public class KafkaMessageChannelBinder extends
|
||||
this.configurationProperties.getZkConnectionTimeout(),
|
||||
JaasUtils.isZkSecurityEnabled());
|
||||
try {
|
||||
final Properties topicConfig = new Properties();
|
||||
TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils);
|
||||
if (topicMetadata.errorCode() == ErrorMapping.NoError()) {
|
||||
// only consider minPartitionCount for resizing if autoAddPartitions is
|
||||
// true
|
||||
short errorCode = adminUtilsOperation.errorCodeFromTopicMetadata(topicName, zkUtils);
|
||||
if (errorCode == ErrorMapping.NoError()) {
|
||||
// only consider minPartitionCount for resizing if autoAddPartitions is true
|
||||
int effectivePartitionCount = this.configurationProperties.isAutoAddPartitions()
|
||||
? Math.max(this.configurationProperties.getMinPartitionCount(), partitionCount)
|
||||
: partitionCount;
|
||||
if (topicMetadata.partitionsMetadata().size() < effectivePartitionCount) {
|
||||
int partitionSize = adminUtilsOperation.partitionSize(topicName, zkUtils);
|
||||
|
||||
if (partitionSize < effectivePartitionCount) {
|
||||
if (this.configurationProperties.isAutoAddPartitions()) {
|
||||
AdminUtils.addPartitions(zkUtils, topicName, effectivePartitionCount, null, false);
|
||||
adminUtilsOperation.invokeAddPartitions(zkUtils, topicName, effectivePartitionCount, null, false);
|
||||
}
|
||||
else {
|
||||
int topicSize = topicMetadata.partitionsMetadata().size();
|
||||
throw new BinderException("The number of expected partitions was: " + partitionCount + ", but "
|
||||
+ topicSize + (topicSize > 1 ? " have " : " has ") + "been found instead."
|
||||
+ partitionSize + (partitionSize > 1 ? " have " : " has ") + "been found instead."
|
||||
+ "Consider either increasing the partition count of the topic or enabling " +
|
||||
"`autoAddPartitions`");
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (topicMetadata.errorCode() == ErrorMapping.UnknownTopicOrPartitionCode()) {
|
||||
else if (errorCode == ErrorMapping.UnknownTopicOrPartitionCode()) {
|
||||
if (this.configurationProperties.isAutoCreateTopics()) {
|
||||
Seq<Object> brokerList = zkUtils.getSortedBrokerList();
|
||||
// always consider minPartitionCount for topic creation
|
||||
final int effectivePartitionCount = Math.max(this.configurationProperties.getMinPartitionCount(),
|
||||
partitionCount);
|
||||
|
||||
this.metadataRetryOperations.execute(new RetryCallback<Object, RuntimeException>() {
|
||||
|
||||
@Override
|
||||
public Object doWithRetry(RetryContext context) throws RuntimeException {
|
||||
AdminUtils.createTopic(zkUtils, topicName, effectivePartitionCount,
|
||||
|
||||
adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount,
|
||||
configurationProperties.getReplicationFactor(), new Properties());
|
||||
return null;
|
||||
}
|
||||
@@ -472,7 +481,7 @@ public class KafkaMessageChannelBinder extends
|
||||
}
|
||||
else {
|
||||
throw new BinderException("Error fetching Kafka topic metadata: ",
|
||||
ErrorMapping.exceptionFor(topicMetadata.errorCode()));
|
||||
ErrorMapping.exceptionFor(errorCode));
|
||||
}
|
||||
try {
|
||||
return this.metadataRetryOperations
|
||||
@@ -549,8 +558,8 @@ public class KafkaMessageChannelBinder extends
|
||||
private final DefaultKafkaProducerFactory<byte[], byte[]> producerFactory;
|
||||
|
||||
private ProducerConfigurationMessageHandler(KafkaTemplate<byte[], byte[]> kafkaTemplate, String topic,
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
|
||||
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory) {
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
|
||||
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory) {
|
||||
super(kafkaTemplate);
|
||||
setTopicExpression(new LiteralExpression(topic));
|
||||
setBeanFactory(KafkaMessageChannelBinder.this.getBeanFactory());
|
||||
|
||||
@@ -0,0 +1,75 @@
|
||||
/*
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.admin;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
import kafka.utils.ZkUtils;
|
||||
|
||||
/**
|
||||
* API around {@link kafka.admin.AdminUtils} to support
|
||||
* various versions of Kafka brokers.
|
||||
*
|
||||
* Note: Implementations that support Kafka brokers other than 0.9, need to use
|
||||
* a possible strategy that involves reflection around {@link kafka.admin.AdminUtils}.
|
||||
*
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public interface AdminUtilsOperation {
|
||||
|
||||
/**
|
||||
* Invoke {@link kafka.admin.AdminUtils#addPartitions}
|
||||
*
|
||||
* @param zkUtils Zookeeper utils
|
||||
* @param topic name of the topic
|
||||
* @param numPartitions
|
||||
* @param replicaAssignmentStr
|
||||
* @param checkBrokerAvailable
|
||||
*/
|
||||
void invokeAddPartitions(ZkUtils zkUtils, String topic, int numPartitions,
|
||||
String replicaAssignmentStr, boolean checkBrokerAvailable);
|
||||
|
||||
/**
|
||||
* Invoke {@link kafka.admin.AdminUtils#fetchTopicMetadataFromZk}
|
||||
*
|
||||
* @param topic name
|
||||
* @param zkUtils zookeeper utils
|
||||
* @return error code
|
||||
*/
|
||||
short errorCodeFromTopicMetadata(String topic, ZkUtils zkUtils);
|
||||
|
||||
/**
|
||||
* Find partition size from Kafka broker using {@link kafka.admin.AdminUtils}
|
||||
*
|
||||
* @param topic name
|
||||
* @param zkUtils zookeeper utils
|
||||
* @return partition size
|
||||
*/
|
||||
int partitionSize(String topic, ZkUtils zkUtils);
|
||||
|
||||
/**
|
||||
* Inovke {@link kafka.admin.AdminUtils#createTopic}
|
||||
*
|
||||
* @param zkUtils zookeeper utils
|
||||
* @param topic name
|
||||
* @param partitions
|
||||
* @param replicationFactor
|
||||
* @param topicConfig
|
||||
*/
|
||||
void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions,
|
||||
int replicationFactor, Properties topicConfig);
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
/*
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.admin;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
import kafka.admin.AdminUtils;
|
||||
import kafka.api.TopicMetadata;
|
||||
import kafka.utils.ZkUtils;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class Kafka09AdminUtilsOperation implements AdminUtilsOperation {
|
||||
|
||||
public void invokeAddPartitions(ZkUtils zkUtils, String topic, int numPartitions,
|
||||
String replicaAssignmentStr, boolean checkBrokerAvailable) {
|
||||
AdminUtils.addPartitions(zkUtils, topic, numPartitions,
|
||||
replicaAssignmentStr, checkBrokerAvailable);
|
||||
}
|
||||
|
||||
public short errorCodeFromTopicMetadata(String topic, ZkUtils zkUtils) {
|
||||
TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
|
||||
return topicMetadata.errorCode();
|
||||
}
|
||||
|
||||
public int partitionSize(String topic, ZkUtils zkUtils) {
|
||||
TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
|
||||
return topicMetadata.partitionsMetadata().size();
|
||||
}
|
||||
|
||||
public void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions,
|
||||
int replicationFactor, Properties topicConfig) {
|
||||
AdminUtils.createTopic(zkUtils, topic, partitions, replicationFactor,
|
||||
topicConfig);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,151 @@
|
||||
/*
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.admin;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import kafka.api.PartitionMetadata;
|
||||
import kafka.utils.ZkUtils;
|
||||
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class Kafka10AdminUtilsOperation implements AdminUtilsOperation {
|
||||
|
||||
private static ClassLoader CLASS_LOADER = Kafka10AdminUtilsOperation.class.getClassLoader();
|
||||
|
||||
private static Class<?> ADMIN_UTIL_CLASS;
|
||||
|
||||
static {
|
||||
try {
|
||||
ADMIN_UTIL_CLASS = CLASS_LOADER.loadClass("kafka.admin.AdminUtils");
|
||||
}
|
||||
catch (ClassNotFoundException e) {
|
||||
throw new IllegalStateException("AdminUtils class not found", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void invokeAddPartitions(ZkUtils zkUtils, String topic, int numPartitions,
|
||||
String replicaAssignmentStr, boolean checkBrokerAvailable) {
|
||||
try {
|
||||
Method[] declaredMethods = ADMIN_UTIL_CLASS.getDeclaredMethods();
|
||||
Method addPartitions = null;
|
||||
for (Method m : declaredMethods) {
|
||||
if (m.getName().equals("addPartitions")) {
|
||||
addPartitions = m;
|
||||
}
|
||||
}
|
||||
|
||||
if (addPartitions != null) {
|
||||
addPartitions.invoke(null, zkUtils, topic, numPartitions,
|
||||
replicaAssignmentStr, checkBrokerAvailable, null);
|
||||
}
|
||||
else {
|
||||
throw new InvocationTargetException(
|
||||
new RuntimeException("method not found"));
|
||||
}
|
||||
}
|
||||
catch (InvocationTargetException e) {
|
||||
ReflectionUtils.handleInvocationTargetException(e);
|
||||
}
|
||||
catch (IllegalAccessException e) {
|
||||
ReflectionUtils.handleReflectionException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public short errorCodeFromTopicMetadata(String topic, ZkUtils zkUtils) {
|
||||
try {
|
||||
Method fetchTopicMetadataFromZk = ReflectionUtils.findMethod(ADMIN_UTIL_CLASS, "fetchTopicMetadataFromZk", String.class, ZkUtils.class);
|
||||
|
||||
Object result = fetchTopicMetadataFromZk.invoke(null, topic, zkUtils);
|
||||
Class<?> topicMetadataClass = CLASS_LOADER.loadClass("org.apache.kafka.common.requests.MetadataResponse$TopicMetadata");
|
||||
|
||||
Method errorCodeMethod = ReflectionUtils.findMethod(topicMetadataClass, "error");
|
||||
Object obj = errorCodeMethod.invoke(result);
|
||||
Method code = ReflectionUtils.findMethod(obj.getClass(), "code");
|
||||
|
||||
return (short) code.invoke(obj);
|
||||
}
|
||||
catch (ClassNotFoundException e) {
|
||||
throw new IllegalStateException("AdminUtils class not found", e);
|
||||
}
|
||||
catch (InvocationTargetException e) {
|
||||
ReflectionUtils.handleInvocationTargetException(e);
|
||||
}
|
||||
catch (IllegalAccessException e) {
|
||||
ReflectionUtils.handleReflectionException(e);
|
||||
}
|
||||
return 0;
|
||||
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public int partitionSize(String topic, ZkUtils zkUtils) {
|
||||
try {
|
||||
Method fetchTopicMetadataFromZk = ReflectionUtils.findMethod(ADMIN_UTIL_CLASS, "fetchTopicMetadataFromZk", String.class, ZkUtils.class);
|
||||
Object result = fetchTopicMetadataFromZk.invoke(null, topic, zkUtils);
|
||||
Class<?> topicMetadataClass = CLASS_LOADER.loadClass("org.apache.kafka.common.requests.MetadataResponse$TopicMetadata");
|
||||
|
||||
Method partitionsMetadata = ReflectionUtils.findMethod(topicMetadataClass, "partitionMetadata");
|
||||
List<PartitionMetadata> foo = (List<PartitionMetadata>) partitionsMetadata.invoke(result);
|
||||
return foo.size();
|
||||
}
|
||||
catch (ClassNotFoundException e) {
|
||||
throw new IllegalStateException("AdminUtils class not found", e);
|
||||
}
|
||||
catch (InvocationTargetException e) {
|
||||
ReflectionUtils.handleInvocationTargetException(e);
|
||||
}
|
||||
catch (IllegalAccessException e) {
|
||||
ReflectionUtils.handleReflectionException(e);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
public void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions,
|
||||
int replicationFactor, Properties topicConfig) {
|
||||
try {
|
||||
Method[] declaredMethods = ADMIN_UTIL_CLASS.getDeclaredMethods();
|
||||
Method createTopic = null;
|
||||
for (Method m : declaredMethods) {
|
||||
if (m.getName().equals("createTopic") && m.getParameterTypes()[m.getParameterTypes().length - 1].getName().endsWith("RackAwareMode")) {
|
||||
createTopic = m;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (createTopic != null) {
|
||||
createTopic.invoke(null, zkUtils, topic, partitions,
|
||||
replicationFactor, topicConfig, null);
|
||||
}
|
||||
else {
|
||||
throw new InvocationTargetException(
|
||||
new RuntimeException("method not found"));
|
||||
}
|
||||
}
|
||||
catch (InvocationTargetException e) {
|
||||
ReflectionUtils.handleInvocationTargetException(e);
|
||||
}
|
||||
catch (IllegalAccessException e) {
|
||||
ReflectionUtils.handleReflectionException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -16,6 +16,11 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.config;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.PropertyPlaceholderAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
@@ -24,10 +29,18 @@ import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.config.codec.kryo.KryoCodecAutoConfiguration;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Condition;
|
||||
import org.springframework.context.annotation.ConditionContext;
|
||||
import org.springframework.context.annotation.Conditional;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Import;
|
||||
import org.springframework.core.type.AnnotatedTypeMetadata;
|
||||
import org.springframework.integration.codec.Codec;
|
||||
import org.springframework.kafka.support.LoggingProducerListener;
|
||||
import org.springframework.kafka.support.ProducerListener;
|
||||
@@ -45,6 +58,8 @@ import org.springframework.kafka.support.ProducerListener;
|
||||
@EnableConfigurationProperties({KafkaBinderConfigurationProperties.class, KafkaExtendedBindingProperties.class})
|
||||
public class KafkaBinderConfiguration {
|
||||
|
||||
protected final Log logger = LogFactory.getLog(getClass());
|
||||
|
||||
@Autowired
|
||||
private Codec codec;
|
||||
|
||||
@@ -57,6 +72,9 @@ public class KafkaBinderConfiguration {
|
||||
@Autowired
|
||||
private ProducerListener producerListener;
|
||||
|
||||
@Autowired
|
||||
private ApplicationContext context;
|
||||
|
||||
@Bean
|
||||
KafkaMessageChannelBinder kafkaMessageChannelBinder() {
|
||||
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
|
||||
@@ -64,6 +82,8 @@ public class KafkaBinderConfiguration {
|
||||
kafkaMessageChannelBinder.setCodec(this.codec);
|
||||
//kafkaMessageChannelBinder.setProducerListener(producerListener);
|
||||
kafkaMessageChannelBinder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
|
||||
AdminUtilsOperation adminUtilsOperation = context.getBean(AdminUtilsOperation.class);
|
||||
kafkaMessageChannelBinder.setAdminUtilsOperation(adminUtilsOperation);
|
||||
return kafkaMessageChannelBinder;
|
||||
}
|
||||
|
||||
@@ -77,4 +97,69 @@ public class KafkaBinderConfiguration {
|
||||
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder) {
|
||||
return new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, this.configurationProperties);
|
||||
}
|
||||
|
||||
@Bean(name = "adminUtilsOperation")
|
||||
@Conditional(Kafka09Condition.class)
|
||||
public AdminUtilsOperation kafka09AdminUtilsOperation() {
|
||||
logger.info("AdminUtils selected: Kafka 0.9 AdminUtils");
|
||||
return new Kafka09AdminUtilsOperation();
|
||||
}
|
||||
|
||||
@Bean(name = "adminUtilsOperation")
|
||||
@Conditional(Kafka10Condition.class)
|
||||
public AdminUtilsOperation kafka10AdminUtilsOperation() {
|
||||
logger.info("AdminUtils selected: Kafka 0.10 AdminUtils");
|
||||
return new Kafka10AdminUtilsOperation();
|
||||
}
|
||||
|
||||
private static Method getMethod(ClassLoader classLoader, String methodName) {
|
||||
try {
|
||||
Class<?> adminUtilClass = classLoader.loadClass("kafka.admin.AdminUtils");
|
||||
Method[] declaredMethods = adminUtilClass.getDeclaredMethods();
|
||||
for (Method m : declaredMethods) {
|
||||
if (m.getName().equals(methodName)) {
|
||||
return m;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (ClassNotFoundException e) {
|
||||
throw new IllegalStateException("AdminUtils not found", e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
static class Kafka10Condition implements Condition {
|
||||
|
||||
@Override
|
||||
public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata annotatedTypeMetadata) {
|
||||
ClassLoader classLoader = Kafka10Condition.class.getClassLoader();
|
||||
Method addPartitions = getMethod(classLoader, "addPartitions");
|
||||
if (addPartitions != null) {
|
||||
Class<?>[] parameterTypes = addPartitions.getParameterTypes();
|
||||
Class clazz = parameterTypes[parameterTypes.length - 1];
|
||||
if (clazz.getName().equals("kafka.admin.RackAwareMode")) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
static class Kafka09Condition implements Condition {
|
||||
|
||||
@Override
|
||||
public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata annotatedTypeMetadata) {
|
||||
|
||||
ClassLoader classLoader = Kafka09Condition.class.getClassLoader();
|
||||
Method addPartitions = getMethod(classLoader, "addPartitions");
|
||||
if (addPartitions != null) {
|
||||
Class<?>[] parameterTypes = addPartitions.getParameterTypes();
|
||||
Class clazz = parameterTypes[parameterTypes.length - 1];
|
||||
if (!clazz.getName().equals("kafka.admin.RackAwareMode")) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import com.esotericsoftware.kryo.Kryo;
|
||||
import com.esotericsoftware.kryo.Registration;
|
||||
|
||||
import org.springframework.cloud.stream.binder.AbstractTestBinder;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.integration.codec.Codec;
|
||||
import org.springframework.integration.codec.kryo.KryoRegistrar;
|
||||
import org.springframework.integration.codec.kryo.PojoCodec;
|
||||
import org.springframework.integration.tuple.TupleKryoRegistrar;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public abstract class AbstractKafkaTestBinder extends
|
||||
AbstractTestBinder<KafkaMessageChannelBinder, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>> {
|
||||
|
||||
@Override
|
||||
public void cleanup() {
|
||||
// do nothing - the rule will take care of that
|
||||
}
|
||||
|
||||
protected static Codec getCodec() {
|
||||
return new PojoCodec(new TupleRegistrar());
|
||||
}
|
||||
|
||||
private static class TupleRegistrar implements KryoRegistrar {
|
||||
private final TupleKryoRegistrar delegate = new TupleKryoRegistrar();
|
||||
|
||||
@Override
|
||||
public void registerTypes(Kryo kryo) {
|
||||
this.delegate.registerTypes(kryo);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Registration> getRegistrations() {
|
||||
return this.delegate.getRegistrations();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,176 @@
|
||||
/*
|
||||
* Copyright 2014-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import kafka.utils.ZKStringSerializer$;
|
||||
import kafka.utils.ZkUtils;
|
||||
import org.I0Itec.zkclient.ZkClient;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.Spy;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.kafka.test.core.BrokerAddress;
|
||||
import org.springframework.kafka.test.rule.KafkaEmbedded;
|
||||
import org.springframework.retry.RetryOperations;
|
||||
|
||||
/**
|
||||
* Integration tests for the {@link KafkaMessageChannelBinder}.
|
||||
*
|
||||
* @author Eric Bottard
|
||||
* @author Marius Bogoevici
|
||||
* @author Mark Fisher
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
public class Kafka09BinderTests extends KafkaBinderTests {
|
||||
|
||||
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();
|
||||
|
||||
@ClassRule
|
||||
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
|
||||
|
||||
private Kafka09TestBinder binder;
|
||||
|
||||
private Kafka09AdminUtilsOperation adminUtilsOperation = new Kafka09AdminUtilsOperation();
|
||||
|
||||
@Override
|
||||
protected void binderBindUnbindLatency() throws InterruptedException {
|
||||
Thread.sleep(500);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Kafka09TestBinder getBinder() {
|
||||
if (binder == null) {
|
||||
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
|
||||
binder = new Kafka09TestBinder(binderConfiguration);
|
||||
}
|
||||
return binder;
|
||||
}
|
||||
|
||||
protected KafkaBinderConfigurationProperties createConfigurationProperties() {
|
||||
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
|
||||
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
|
||||
List<String> bAddresses = new ArrayList<>();
|
||||
for (BrokerAddress bAddress : brokerAddresses) {
|
||||
bAddresses.add(bAddress.toString());
|
||||
}
|
||||
String[] foo = new String[bAddresses.size()];
|
||||
binderConfiguration.setBrokers(bAddresses.toArray(foo));
|
||||
binderConfiguration.setZkNodes(embeddedKafka.getZookeeperConnectionString());
|
||||
return binderConfiguration;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int partitionSize(String topic) {
|
||||
return consumerFactory().createConsumer().partitionsFor(topic).size();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setMetadataRetryOperations(Binder binder, RetryOperations retryOperations) {
|
||||
((Kafka09TestBinder) binder).getBinder().setMetadataRetryOperations(retryOperations);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ZkUtils getZkUtils(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
|
||||
final ZkClient zkClient = new ZkClient(kafkaBinderConfigurationProperties.getZkConnectionString(),
|
||||
kafkaBinderConfigurationProperties.getZkSessionTimeout(), kafkaBinderConfigurationProperties.getZkConnectionTimeout(),
|
||||
ZKStringSerializer$.MODULE$);
|
||||
|
||||
return new ZkUtils(zkClient, null, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions, int replicationFactor, Properties topicConfig) {
|
||||
adminUtilsOperation.invokeCreateTopic(zkUtils, topic, partitions, replicationFactor, new Properties());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int invokePartitionSize(String topic, ZkUtils zkUtils) {
|
||||
return adminUtilsOperation.partitionSize(topic, zkUtils);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ExtendedConsumerProperties<KafkaConsumerProperties> createConsumerProperties() {
|
||||
return new ExtendedConsumerProperties<>(new KafkaConsumerProperties());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ExtendedProducerProperties<KafkaProducerProperties> createProducerProperties() {
|
||||
return new ExtendedProducerProperties<>(new KafkaProducerProperties());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKafkaOffsetHeaderKey() {
|
||||
return KafkaHeaders.OFFSET;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Binder getBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
|
||||
return new Kafka09TestBinder(kafkaBinderConfigurationProperties);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
String multiplier = System.getenv("KAFKA_TIMEOUT_MULTIPLIER");
|
||||
if (multiplier != null) {
|
||||
timeoutMultiplier = Double.parseDouble(multiplier);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean usesExplicitRouting() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getClassUnderTestName() {
|
||||
return CLASS_UNDER_TEST_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Spy spyOn(final String name) {
|
||||
throw new UnsupportedOperationException("'spyOn' is not used by Kafka tests");
|
||||
}
|
||||
|
||||
private ConsumerFactory<byte[], byte[]> consumerFactory() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
|
||||
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
|
||||
Deserializer<byte[]> valueDecoder = new ByteArrayDeserializer();
|
||||
Deserializer<byte[]> keyDecoder = new ByteArrayDeserializer();
|
||||
|
||||
return new DefaultKafkaConsumerFactory<>(props, keyDecoder, valueDecoder);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,54 @@
|
||||
/*
|
||||
* Copyright 2015-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.context.support.GenericApplicationContext;
|
||||
import org.springframework.kafka.support.LoggingProducerListener;
|
||||
import org.springframework.kafka.support.ProducerListener;
|
||||
|
||||
/**
|
||||
* Test support class for {@link KafkaMessageChannelBinder}. Creates a binder that uses
|
||||
* an embedded Kafka cluster.
|
||||
* @author Eric Bottard
|
||||
* @author Marius Bogoevici
|
||||
* @author David Turanski
|
||||
* @author Gary Russell
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class Kafka09TestBinder extends AbstractKafkaTestBinder {
|
||||
|
||||
public Kafka09TestBinder(KafkaBinderConfigurationProperties binderConfiguration) {
|
||||
try {
|
||||
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfiguration);
|
||||
binder.setCodec(getCodec());
|
||||
ProducerListener producerListener = new LoggingProducerListener();
|
||||
binder.setProducerListener(producerListener);
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
binder.setApplicationContext(context);
|
||||
binder.setAdminUtilsOperation(new Kafka09AdminUtilsOperation());
|
||||
binder.afterPropertiesSet();
|
||||
this.setBinder(binder);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2014-2016 the original author or authors.
|
||||
* Copyright 2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -16,30 +16,18 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import kafka.admin.AdminUtils;
|
||||
import kafka.api.TopicMetadata;
|
||||
import kafka.utils.ZKStringSerializer$;
|
||||
import kafka.utils.ZkUtils;
|
||||
import org.I0Itec.zkclient.ZkClient;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.PartitionInfo;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
import org.assertj.core.api.Condition;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
@@ -51,7 +39,6 @@ import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.PartitionCapableBinderTests;
|
||||
import org.springframework.cloud.stream.binder.PartitionTestSupport;
|
||||
import org.springframework.cloud.stream.binder.Spy;
|
||||
import org.springframework.cloud.stream.binder.TestUtils;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.config.BindingProperties;
|
||||
@@ -60,18 +47,14 @@ import org.springframework.integration.IntegrationMessageHeaderAccessor;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.kafka.support.TopicPartitionInitialOffset;
|
||||
import org.springframework.kafka.test.core.BrokerAddress;
|
||||
import org.springframework.kafka.test.rule.KafkaEmbedded;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.messaging.support.GenericMessage;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.retry.RetryOperations;
|
||||
import org.springframework.retry.backoff.FixedBackOffPolicy;
|
||||
import org.springframework.retry.policy.SimpleRetryPolicy;
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
@@ -80,101 +63,37 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
/**
|
||||
* Integration tests for the {@link KafkaMessageChannelBinder}.
|
||||
* @author Eric Bottard
|
||||
* @author Marius Bogoevici
|
||||
* @author Mark Fisher
|
||||
* @author Ilayaperumal Gopinathan
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class KafkaBinderTests
|
||||
extends
|
||||
PartitionCapableBinderTests<KafkaTestBinder, ExtendedConsumerProperties<KafkaConsumerProperties>,
|
||||
ExtendedProducerProperties<KafkaProducerProperties>> {
|
||||
public abstract class KafkaBinderTests extends PartitionCapableBinderTests<AbstractKafkaTestBinder, ExtendedConsumerProperties<KafkaConsumerProperties>,
|
||||
ExtendedProducerProperties<KafkaProducerProperties>> {
|
||||
|
||||
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();
|
||||
protected abstract ExtendedConsumerProperties<KafkaConsumerProperties> createConsumerProperties();
|
||||
|
||||
@ClassRule
|
||||
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
|
||||
protected abstract ExtendedProducerProperties<KafkaProducerProperties> createProducerProperties();
|
||||
|
||||
private KafkaTestBinder binder;
|
||||
public abstract String getKafkaOffsetHeaderKey();
|
||||
|
||||
@Override
|
||||
protected void binderBindUnbindLatency() throws InterruptedException {
|
||||
Thread.sleep(500);
|
||||
}
|
||||
protected abstract Binder getBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties);
|
||||
|
||||
@Override
|
||||
protected KafkaTestBinder getBinder() {
|
||||
if (binder == null) {
|
||||
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
|
||||
binder = new KafkaTestBinder(binderConfiguration);
|
||||
}
|
||||
return binder;
|
||||
}
|
||||
protected abstract KafkaBinderConfigurationProperties createConfigurationProperties();
|
||||
|
||||
protected KafkaBinderConfigurationProperties createConfigurationProperties() {
|
||||
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
|
||||
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
|
||||
List<String> bAddresses = new ArrayList<>();
|
||||
for (BrokerAddress bAddress : brokerAddresses) {
|
||||
bAddresses.add(bAddress.toString());
|
||||
}
|
||||
String[] foo = new String[bAddresses.size()];
|
||||
binderConfiguration.setBrokers(bAddresses.toArray(foo));
|
||||
binderConfiguration.setZkNodes(embeddedKafka.getZookeeperConnectionString());
|
||||
return binderConfiguration;
|
||||
}
|
||||
protected abstract int partitionSize(String topic);
|
||||
|
||||
@Override
|
||||
protected ExtendedConsumerProperties<KafkaConsumerProperties> createConsumerProperties() {
|
||||
return new ExtendedConsumerProperties<>(new KafkaConsumerProperties());
|
||||
}
|
||||
protected abstract void setMetadataRetryOperations(Binder binder, RetryOperations retryOperations);
|
||||
|
||||
@Override
|
||||
protected ExtendedProducerProperties<KafkaProducerProperties> createProducerProperties() {
|
||||
return new ExtendedProducerProperties<>(new KafkaProducerProperties());
|
||||
}
|
||||
protected abstract ZkUtils getZkUtils(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties);
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
String multiplier = System.getenv("KAFKA_TIMEOUT_MULTIPLIER");
|
||||
if (multiplier != null) {
|
||||
timeoutMultiplier = Double.parseDouble(multiplier);
|
||||
}
|
||||
}
|
||||
protected abstract void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions,
|
||||
int replicationFactor, Properties topicConfig);
|
||||
|
||||
@Override
|
||||
protected boolean usesExplicitRouting() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getClassUnderTestName() {
|
||||
return CLASS_UNDER_TEST_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Spy spyOn(final String name) {
|
||||
throw new UnsupportedOperationException("'spyOn' is not used by Kafka tests");
|
||||
}
|
||||
|
||||
|
||||
private ConsumerFactory<byte[], byte[]> consumerFactory() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
|
||||
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, "TEST-CONSUMER-GROUP");
|
||||
Deserializer<byte[]> valueDecoder = new ByteArrayDeserializer();
|
||||
Deserializer<byte[]> keyDecoder = new ByteArrayDeserializer();
|
||||
|
||||
return new DefaultKafkaConsumerFactory<>(props, keyDecoder, valueDecoder);
|
||||
|
||||
}
|
||||
protected abstract int invokePartitionSize(String topic,
|
||||
ZkUtils zkUtils);
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testDlqAndRetry() throws Exception {
|
||||
KafkaTestBinder binder = getBinder();
|
||||
Binder binder = getBinder();
|
||||
DirectChannel moduleOutputChannel = new DirectChannel();
|
||||
DirectChannel moduleInputChannel = new DirectChannel();
|
||||
QueueChannel dlqChannel = new QueueChannel();
|
||||
@@ -216,8 +135,9 @@ public class KafkaBinderTests
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testDefaultAutoCommitOnErrorWithoutDlq() throws Exception {
|
||||
KafkaTestBinder binder = getBinder();
|
||||
Binder binder = getBinder();
|
||||
DirectChannel moduleOutputChannel = new DirectChannel();
|
||||
DirectChannel moduleInputChannel = new DirectChannel();
|
||||
FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
|
||||
@@ -266,8 +186,9 @@ public class KafkaBinderTests
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testDefaultAutoCommitOnErrorWithDlq() throws Exception {
|
||||
KafkaTestBinder binder = getBinder();
|
||||
Binder binder = getBinder();
|
||||
DirectChannel moduleOutputChannel = new DirectChannel();
|
||||
DirectChannel moduleInputChannel = new DirectChannel();
|
||||
FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
|
||||
@@ -325,19 +246,39 @@ public class KafkaBinderTests
|
||||
producerBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testAutoCreateTopicsEnabledSucceeds() throws Exception {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
configurationProperties.setAutoCreateTopics(true);
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
RetryTemplate metatadataRetrievalRetryOperations = new RetryTemplate();
|
||||
metatadataRetrievalRetryOperations.setRetryPolicy(new SimpleRetryPolicy());
|
||||
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
|
||||
backOffPolicy.setBackOffPeriod(1000);
|
||||
metatadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
|
||||
setMetadataRetryOperations(binder, metatadataRetrievalRetryOperations);
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
String testTopicName = "nonexisting" + System.currentTimeMillis();
|
||||
Binding<?> binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
binding.unbind();
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testValidateKafkaTopicName() {
|
||||
KafkaTopicUtils.validateTopicName("foo:bar");
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testCompression() throws Exception {
|
||||
final KafkaProducerProperties.CompressionType[] codecs = new KafkaProducerProperties.CompressionType[] {
|
||||
final KafkaProducerProperties.CompressionType[] codecs = new KafkaProducerProperties.CompressionType[]{
|
||||
KafkaProducerProperties.CompressionType.none, KafkaProducerProperties.CompressionType.gzip,
|
||||
KafkaProducerProperties.CompressionType.snappy};
|
||||
byte[] testPayload = new byte[2048];
|
||||
Arrays.fill(testPayload, (byte) 65);
|
||||
KafkaTestBinder binder = getBinder();
|
||||
Binder binder = getBinder();
|
||||
for (KafkaProducerProperties.CompressionType codec : codecs) {
|
||||
DirectChannel moduleOutputChannel = new DirectChannel();
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
@@ -364,13 +305,14 @@ public class KafkaBinderTests
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testCustomPartitionCountOverridesDefaultIfLarger() throws Exception {
|
||||
|
||||
byte[] testPayload = new byte[2048];
|
||||
Arrays.fill(testPayload, (byte) 65);
|
||||
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
|
||||
binderConfiguration.setMinPartitionCount(10);
|
||||
KafkaTestBinder binder = new KafkaTestBinder(binderConfiguration);
|
||||
Binder binder = getBinder(binderConfiguration);
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
producerProperties.setPartitionCount(10);
|
||||
@@ -393,23 +335,20 @@ public class KafkaBinderTests
|
||||
assertThat(inbound).isNotNull();
|
||||
assertThat((byte[]) inbound.getPayload()).containsExactly(testPayload);
|
||||
|
||||
|
||||
Collection<PartitionInfo> partitions =
|
||||
consumerFactory().createConsumer().partitionsFor("foo" + uniqueBindingId + ".0");
|
||||
|
||||
assertThat(partitions).hasSize(10);
|
||||
assertThat(partitionSize("foo" + uniqueBindingId + ".0")).isEqualTo(10);
|
||||
producerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testCustomPartitionCountDoesNotOverridePartitioningIfSmaller() throws Exception {
|
||||
|
||||
byte[] testPayload = new byte[2048];
|
||||
Arrays.fill(testPayload, (byte) 65);
|
||||
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
|
||||
binderConfiguration.setMinPartitionCount(6);
|
||||
KafkaTestBinder binder = new KafkaTestBinder(binderConfiguration);
|
||||
Binder binder = getBinder(binderConfiguration);
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
producerProperties.setPartitionCount(5);
|
||||
@@ -431,22 +370,21 @@ public class KafkaBinderTests
|
||||
Message<?> inbound = receive(moduleInputChannel);
|
||||
assertThat(inbound).isNotNull();
|
||||
assertThat((byte[]) inbound.getPayload()).containsExactly(testPayload);
|
||||
Collection<PartitionInfo> partitions =
|
||||
consumerFactory().createConsumer().partitionsFor("foo" + uniqueBindingId + ".0");
|
||||
|
||||
assertThat(partitions).hasSize(6);
|
||||
assertThat(partitionSize("foo" + uniqueBindingId + ".0")).isEqualTo(6);
|
||||
producerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testCustomPartitionCountOverridesPartitioningIfLarger() throws Exception {
|
||||
|
||||
byte[] testPayload = new byte[2048];
|
||||
Arrays.fill(testPayload, (byte) 65);
|
||||
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
|
||||
binderConfiguration.setMinPartitionCount(4);
|
||||
KafkaTestBinder binder = new KafkaTestBinder(binderConfiguration);
|
||||
Binder binder = getBinder(binderConfiguration);
|
||||
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
@@ -468,9 +406,7 @@ public class KafkaBinderTests
|
||||
Message<?> inbound = receive(moduleInputChannel);
|
||||
assertThat(inbound).isNotNull();
|
||||
assertThat((byte[]) inbound.getPayload()).containsExactly(testPayload);
|
||||
Collection<PartitionInfo> partitions =
|
||||
consumerFactory().createConsumer().partitionsFor("foo" + uniqueBindingId + ".0");
|
||||
assertThat(partitions).hasSize(5);
|
||||
assertThat(partitionSize("foo" + uniqueBindingId + ".0")).isEqualTo(5);
|
||||
producerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
@@ -478,11 +414,11 @@ public class KafkaBinderTests
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testDefaultConsumerStartsAtEarliest() throws Exception {
|
||||
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(createConfigurationProperties());
|
||||
Binder binder = getBinder(createConfigurationProperties());
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
binder.setApplicationContext(context);
|
||||
binder.afterPropertiesSet();
|
||||
//binder.setApplicationContext(context);
|
||||
//binder.afterPropertiesSet();
|
||||
DirectChannel output = new DirectChannel();
|
||||
QueueChannel input1 = new QueueChannel();
|
||||
|
||||
@@ -515,7 +451,7 @@ public class KafkaBinderTests
|
||||
Binding<MessageChannel> consumerBinding = null;
|
||||
|
||||
try {
|
||||
KafkaTestBinder binder = getBinder();
|
||||
Binder binder = getBinder();
|
||||
DirectChannel output = new DirectChannel();
|
||||
QueueChannel input1 = new QueueChannel();
|
||||
|
||||
@@ -552,7 +488,7 @@ public class KafkaBinderTests
|
||||
@Ignore("Needs further discussion")
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testReset() throws Exception {
|
||||
KafkaTestBinder binder = getBinder();
|
||||
Binder binder = getBinder();
|
||||
DirectChannel output = new DirectChannel();
|
||||
QueueChannel input1 = new QueueChannel();
|
||||
|
||||
@@ -605,11 +541,7 @@ public class KafkaBinderTests
|
||||
|
||||
try {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
binder.setApplicationContext(context);
|
||||
binder.afterPropertiesSet();
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
DirectChannel output = new DirectChannel();
|
||||
QueueChannel input1 = new QueueChannel();
|
||||
|
||||
@@ -653,251 +585,6 @@ public class KafkaBinderTests
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSyncProducerMetadata() throws Exception {
|
||||
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(createConfigurationProperties());
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
binder.setApplicationContext(context);
|
||||
binder.afterPropertiesSet();
|
||||
DirectChannel output = new DirectChannel();
|
||||
String testTopicName = UUID.randomUUID().toString();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
|
||||
properties.getExtension().setSync(true);
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName, output, properties);
|
||||
DirectFieldAccessor accessor = new DirectFieldAccessor(extractEndpoint(producerBinding));
|
||||
KafkaProducerMessageHandler wrappedInstance = (KafkaProducerMessageHandler) accessor.getWrappedInstance();
|
||||
assertThat(new DirectFieldAccessor(wrappedInstance).getPropertyValue("sync").equals(Boolean.TRUE))
|
||||
.withFailMessage("Kafka Sync Producer should have been enabled.");
|
||||
producerBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoCreateTopicsDisabledFailsIfTopicMissing() throws Exception {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
configurationProperties.setAutoCreateTopics(false);
|
||||
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
binder.setApplicationContext(context);
|
||||
binder.afterPropertiesSet();
|
||||
RetryTemplate metatadataRetrievalRetryOperations = new RetryTemplate();
|
||||
metatadataRetrievalRetryOperations.setRetryPolicy(new SimpleRetryPolicy());
|
||||
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
|
||||
backOffPolicy.setBackOffPeriod(1000);
|
||||
metatadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
|
||||
binder.setMetadataRetryOperations(metatadataRetrievalRetryOperations);
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
String testTopicName = "nonexisting" + System.currentTimeMillis();
|
||||
try {
|
||||
binder.doBindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
fail();
|
||||
}
|
||||
catch (Exception e) {
|
||||
assertThat(e).isInstanceOf(BinderException.class);
|
||||
assertThat(e).hasMessageContaining("Topic " + testTopicName + " does not exist");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoConfigureTopicsDisabledSucceedsIfTopicExisting() throws Exception {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
|
||||
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
|
||||
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
|
||||
ZKStringSerializer$.MODULE$);
|
||||
|
||||
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
|
||||
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
AdminUtils.createTopic(zkUtils, testTopicName, 5, 1, new Properties());
|
||||
|
||||
configurationProperties.setAutoCreateTopics(false);
|
||||
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
binder.setApplicationContext(context);
|
||||
binder.afterPropertiesSet();
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
Binding<MessageChannel> binding = binder.doBindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
binding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoAddPartitionsDisabledFailsIfTopicUnderpartitioned() throws Exception {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
|
||||
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
|
||||
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
|
||||
ZKStringSerializer$.MODULE$);
|
||||
|
||||
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
|
||||
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
AdminUtils.createTopic(zkUtils, testTopicName, 1, 1, new Properties());
|
||||
configurationProperties.setAutoAddPartitions(false);
|
||||
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
binder.setApplicationContext(context);
|
||||
binder.afterPropertiesSet();
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
// this consumer must consume from partition 2
|
||||
consumerProperties.setInstanceCount(3);
|
||||
consumerProperties.setInstanceIndex(2);
|
||||
try {
|
||||
binder.doBindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
}
|
||||
catch (Exception e) {
|
||||
assertThat(e).isInstanceOf(BinderException.class);
|
||||
assertThat(e)
|
||||
.hasMessageContaining("The number of expected partitions was: 3, but 1 has been found instead");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoAddPartitionsDisabledSucceedsIfTopicPartitionedCorrectly() throws Exception {
|
||||
Binding<?> binding = null;
|
||||
try {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
|
||||
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
|
||||
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
|
||||
ZKStringSerializer$.MODULE$);
|
||||
|
||||
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
|
||||
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
AdminUtils.createTopic(zkUtils, testTopicName, 6, 1, new Properties());
|
||||
configurationProperties.setAutoAddPartitions(false);
|
||||
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
RetryTemplate metatadataRetrievalRetryOperations = new RetryTemplate();
|
||||
metatadataRetrievalRetryOperations.setRetryPolicy(new SimpleRetryPolicy());
|
||||
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
|
||||
backOffPolicy.setBackOffPeriod(1000);
|
||||
metatadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
|
||||
binder.setMetadataRetryOperations(metatadataRetrievalRetryOperations);
|
||||
context.refresh();
|
||||
binder.setApplicationContext(context);
|
||||
binder.afterPropertiesSet();
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
// this consumer must consume from partition 2
|
||||
consumerProperties.setInstanceCount(3);
|
||||
consumerProperties.setInstanceIndex(2);
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
|
||||
binding = binder.doBindConsumer(testTopicName, "test-x", output, consumerProperties);
|
||||
|
||||
TopicPartitionInitialOffset[] listenedPartitions = TestUtils.getPropertyValue(binding,
|
||||
"endpoint.messageListenerContainer.containerProperties.topicPartitions",
|
||||
TopicPartitionInitialOffset[].class);
|
||||
assertThat(listenedPartitions).hasSize(2);
|
||||
assertThat(listenedPartitions).contains(new TopicPartitionInitialOffset(testTopicName, 2),
|
||||
new TopicPartitionInitialOffset(testTopicName, 5));
|
||||
Collection<PartitionInfo> partitions =
|
||||
consumerFactory().createConsumer().partitionsFor(testTopicName);
|
||||
assertThat(partitions).hasSize(6);
|
||||
}
|
||||
finally {
|
||||
binding.unbind();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoCreateTopicsEnabledSucceeds() throws Exception {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
configurationProperties.setAutoCreateTopics(true);
|
||||
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
binder.setApplicationContext(context);
|
||||
binder.afterPropertiesSet();
|
||||
RetryTemplate metatadataRetrievalRetryOperations = new RetryTemplate();
|
||||
metatadataRetrievalRetryOperations.setRetryPolicy(new SimpleRetryPolicy());
|
||||
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
|
||||
backOffPolicy.setBackOffPeriod(1000);
|
||||
metatadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
|
||||
binder.setMetadataRetryOperations(metatadataRetrievalRetryOperations);
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
String testTopicName = "nonexisting" + System.currentTimeMillis();
|
||||
Binding<?> binding = binder.doBindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
binding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPartitionCountNotReduced() throws Exception {
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
|
||||
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
|
||||
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
|
||||
ZKStringSerializer$.MODULE$);
|
||||
|
||||
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
|
||||
|
||||
AdminUtils.createTopic(zkUtils, testTopicName, 6, 1, new Properties());
|
||||
configurationProperties.setAutoAddPartitions(true);
|
||||
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
binder.setApplicationContext(context);
|
||||
binder.afterPropertiesSet();
|
||||
RetryTemplate metatadataRetrievalRetryOperations = new RetryTemplate();
|
||||
metatadataRetrievalRetryOperations.setRetryPolicy(new SimpleRetryPolicy());
|
||||
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
|
||||
backOffPolicy.setBackOffPeriod(1000);
|
||||
metatadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
|
||||
binder.setMetadataRetryOperations(metatadataRetrievalRetryOperations);
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
Binding<?> binding = binder.doBindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
binding.unbind();
|
||||
TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(testTopicName,
|
||||
zkUtils);
|
||||
assertThat(topicMetadata.partitionsMetadata().size()).isEqualTo(6);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPartitionCountIncreasedIfAutoAddPartitionsSet() throws Exception {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
|
||||
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
|
||||
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
|
||||
ZKStringSerializer$.MODULE$);
|
||||
|
||||
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
|
||||
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
AdminUtils.createTopic(zkUtils, testTopicName, 1, 1, new Properties());
|
||||
configurationProperties.setMinPartitionCount(6);
|
||||
configurationProperties.setAutoAddPartitions(true);
|
||||
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
binder.setApplicationContext(context);
|
||||
binder.afterPropertiesSet();
|
||||
RetryTemplate metatadataRetrievalRetryOperations = new RetryTemplate();
|
||||
metatadataRetrievalRetryOperations.setRetryPolicy(new SimpleRetryPolicy());
|
||||
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
|
||||
backOffPolicy.setBackOffPeriod(1000);
|
||||
metatadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
|
||||
binder.setMetadataRetryOperations(metatadataRetrievalRetryOperations);
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
Binding<?> binding = binder.doBindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
binding.unbind();
|
||||
TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(testTopicName,
|
||||
zkUtils);
|
||||
assertThat(topicMetadata.partitionsMetadata().size()).isEqualTo(6);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -1156,7 +843,6 @@ public class KafkaBinderTests
|
||||
|
||||
QueueChannel input1 = new QueueChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
//consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
Binding<MessageChannel> binding1 = binder.bindConsumer("defaultGroup.0", null, input1,
|
||||
consumerProperties);
|
||||
|
||||
@@ -1203,7 +889,216 @@ public class KafkaBinderTests
|
||||
binding2.unbind();
|
||||
}
|
||||
|
||||
private static final class FailingInvocationCountingMessageHandler implements MessageHandler {
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testSyncProducerMetadata() throws Exception {
|
||||
Binder binder = getBinder(createConfigurationProperties());
|
||||
DirectChannel output = new DirectChannel();
|
||||
String testTopicName = UUID.randomUUID().toString();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
|
||||
properties.getExtension().setSync(true);
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName, output, properties);
|
||||
DirectFieldAccessor accessor = new DirectFieldAccessor(extractEndpoint(producerBinding));
|
||||
KafkaProducerMessageHandler wrappedInstance = (KafkaProducerMessageHandler) accessor.getWrappedInstance();
|
||||
assertThat(new DirectFieldAccessor(wrappedInstance).getPropertyValue("sync").equals(Boolean.TRUE))
|
||||
.withFailMessage("Kafka Sync Producer should have been enabled.");
|
||||
producerBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testAutoCreateTopicsDisabledFailsIfTopicMissing() throws Exception {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
configurationProperties.setAutoCreateTopics(false);
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
RetryTemplate metatadataRetrievalRetryOperations = new RetryTemplate();
|
||||
metatadataRetrievalRetryOperations.setRetryPolicy(new SimpleRetryPolicy());
|
||||
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
|
||||
backOffPolicy.setBackOffPeriod(1000);
|
||||
metatadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
|
||||
setMetadataRetryOperations(binder, metatadataRetrievalRetryOperations);
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
String testTopicName = "nonexisting" + System.currentTimeMillis();
|
||||
try {
|
||||
binder.bindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
fail();
|
||||
}
|
||||
catch (Exception e) {
|
||||
assertThat(e).isInstanceOf(BinderException.class);
|
||||
assertThat(e).hasMessageContaining("Topic " + testTopicName + " does not exist");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testAutoConfigureTopicsDisabledSucceedsIfTopicExisting() throws Exception {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
|
||||
final ZkUtils zkUtils = getZkUtils(configurationProperties);
|
||||
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
invokeCreateTopic(zkUtils, testTopicName, 5, 1, new Properties());
|
||||
|
||||
configurationProperties.setAutoCreateTopics(false);
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
Binding<MessageChannel> binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
binding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testPartitionCountIncreasedIfAutoAddPartitionsSet() throws Exception {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
|
||||
final ZkUtils zkUtils = getZkUtils(configurationProperties);
|
||||
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
invokeCreateTopic(zkUtils, testTopicName, 6, 1, new Properties());
|
||||
configurationProperties.setMinPartitionCount(6);
|
||||
configurationProperties.setAutoAddPartitions(true);
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
RetryTemplate metatadataRetrievalRetryOperations = new RetryTemplate();
|
||||
metatadataRetrievalRetryOperations.setRetryPolicy(new SimpleRetryPolicy());
|
||||
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
|
||||
backOffPolicy.setBackOffPeriod(1000);
|
||||
metatadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
|
||||
setMetadataRetryOperations(binder, metatadataRetrievalRetryOperations);
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
Binding<?> binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
binding.unbind();
|
||||
assertThat(invokePartitionSize(testTopicName, zkUtils)).isEqualTo(6);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testAutoAddPartitionsDisabledFailsIfTopicUnderpartitioned() throws Exception {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
|
||||
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
|
||||
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
|
||||
ZKStringSerializer$.MODULE$);
|
||||
|
||||
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
|
||||
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
invokeCreateTopic(zkUtils, testTopicName, 1, 1, new Properties());
|
||||
configurationProperties.setAutoAddPartitions(false);
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
// binder.setApplicationContext(context);
|
||||
// binder.afterPropertiesSet();
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
// this consumer must consume from partition 2
|
||||
consumerProperties.setInstanceCount(3);
|
||||
consumerProperties.setInstanceIndex(2);
|
||||
Binding binding = null;
|
||||
try {
|
||||
binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
}
|
||||
catch (Exception e) {
|
||||
assertThat(e).isInstanceOf(BinderException.class);
|
||||
assertThat(e)
|
||||
.hasMessageContaining("The number of expected partitions was: 3, but 1 has been found instead");
|
||||
}
|
||||
finally {
|
||||
if (binding != null) {
|
||||
binding.unbind();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testAutoAddPartitionsDisabledSucceedsIfTopicPartitionedCorrectly() throws Exception {
|
||||
Binding<?> binding = null;
|
||||
try {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
|
||||
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
|
||||
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
|
||||
ZKStringSerializer$.MODULE$);
|
||||
|
||||
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
|
||||
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
invokeCreateTopic(zkUtils, testTopicName, 6, 1, new Properties());
|
||||
configurationProperties.setAutoAddPartitions(false);
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
RetryTemplate metatadataRetrievalRetryOperations = new RetryTemplate();
|
||||
metatadataRetrievalRetryOperations.setRetryPolicy(new SimpleRetryPolicy());
|
||||
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
|
||||
backOffPolicy.setBackOffPeriod(1000);
|
||||
metatadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
|
||||
setMetadataRetryOperations(binder, metatadataRetrievalRetryOperations);
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
// this consumer must consume from partition 2
|
||||
consumerProperties.setInstanceCount(3);
|
||||
consumerProperties.setInstanceIndex(2);
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
|
||||
binding = binder.bindConsumer(testTopicName, "test-x", output, consumerProperties);
|
||||
|
||||
TopicPartitionInitialOffset[] listenedPartitions = TestUtils.getPropertyValue(binding,
|
||||
"endpoint.messageListenerContainer.containerProperties.topicPartitions",
|
||||
TopicPartitionInitialOffset[].class);
|
||||
assertThat(listenedPartitions).hasSize(2);
|
||||
assertThat(listenedPartitions).contains(new TopicPartitionInitialOffset(testTopicName, 2),
|
||||
new TopicPartitionInitialOffset(testTopicName, 5));
|
||||
int partitions = invokePartitionSize(testTopicName, zkUtils);
|
||||
assertThat(partitions).isEqualTo(6);
|
||||
}
|
||||
finally {
|
||||
if (binding != null) {
|
||||
binding.unbind();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testPartitionCountNotReduced() throws Exception {
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
|
||||
final ZkClient zkClient;
|
||||
zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
|
||||
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
|
||||
ZKStringSerializer$.MODULE$);
|
||||
|
||||
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
|
||||
invokeCreateTopic(zkUtils, testTopicName, 6, 1, new Properties());
|
||||
configurationProperties.setAutoAddPartitions(true);
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
RetryTemplate metatadataRetrievalRetryOperations = new RetryTemplate();
|
||||
metatadataRetrievalRetryOperations.setRetryPolicy(new SimpleRetryPolicy());
|
||||
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
|
||||
backOffPolicy.setBackOffPeriod(1000);
|
||||
metatadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
|
||||
setMetadataRetryOperations(binder, metatadataRetrievalRetryOperations);
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
Binding<?> binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
binding.unbind();
|
||||
|
||||
assertThat(partitionSize(testTopicName)).isEqualTo(6);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void binderBindUnbindLatency() throws InterruptedException {
|
||||
Thread.sleep(500);
|
||||
}
|
||||
|
||||
private final class FailingInvocationCountingMessageHandler implements MessageHandler {
|
||||
|
||||
private int invocationCount;
|
||||
|
||||
@@ -1222,7 +1117,7 @@ public class KafkaBinderTests
|
||||
@Override
|
||||
public void handleMessage(Message<?> message) throws MessagingException {
|
||||
invocationCount++;
|
||||
Long offset = message.getHeaders().get(KafkaHeaders.OFFSET, Long.class);
|
||||
Long offset = message.getHeaders().get(KafkaBinderTests.this.getKafkaOffsetHeaderKey(), Long.class);
|
||||
// using the offset as key allows to ensure that we don't store duplicate
|
||||
// messages on retry
|
||||
if (!receivedMessages.containsKey(offset)) {
|
||||
|
||||
@@ -40,12 +40,12 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
* @author Gary Russell
|
||||
* @author Mark Fisher
|
||||
*/
|
||||
public class RawModeKafkaBinderTests extends KafkaBinderTests {
|
||||
public class RawModeKafka09BinderTests extends Kafka09BinderTests {
|
||||
|
||||
@Test
|
||||
@Override
|
||||
public void testPartitionedModuleJava() throws Exception {
|
||||
KafkaTestBinder binder = getBinder();
|
||||
Kafka09TestBinder binder = getBinder();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
|
||||
properties.setHeaderMode(HeaderMode.raw);
|
||||
properties.setPartitionKeyExtractorClass(RawKafkaPartitionTestSupport.class);
|
||||
@@ -98,7 +98,7 @@ public class RawModeKafkaBinderTests extends KafkaBinderTests {
|
||||
@Test
|
||||
@Override
|
||||
public void testPartitionedModuleSpEL() throws Exception {
|
||||
KafkaTestBinder binder = getBinder();
|
||||
Kafka09TestBinder binder = getBinder();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
|
||||
properties.setPartitionKeyExpression(spelExpressionParser.parseExpression("payload[0]"));
|
||||
properties.setPartitionSelectorExpression(spelExpressionParser.parseExpression("hashCode()"));
|
||||
@@ -160,7 +160,7 @@ public class RawModeKafkaBinderTests extends KafkaBinderTests {
|
||||
@Test
|
||||
@Override
|
||||
public void testSendAndReceive() throws Exception {
|
||||
KafkaTestBinder binder = getBinder();
|
||||
Kafka09TestBinder binder = getBinder();
|
||||
DirectChannel moduleOutputChannel = new DirectChannel();
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
@@ -184,7 +184,7 @@ public class RawModeKafkaBinderTests extends KafkaBinderTests {
|
||||
|
||||
@Test
|
||||
public void testSendAndReceiveWithExplicitConsumerGroup() {
|
||||
KafkaTestBinder binder = getBinder();
|
||||
Kafka09TestBinder binder = getBinder();
|
||||
DirectChannel moduleOutputChannel = new DirectChannel();
|
||||
// Test pub/sub by emulating how StreamPlugin handles taps
|
||||
QueueChannel module1InputChannel = new QueueChannel();
|
||||
Reference in New Issue
Block a user