Compare commits

13 Commits

Author SHA1 Message Date
Marius Bogoevici
e3687cc5ab Release 1.1.2.RELEASE 2017-03-06 19:38:38 -05:00
Marius Bogoevici
d9c40d823a Use RELEASE version for Stream and Cloud Dependencies 2017-03-06 19:35:14 -05:00
Barry Commins
ca3b89b405 Configure the health indicator ConsumerFactory to use binder properties
Fixes #79

Replaced try...finally in KafkaBinderHealthIndicator with try-with-resources

Changed KafkaBinderHealthIndicatorTest for consistency
2017-03-06 19:20:58 -05:00
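A minimal sketch of the pattern this commit describes, assuming a hypothetical helper class: the health indicator's ConsumerFactory is built from the binder's own consumer configuration, and the consumer opened for the check is closed with try-with-resources instead of try...finally. The names below are illustrative, not the binder's actual code.

import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Hypothetical illustration only; not KafkaBinderHealthIndicator itself.
class BinderHealthCheckSketch {

    private final ConsumerFactory<byte[], byte[]> consumerFactory;

    BinderHealthCheckSketch(Map<String, Object> binderConsumerProperties) {
        // build the factory from the binder's consumer properties so the
        // health check talks to the same brokers as the bindings
        this.consumerFactory = new DefaultKafkaConsumerFactory<>(binderConsumerProperties);
    }

    boolean topicReachable(String topic) {
        // try-with-resources closes the consumer even if partitionsFor() throws
        try (Consumer<byte[], byte[]> consumer = consumerFactory.createConsumer()) {
            return consumer.partitionsFor(topic) != null;
        }
    }
}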
Marius Bogoevici
ef220a551a Fix 'spring-cloud-stream-binder-kafka-test-support'
Set compile scope on 'spring-kafka-test' so that the
dependency is transitive.
2017-02-21 19:13:36 -05:00
Ilayaperumal Gopinathan
585448dbad Add doc for startOffset usage
- Clarify the default behaviour based on the consumer group property

Resolves #48
2017-02-21 13:13:22 -05:00
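For context, the behaviour this doc change clarifies is that, when startOffset is not set, a binding with an explicit consumer group starts from the earliest offset while an anonymous group starts from the latest. A hedged sketch of that mapping onto Kafka's auto.offset.reset follows; the helper is purely illustrative, not the binder's implementation.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

// Illustrative paraphrase of the documented startOffset defaults.
class StartOffsetSketch {

    static Map<String, Object> consumerProps(String group, String startOffset) {
        boolean anonymous = (group == null || group.isEmpty());
        // explicit groups default to "earliest", anonymous groups to "latest",
        // unless startOffset is set on the consumer binding
        String reset = (startOffset != null) ? startOffset : (anonymous ? "latest" : "earliest");
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, reset);
        return props;
    }
}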
Marius Bogoevici
4093a24d3e Reinstate spring-cloud-stream-binder-kafka-test-support
Fix #102

We had removed it in favour of using `spring-kafka-test`
directly, but that is somewhat inconvenient for the end
user. It is also still part of the release train BOM,
so dropping it was an oversight on our end.
2017-02-20 23:22:44 +05:30
Soby Chacko
7d4323cb2e Ignore 'TopicExistsException' on creation
Fixes a race condition when topics are created in a stream
and a TopicExistsException is thrown.

Fixes #83

Explicitly check for TopicExistsException.
General cleanup.
2017-01-30 18:51:42 -05:00
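A minimal sketch of the fix described above, assuming the client-side org.apache.kafka.common.errors.TopicExistsException is the exception in play (the binder's AdminUtils-based code path may surface the legacy kafka.common variant instead); the TopicAdmin interface is a stand-in for illustration, not the project's API.

import org.apache.kafka.common.errors.TopicExistsException;

final class TopicCreationSketch {

    // assumed helper, standing in for the binder's AdminUtils-based creation
    interface TopicAdmin {
        void createTopic(String name, int partitions);
    }

    static void createIfAbsent(TopicAdmin admin, String topic, int partitions) {
        try {
            admin.createTopic(topic, partitions);
        }
        catch (TopicExistsException e) {
            // another instance in the stream created the topic first;
            // the race is benign, so the exception is explicitly ignored
        }
    }
}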
Marius Bogoevici
76a53049c5 Setting version to 1.1.2.BUILD-SNAPSHOT 2017-01-11 22:40:57 -05:00
Marius Bogoevici
b29498617d Release 1.1.1.RELEASE
Signed-off-by: Marius Bogoevici <mbogoevici@pivotal.io>
2017-01-11 22:32:42 -05:00
Marius Bogoevici
547ea77b2f Update mvnw 2017-01-11 22:13:52 -05:00
Marius Bogoevici
699dc06bc5 Observe bufferSize setting
Fixes #77

Signed-off-by: Marius Bogoevici <mbogoevici@pivotal.io>
2017-01-11 22:13:29 -05:00
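The change is visible in the getProducerFactory diff further down, where the binding-level bufferSize now feeds Kafka's batch.size producer setting. A condensed sketch follows; only the BATCH_SIZE_CONFIG line reflects the diff, and the surrounding class is invented scaffolding.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;

class ProducerPropsSketch {

    static Map<String, Object> producerProps(String brokers, int bufferSize) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        // "Observe bufferSize setting": the binder-level bufferSize is applied
        // as the Kafka producer's batch.size
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(bufferSize));
        return props;
    }
}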
Marius Bogoevici
e822e59859 Remove package-info.java 2017-01-11 22:07:38 -05:00
Marius Bogoevici
cc630f2be0 Remove explicit version references for Stream and Kafka binder artifacts
Signed-off-by: Marius Bogoevici <mbogoevici@pivotal.io>
2017-01-04 22:33:38 -05:00
44 changed files with 710 additions and 1466 deletions

pom.xml (27 changed lines)
View File

@@ -2,31 +2,28 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.2.0.RC1</version>
<version>1.1.2.RELEASE</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>1.3.1.RELEASE</version>
<version>1.2.2.RELEASE</version>
<relativePath />
</parent>
<properties>
<java.version>1.7</java.version>
<kafka.version>0.10.1.1</kafka.version>
<spring-kafka.version>1.1.2.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>2.1.0.RELEASE</spring-integration-kafka.version>
<spring-cloud-stream.version>1.2.0.RC1</spring-cloud-stream.version>
<kafka.version>0.9.0.1</kafka.version>
<spring-kafka.version>1.0.5.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>2.0.1.RELEASE</spring-integration-kafka.version>
<spring-cloud-stream.version>1.1.2.RELEASE</spring-cloud-stream.version>
</properties>
<modules>
<module>spring-cloud-stream-binder-kafka</module>
<module>spring-cloud-starter-stream-kafka</module>
<module>spring-cloud-stream-binder-kafka-docs</module>
<module>spring-cloud-stream-binder-kafka-0.9-test</module>
<module>spring-cloud-stream-binder-kafka-0.10.0-test</module>
<module>spring-cloud-stream-binder-kafka-core</module>
<module>spring-cloud-stream-binder-kafka-test-support</module>
</modules>
<module>spring-cloud-stream-binder-kafka-0.10-test</module>
<module>spring-cloud-stream-binder-kafka-test-support</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
@@ -44,10 +41,6 @@
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
@@ -138,7 +131,7 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build-tools</artifactId>
<version>1.3.1.RELEASE</version>
<version>1.2.2.RELEASE</version>
</dependency>
</dependencies>
<executions>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.2.0.RC1</version>
<version>1.1.2.RELEASE</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>
@@ -20,7 +20,7 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>${project.version}</version>
<version>1.1.2.RELEASE</version>
</dependency>
</dependencies>
</project>

View File

@@ -4,10 +4,10 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.2.0.RC1</version>
<version>1.1.2.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-0.10.0-test</artifactId>
<description>Spring Cloud Stream Kafka Binder 0.10.0 Tests</description>
<artifactId>spring-cloud-stream-binder-kafka-0.10-test</artifactId>
<description>Spring Cloud Stream Kafka Binder 0.10 Tests</description>
<url>http://projects.spring.io/spring-cloud</url>
<organization>
<name>Pivotal Software, Inc.</name>
@@ -16,20 +16,15 @@
<properties>
<main.basedir>${basedir}/../..</main.basedir>
<!--
Override Kafka dependencies to Kafka 0.9.0.1 and supporting Spring Kafka and
Override Kafka dependencies to Kafka 0.10 and supporting Spring Kafka and
Spring Integration Kafka versions
-->
<kafka.version>0.10.0.1</kafka.version>
<spring-kafka.version>1.1.2.RELEASE</spring-kafka.version>
<kafka.version>0.10.0.0</kafka.version>
<spring-kafka.version>1.1.1.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>2.1.0.RELEASE</spring-integration-kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<version>1.2.0.RC1</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
@@ -105,4 +100,5 @@
</repository>
</repositories>
</project>

View File

@@ -31,7 +31,6 @@ import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.assertj.core.api.Assertions;
import org.eclipse.jetty.server.Server;
import org.junit.Before;
import org.junit.ClassRule;
@@ -43,9 +42,7 @@ import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.Spy;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.kafka.core.ConsumerFactory;
@@ -57,6 +54,7 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.RetryOperations;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
@@ -113,6 +111,12 @@ public class Kafka10BinderTests extends KafkaBinderTests {
return consumerFactory().createConsumer().partitionsFor(topic).size();
}
@Override
@SuppressWarnings("unchecked")
protected void setMetadataRetryOperations(Binder binder, RetryOperations retryOperations) {
((Kafka10TestBinder) binder).getBinder().setMetadataRetryOperations(retryOperations);
}
@Override
protected ZkUtils getZkUtils(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
final ZkClient zkClient = new ZkClient(kafkaBinderConfigurationProperties.getZkConnectionString(),
@@ -233,8 +237,8 @@ public class Kafka10BinderTests extends KafkaBinderTests {
assertThat(inbound).isNotNull();
assertTrue(message.getPayload() instanceof User1);
User1 receivedUser = (User1) message.getPayload();
Assertions.assertThat(receivedUser.getName()).isEqualTo(userName1);
Assertions.assertThat(receivedUser.getFavoriteColor()).isEqualTo(favColor1);
assertThat(receivedUser.getName()).isEqualTo(userName1);
assertThat(receivedUser.getFavoriteColor()).isEqualTo(favColor1);
producerBinding.unbind();
consumerBinding.unbind();
}

View File

@@ -16,10 +16,8 @@
package org.springframework.cloud.stream.binder.kafka;
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
@@ -36,19 +34,14 @@ public class Kafka10TestBinder extends AbstractKafkaTestBinder {
public Kafka10TestBinder(KafkaBinderConfigurationProperties binderConfiguration) {
try {
AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
KafkaTopicProvisioner provisioningProvider =
new KafkaTopicProvisioner(binderConfiguration, adminUtilsOperation);
provisioningProvider.afterPropertiesSet();
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfiguration, provisioningProvider);
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfiguration);
binder.setCodec(getCodec());
ProducerListener producerListener = new LoggingProducerListener();
binder.setProducerListener(producerListener);
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
binder.setApplicationContext(context);
binder.setAdminUtilsOperation(new Kafka10AdminUtilsOperation());
binder.afterPropertiesSet();
this.setBinder(binder);
}

View File

@@ -1,28 +0,0 @@
/*
* Copyright 2014-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
/**
* Integration tests for the {@link KafkaMessageChannelBinder}.
*
* This test specifically tests for the 0.10.0.1 version of Kafka.
*
* @author Soby Chacko
*/
public class Kafka_0_10_0_BinderTests extends Kafka10BinderTests {
}

View File

@@ -1,78 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.2.0.RC1</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-0.9</artifactId>
<description>Spring Cloud Stream Kafka Binder 0.9 Tests</description>
<url>http://projects.spring.io/spring-cloud</url>
<organization>
<name>Pivotal Software, Inc.</name>
<url>http://www.spring.io</url>
</organization>
<properties>
<main.basedir>${basedir}/../..</main.basedir>
<!--
Override Kafka dependencies to Kafka 0.9.0.1 and supporting Spring Kafka and
Spring Integration Kafka versions
-->
<kafka.version>0.9.0.1</kafka.version>
<spring-kafka.version>1.0.5.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>2.0.1.RELEASE</spring-integration-kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -1,46 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.2.0.RC1</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>
<url>http://projects.spring.io/spring-cloud</url>
<organization>
<name>Pivotal Software, Inc.</name>
<url>http://www.spring.io</url>
</organization>
<properties>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>${spring-integration-kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro-compiler</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

View File

@@ -1,314 +0,0 @@
/*
* Copyright 2014-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.provisioning;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.security.JaasUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.utils.KafkaTopicUtils;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import kafka.common.ErrorMapping;
import kafka.utils.ZkUtils;
/**
* Kafka implementation for {@link ProvisioningProvider}
*
* @author Soby Chacko
* @author Gary Russell
*/
public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsumerProperties<KafkaConsumerProperties>,
ExtendedProducerProperties<KafkaProducerProperties>>, InitializingBean {
private final Log logger = LogFactory.getLog(getClass());
private final KafkaBinderConfigurationProperties configurationProperties;
private final AdminUtilsOperation adminUtilsOperation;
private RetryOperations metadataRetryOperations;
public KafkaTopicProvisioner(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
AdminUtilsOperation adminUtilsOperation) {
this.configurationProperties = kafkaBinderConfigurationProperties;
this.adminUtilsOperation = adminUtilsOperation;
}
/**
*
* @param metadataRetryOperations the retry configuration
*/
public void setMetadataRetryOperations(RetryOperations metadataRetryOperations) {
this.metadataRetryOperations = metadataRetryOperations;
}
@Override
public void afterPropertiesSet() throws Exception {
if (this.metadataRetryOperations == null) {
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(10);
retryTemplate.setRetryPolicy(simpleRetryPolicy);
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(100);
backOffPolicy.setMultiplier(2);
backOffPolicy.setMaxInterval(1000);
retryTemplate.setBackOffPolicy(backOffPolicy);
this.metadataRetryOperations = retryTemplate;
}
}
@Override
public ProducerDestination provisionProducerDestination(final String name, ExtendedProducerProperties<KafkaProducerProperties> properties) {
if (this.logger.isInfoEnabled()) {
this.logger.info("Using kafka topic for outbound: " + name);
}
KafkaTopicUtils.validateTopicName(name);
createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(name, properties.getPartitionCount());
if (this.configurationProperties.isAutoCreateTopics() && adminUtilsOperation != null) {
final ZkUtils zkUtils = ZkUtils.apply(this.configurationProperties.getZkConnectionString(),
this.configurationProperties.getZkSessionTimeout(),
this.configurationProperties.getZkConnectionTimeout(),
JaasUtils.isZkSecurityEnabled());
int partitions = adminUtilsOperation.partitionSize(name, zkUtils);
return new KafkaProducerDestination(name, partitions);
}
else {
return new KafkaProducerDestination(name);
}
}
@Override
public ConsumerDestination provisionConsumerDestination(final String name, final String group, ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
KafkaTopicUtils.validateTopicName(name);
boolean anonymous = !StringUtils.hasText(group);
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
"DLQ support is not available for anonymous subscriptions");
if (properties.getInstanceCount() == 0) {
throw new IllegalArgumentException("Instance count cannot be zero");
}
int partitionCount = properties.getInstanceCount() * properties.getConcurrency();
createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(name, partitionCount);
if (this.configurationProperties.isAutoCreateTopics() && adminUtilsOperation != null) {
final ZkUtils zkUtils = ZkUtils.apply(this.configurationProperties.getZkConnectionString(),
this.configurationProperties.getZkSessionTimeout(),
this.configurationProperties.getZkConnectionTimeout(),
JaasUtils.isZkSecurityEnabled());
int partitions = adminUtilsOperation.partitionSize(name, zkUtils);
if (properties.getExtension().isEnableDlq() && !anonymous) {
String dlqTopic = "error." + name + "." + group;
createTopicAndPartitions(dlqTopic, partitions);
return new KafkaConsumerDestination(name, partitions, dlqTopic);
}
return new KafkaConsumerDestination(name, partitions);
}
return new KafkaConsumerDestination(name);
}
private void createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(final String topicName, final int partitionCount) {
if (this.configurationProperties.isAutoCreateTopics() && adminUtilsOperation != null) {
createTopicAndPartitions(topicName, partitionCount);
}
else if (this.configurationProperties.isAutoCreateTopics() && adminUtilsOperation == null) {
this.logger.warn("Auto creation of topics is enabled, but Kafka AdminUtils class is not present on the classpath. " +
"No topic will be created by the binder");
}
else if (!this.configurationProperties.isAutoCreateTopics()) {
this.logger.info("Auto creation of topics is disabled.");
}
}
/**
* Creates a Kafka topic if needed, or try to increase its partition count to the
* desired number.
*/
private void createTopicAndPartitions(final String topicName, final int partitionCount) {
final ZkUtils zkUtils = ZkUtils.apply(this.configurationProperties.getZkConnectionString(),
this.configurationProperties.getZkSessionTimeout(),
this.configurationProperties.getZkConnectionTimeout(),
JaasUtils.isZkSecurityEnabled());
try {
short errorCode = adminUtilsOperation.errorCodeFromTopicMetadata(topicName, zkUtils);
if (errorCode == ErrorMapping.NoError()) {
// only consider minPartitionCount for resizing if autoAddPartitions is true
int effectivePartitionCount = this.configurationProperties.isAutoAddPartitions()
? Math.max(this.configurationProperties.getMinPartitionCount(), partitionCount)
: partitionCount;
int partitionSize = adminUtilsOperation.partitionSize(topicName, zkUtils);
if (partitionSize < effectivePartitionCount) {
if (this.configurationProperties.isAutoAddPartitions()) {
adminUtilsOperation.invokeAddPartitions(zkUtils, topicName, effectivePartitionCount, null, false);
}
else {
throw new ProvisioningException("The number of expected partitions was: " + partitionCount + ", but "
+ partitionSize + (partitionSize > 1 ? " have " : " has ") + "been found instead."
+ "Consider either increasing the partition count of the topic or enabling " +
"`autoAddPartitions`");
}
}
}
else if (errorCode == ErrorMapping.UnknownTopicOrPartitionCode()) {
// always consider minPartitionCount for topic creation
final int effectivePartitionCount = Math.max(this.configurationProperties.getMinPartitionCount(),
partitionCount);
this.metadataRetryOperations.execute(new RetryCallback<Object, RuntimeException>() {
@Override
public Object doWithRetry(RetryContext context) throws RuntimeException {
adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount,
configurationProperties.getReplicationFactor(), new Properties());
return null;
}
});
}
else {
throw new ProvisioningException("Error fetching Kafka topic metadata: ",
ErrorMapping.exceptionFor(errorCode));
}
}
finally {
zkUtils.close();
}
}
public Collection<PartitionInfo> getPartitionsForTopic(final int partitionCount, final Callable<Collection<PartitionInfo>> callable) {
try {
return this.metadataRetryOperations
.execute(new RetryCallback<Collection<PartitionInfo>, Exception>() {
@Override
public Collection<PartitionInfo> doWithRetry(RetryContext context) throws Exception {
Collection<PartitionInfo> partitions = callable.call();
// do a sanity check on the partition set
if (partitions.size() < partitionCount) {
throw new IllegalStateException("The number of expected partitions was: "
+ partitionCount + ", but " + partitions.size()
+ (partitions.size() > 1 ? " have " : " has ") + "been found instead");
}
return partitions;
}
});
}
catch (Exception e) {
this.logger.error("Cannot initialize Binder", e);
throw new BinderException("Cannot initialize binder:", e);
}
}
private static final class KafkaProducerDestination implements ProducerDestination {
private final String producerDestinationName;
private final int partitions;
KafkaProducerDestination(String destinationName) {
this(destinationName, 0);
}
KafkaProducerDestination(String destinationName, Integer partitions) {
this.producerDestinationName = destinationName;
this.partitions = partitions;
}
@Override
public String getName() {
return producerDestinationName;
}
@Override
public String getNameForPartition(int partition) {
return producerDestinationName;
}
@Override
public String toString() {
return "KafkaProducerDestination{" +
"producerDestinationName='" + producerDestinationName + '\'' +
", partitions=" + partitions +
'}';
}
}
private static final class KafkaConsumerDestination implements ConsumerDestination {
private final String consumerDestinationName;
private final int partitions;
private final String dlqName;
KafkaConsumerDestination(String consumerDestinationName) {
this(consumerDestinationName, 0, null);
}
KafkaConsumerDestination(String consumerDestinationName, int partitions) {
this(consumerDestinationName, partitions, null);
}
KafkaConsumerDestination(String consumerDestinationName, Integer partitions, String dlqName) {
this.consumerDestinationName = consumerDestinationName;
this.partitions = partitions;
this.dlqName = dlqName;
}
@Override
public String getName() {
return this.consumerDestinationName;
}
@Override
public String toString() {
return "KafkaConsumerDestination{" +
"consumerDestinationName='" + consumerDestinationName + '\'' +
", partitions=" + partitions +
", dlqName='" + dlqName + '\'' +
'}';
}
}
}

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.2.0.RC1</version>
<version>1.1.2.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>

View File

@@ -1,109 +0,0 @@
[[kafka-dlq-processing]]
== Dead-Letter Topic Processing
Because it can't be anticipated how users would want to dispose of dead-lettered messages, the framework does not provide any standard mechanism to handle them.
If the reason for the dead-lettering is transient, you may wish to route the messages back to the original topic.
However, if the problem is a permanent issue, that could cause an infinite loop.
The following `spring-boot` application is an example of how to route those messages back to the original topic, but moves them to a third "parking lot" topic after three attempts.
The application is simply another spring-cloud-stream application that reads from the dead-letter topic.
It terminates when no messages are received for 5 seconds.
The examples assume the original destination is `so8400out` and the consumer group is `so8400`.
There are several considerations.
- Consider only running the rerouting when the main application is not running.
Otherwise, the retries for transient errors will be used up very quickly.
- Alternatively, use a two-stage approach - use this application to route to a third topic, and another to route from there back to the main topic.
- Since this technique uses a message header to keep track of retries, it won't work with `headerMode=raw`.
In that case, consider adding some data to the payload (that can be ignored by the main application).
- `x-retries` has to be added to the `headers` property `spring.cloud.stream.kafka.binder.headers=x-retries` on both this, and the main application so that the header is transported between the applications.
- Since kafka is publish/subscribe, replayed messages will be sent to each consumer group, even those that successfully processed a message the first time around.
.application.properties
[source]
----
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.output.producer.partitioned=true
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.bindings.parkingLot.producer.partitioned=true
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
----
.Application
[source, java]
----
@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) {
SpringApplication.run(ReRouteDlqKApplication.class, args).close();
}
private final AtomicInteger processed = new AtomicInteger();
@Autowired
private MessageChannel parkingLot;
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Message<?> reRoute(Message<?> failed) {
processed.incrementAndGet();
Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
if (retries == null) {
System.out.println("First retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, new Integer(1))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else if (retries.intValue() < 3) {
System.out.println("Another retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else {
System.out.println("Retries exhausted for " + failed);
parkingLot.send(MessageBuilder.fromMessage(failed)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build());
}
return null;
}
@Override
public void run(String... args) throws Exception {
while (true) {
int count = this.processed.get();
Thread.sleep(5000);
if (count == this.processed.get()) {
System.out.println("Idle, terminating");
return;
}
}
}
public interface TwoOutputProcessor extends Processor {
@Output("parkingLot")
MessageChannel parkingLot();
}
}
----

View File

@@ -23,7 +23,7 @@ Sabby Anandan, Marius Bogoevici, Eric Bottard, Mark Fisher, Ilayaperumal Gopinat
= Reference Guide
include::overview.adoc[]
include::dlq.adoc[]
= Appendices
[appendix]

View File

@@ -329,50 +329,26 @@ In secure environments, we strongly recommend creating topics and managing ACLs
==== Using the binder with Apache Kafka 0.10
The default Kafka support in Spring Cloud Stream Kafka binder is for Kafka version 0.10.1.1. The binder also supports connecting to other 0.10 based versions and 0.9 clients.
In order to do this, when you create the project that contains your application, include `spring-cloud-starter-stream-kafka` as you normally would do for the default binder.
Then add these dependencies at the top of the `<dependencies>` section in the pom.xml file to override the dependencies.
Here is an example for downgrading your application to 0.10.0.1. Since it is still on the 0.10 line, the default `spring-kafka` and `spring-integration-kafka` versions can be retained.
[source,xml]
----
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.1</version>
</dependency>
----
Here is another example of using 0.9.0.1 version.
The binder also supports connecting to Kafka 0.10 brokers.
In order to support this, when you create the project that contains your application, include `spring-cloud-starter-stream-kafka` as you normally would do for 0.9 based applications.
Then add these dependencies at the top of the `<dependencies>` section in the pom.xml file to override the Apache Kafka, Spring Kafka, and Spring Integration Kafka with 0.10-compatible versions as in the following example:
[source,xml]
----
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.0.5.RELEASE</version>
<version>1.1.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>2.0.1.RELEASE</version>
<version>2.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.1</version>
<version>0.10.0.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -380,12 +356,6 @@ Here is another example of using 0.9.0.1 version.
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
----
[NOTE]
@@ -400,8 +370,8 @@ For best results, we recommend using the most recent 0.10-compatible versions of
The Apache Kafka Binder uses the administrative utilities which are part of the Apache Kafka server library to create and reconfigure topics.
If the inclusion of the Apache Kafka server library and its dependencies is not necessary at runtime because the application will rely on the topics being configured administratively, the Kafka binder allows for Apache Kafka server dependency to be excluded from the application.
If you use non default versions for Kafka dependencies as advised above, all you have to do is not to include the kafka broker dependency.
If you use the default Kafka version, then ensure that you exclude the kafka broker jar from the `spring-cloud-starter-stream-kafka` dependency as following.
If you use Kafka 10 dependencies as advised above, all you have to do is not to include the kafka broker dependency.
If you use Kafka 0.9, then ensure that you exclude the kafka broker jar from the `spring-cloud-starter-stream-kafka` dependency as following.
[source,xml]
----

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.2.0.RC1</version>
<version>1.1.2.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-test-support</artifactId>
<description>Kafka related test classes</description>

View File

@@ -10,15 +10,10 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.2.0.RC1</version>
<version>1.1.2.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<version>1.2.0.RC1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
@@ -42,6 +37,17 @@
<artifactId>spring-cloud-stream-binder-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>${spring-integration-kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro-compiler</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
@@ -76,38 +82,8 @@
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
<version>${spring-cloud-stream.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>3.1.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry</artifactId>
<version>3.1.2</version>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>

View File

@@ -1,69 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.env.EnvironmentPostProcessor;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource;
/**
* An {@link EnvironmentPostProcessor} that sets some common configuration properties (log config etc.,) for Kafka
* binder.
*
* @author Ilayaperumal Gopinathan
*/
public class KafkaBinderEnvironmentPostProcessor implements EnvironmentPostProcessor {
public final static String SPRING_KAFKA = "spring.kafka";
public final static String SPRING_KAFKA_PRODUCER = SPRING_KAFKA + ".producer";
public final static String SPRING_KAFKA_CONSUMER = SPRING_KAFKA + ".consumer";
public final static String SPRING_KAFKA_PRODUCER_KEY_SERIALIZER = SPRING_KAFKA_PRODUCER + "." + "keySerializer";
public final static String SPRING_KAFKA_PRODUCER_VALUE_SERIALIZER = SPRING_KAFKA_PRODUCER + "." + "valueSerializer";
public final static String SPRING_KAFKA_CONSUMER_KEY_DESERIALIZER = SPRING_KAFKA_CONSUMER + "." + "keyDeserializer";
public final static String SPRING_KAFKA_CONSUMER_VALUE_DESERIALIZER = SPRING_KAFKA_CONSUMER + "." + "valueDeserializer";
private static final String KAFKA_BINDER_DEFAULT_PROPERTIES = "kafkaBinderDefaultProperties";
@Override
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
if (!environment.getPropertySources().contains(KAFKA_BINDER_DEFAULT_PROPERTIES)) {
Map<String, Object> kafkaBinderDefaultProperties = new HashMap<>();
kafkaBinderDefaultProperties.put("logging.pattern.console", "%d{ISO8601} %5p %t %c{2}:%L - %m%n");
kafkaBinderDefaultProperties.put("logging.level.org.I0Itec.zkclient", "ERROR");
kafkaBinderDefaultProperties.put("logging.level.kafka.server.KafkaConfig", "ERROR");
kafkaBinderDefaultProperties.put("logging.level.kafka.admin.AdminClient.AdminConfig", "ERROR");
kafkaBinderDefaultProperties.put(SPRING_KAFKA_PRODUCER_KEY_SERIALIZER, ByteArraySerializer.class.getName());
kafkaBinderDefaultProperties.put(SPRING_KAFKA_PRODUCER_VALUE_SERIALIZER, ByteArraySerializer.class.getName());
kafkaBinderDefaultProperties.put(SPRING_KAFKA_CONSUMER_KEY_DESERIALIZER, ByteArrayDeserializer.class.getName());
kafkaBinderDefaultProperties.put(SPRING_KAFKA_CONSUMER_VALUE_DESERIALIZER, ByteArrayDeserializer.class.getName());
environment.getPropertySources().addLast(new MapPropertySource(KAFKA_BINDER_DEFAULT_PROPERTIES, kafkaBinderDefaultProperties));
}
}
}

View File

@@ -28,7 +28,7 @@ import org.apache.kafka.common.security.JaasUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.properties;
package org.springframework.cloud.stream.binder.kafka;
/**
* @author Marius Bogoevici

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.properties;
package org.springframework.cloud.stream.binder.kafka;
import java.util.HashMap;
import java.util.Map;

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.properties;
package org.springframework.cloud.stream.binder.kafka;
import java.util.HashMap;
import java.util.Map;

View File

@@ -22,30 +22,34 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;
import kafka.common.ErrorMapping;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Utils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.context.Lifecycle;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
@@ -61,17 +65,19 @@ import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* A {@link Binder} that uses Kafka as the underlying middleware.
@@ -86,20 +92,26 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
*/
public class KafkaMessageChannelBinder extends
AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>,
ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {
ExtendedProducerProperties<KafkaProducerProperties>, Collection<PartitionInfo>, String>
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties>,
DisposableBean {
private final KafkaBinderConfigurationProperties configurationProperties;
private ProducerListener<byte[], byte[]> producerListener;
private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
private RetryOperations metadataRetryOperations;
private final Map<String, Collection<PartitionInfo>> topicsInUse = new HashMap<>();
public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider) {
super(false, headersToMap(configurationProperties), provisioningProvider);
private ProducerListener<byte[], byte[]> producerListener;
private volatile Producer<byte[], byte[]> dlqProducer;
private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
private AdminUtilsOperation adminUtilsOperation;
public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties) {
super(false, headersToMap(configurationProperties));
this.configurationProperties = configurationProperties;
}
@@ -119,10 +131,50 @@ public class KafkaMessageChannelBinder extends
return headersToMap;
}
public void setAdminUtilsOperation(AdminUtilsOperation adminUtilsOperation) {
this.adminUtilsOperation = adminUtilsOperation;
}
/**
* Retry configuration for operations such as validating topic creation
*
* @param metadataRetryOperations the retry configuration
*/
public void setMetadataRetryOperations(RetryOperations metadataRetryOperations) {
this.metadataRetryOperations = metadataRetryOperations;
}
public void setExtendedBindingProperties(KafkaExtendedBindingProperties extendedBindingProperties) {
this.extendedBindingProperties = extendedBindingProperties;
}
@Override
public void onInit() throws Exception {
if (this.metadataRetryOperations == null) {
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(10);
retryTemplate.setRetryPolicy(simpleRetryPolicy);
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(100);
backOffPolicy.setMultiplier(2);
backOffPolicy.setMaxInterval(1000);
retryTemplate.setBackOffPolicy(backOffPolicy);
this.metadataRetryOperations = retryTemplate;
}
}
@Override
public void destroy() throws Exception {
if (this.dlqProducer != null) {
this.dlqProducer.close();
this.dlqProducer = null;
}
}
public void setProducerListener(ProducerListener<byte[], byte[]> producerListener) {
this.producerListener = producerListener;
}
@@ -142,56 +194,68 @@ public class KafkaMessageChannelBinder extends
}
@Override
protected MessageHandler createProducerMessageHandler(final ProducerDestination destination,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception {
final DefaultKafkaProducerFactory<byte[], byte[]> producerFB = getProducerFactory(producerProperties);
Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(producerProperties.getPartitionCount(),
new Callable<Collection<PartitionInfo>>() {
@Override
public Collection<PartitionInfo> call() throws Exception {
return producerFB.createProducer().partitionsFor(destination.getName());
}
});
this.topicsInUse.put(destination.getName(), partitions);
protected MessageHandler createProducerMessageHandler(final String destination,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception {
KafkaTopicUtils.validateTopicName(destination);
createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(destination, producerProperties.getPartitionCount());
Collection<PartitionInfo> partitions = getPartitionsForTopic(destination, producerProperties.getPartitionCount());
if (producerProperties.getPartitionCount() < partitions.size()) {
if (this.logger.isInfoEnabled()) {
this.logger.info("The `partitionCount` of the producer for topic " + destination.getName() + " is "
this.logger.info("The `partitionCount` of the producer for topic " + destination + " is "
+ producerProperties.getPartitionCount() + ", smaller than the actual partition count of "
+ partitions.size() + " of the topic. The larger number will be used instead.");
}
}
this.topicsInUse.put(destination, partitions);
DefaultKafkaProducerFactory<byte[], byte[]> producerFB = getProducerFactory(producerProperties);
KafkaTemplate<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFB);
if (this.producerListener != null) {
kafkaTemplate.setProducerListener(this.producerListener);
}
return new ProducerConfigurationMessageHandler(kafkaTemplate, destination.getName(), producerProperties, producerFB);
return new ProducerConfigurationMessageHandler(kafkaTemplate, destination, producerProperties, producerFB);
}
@Override
protected String createProducerDestinationIfNecessary(String name,
ExtendedProducerProperties<KafkaProducerProperties> properties) {
if (this.logger.isInfoEnabled()) {
this.logger.info("Using kafka topic for outbound: " + name);
}
KafkaTopicUtils.validateTopicName(name);
createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(name, properties.getPartitionCount());
Collection<PartitionInfo> partitions = getPartitionsForTopic(name, properties.getPartitionCount());
if (properties.getPartitionCount() < partitions.size()) {
if (this.logger.isInfoEnabled()) {
this.logger.info("The `partitionCount` of the producer for topic " + name + " is "
+ properties.getPartitionCount() + ", smaller than the actual partition count of "
+ partitions.size() + " of the topic. The larger number will be used instead.");
}
}
this.topicsInUse.put(name, partitions);
return name;
}
private DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
Map<String, Object> props = new HashMap<>();
if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
props.putAll(configurationProperties.getConfiguration());
}
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(producerProperties.getExtension().getBufferSize()));
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, String.valueOf(this.configurationProperties.getRequiredAcks()));
if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
props.putAll(configurationProperties.getConfiguration());
}
if (ObjectUtils.isEmpty(props.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))) {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
}
if (ObjectUtils.isEmpty(props.get(ProducerConfig.BATCH_SIZE_CONFIG))) {
props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(producerProperties.getExtension().getBufferSize()));
}
if (ObjectUtils.isEmpty(props.get(ProducerConfig.LINGER_MS_CONFIG))) {
props.put(ProducerConfig.LINGER_MS_CONFIG, String.valueOf(producerProperties.getExtension().getBatchTimeout()));
}
if (ObjectUtils.isEmpty(props.get(ProducerConfig.COMPRESSION_TYPE_CONFIG))) {
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
producerProperties.getExtension().getCompressionType().toString());
}
props.put(ProducerConfig.LINGER_MS_CONFIG,
String.valueOf(producerProperties.getExtension().getBatchTimeout()));
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
producerProperties.getExtension().getCompressionType().toString());
if (!ObjectUtils.isEmpty(producerProperties.getExtension().getConfiguration())) {
props.putAll(producerProperties.getExtension().getConfiguration());
}
@@ -199,49 +263,56 @@ public class KafkaMessageChannelBinder extends
}
@Override
@SuppressWarnings("unchecked")
protected MessageProducer createConsumerEndpoint(final ConsumerDestination destination, final String group,
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
boolean anonymous = !StringUtils.hasText(group);
Assert.isTrue(!anonymous || !extendedConsumerProperties.getExtension().isEnableDlq(),
"DLQ support is not available for anonymous subscriptions");
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
final ConsumerFactory<?, ?> consumerFactory = createKafkaConsumerFactory(anonymous, consumerGroup, extendedConsumerProperties);
int partitionCount = extendedConsumerProperties.getInstanceCount() * extendedConsumerProperties.getConcurrency();
Collection<PartitionInfo> allPartitions = provisioningProvider.getPartitionsForTopic(partitionCount,
new Callable<Collection<PartitionInfo>>() {
@Override
public Collection<PartitionInfo> call() throws Exception {
return consumerFactory.createConsumer().partitionsFor(destination.getName());
}
});
protected Collection<PartitionInfo> createConsumerDestinationIfNecessary(String name, String group,
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
KafkaTopicUtils.validateTopicName(name);
if (properties.getInstanceCount() == 0) {
throw new IllegalArgumentException("Instance count cannot be zero");
}
int partitionCount = properties.getInstanceCount() * properties.getConcurrency();
createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(name, partitionCount);
Collection<PartitionInfo> allPartitions = getPartitionsForTopic(name, partitionCount);
Collection<PartitionInfo> listenedPartitions;
if (extendedConsumerProperties.getExtension().isAutoRebalanceEnabled() ||
extendedConsumerProperties.getInstanceCount() == 1) {
if (properties.getExtension().isAutoRebalanceEnabled() ||
properties.getInstanceCount() == 1) {
listenedPartitions = allPartitions;
}
else {
listenedPartitions = new ArrayList<>();
for (PartitionInfo partition : allPartitions) {
// divide partitions across modules
if ((partition.partition() % extendedConsumerProperties.getInstanceCount()) == extendedConsumerProperties.getInstanceIndex()) {
if ((partition.partition() % properties.getInstanceCount()) == properties.getInstanceIndex()) {
listenedPartitions.add(partition);
}
}
}
this.topicsInUse.put(destination.getName(), listenedPartitions);
this.topicsInUse.put(name, listenedPartitions);
return listenedPartitions;
}
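
The modulo rule above spreads partitions across instances when auto-rebalancing is disabled. For example, with six partitions, instanceCount=3 and instanceIndex=1, this binding listens to partitions 1 and 4 only. A small stand-alone sketch of the same arithmetic, with plain ints in place of PartitionInfo:

import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {
    public static void main(String[] args) {
        int instanceCount = 3;
        int instanceIndex = 1;
        List<Integer> listened = new ArrayList<>();
        for (int partition = 0; partition < 6; partition++) {
            // same modulo rule as createConsumerDestinationIfNecessary above
            if (partition % instanceCount == instanceIndex) {
                listened.add(partition);
            }
        }
        System.out.println(listened); // [1, 4]
    }
}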
@Override
@SuppressWarnings("unchecked")
protected MessageProducer createConsumerEndpoint(String name, String group, Collection<PartitionInfo> destination,
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
boolean anonymous = !StringUtils.hasText(group);
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
"DLQ support is not available for anonymous subscriptions");
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
Map<String, Object> props = getConsumerConfig(anonymous, consumerGroup);
if (!ObjectUtils.isEmpty(properties.getExtension().getConfiguration())) {
props.putAll(properties.getExtension().getConfiguration());
}
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
Collection<PartitionInfo> listenedPartitions = destination;
Assert.isTrue(!CollectionUtils.isEmpty(listenedPartitions), "A list of partitions must be provided");
final TopicPartitionInitialOffset[] topicPartitionInitialOffsets = getTopicPartitionInitialOffsets(
listenedPartitions);
final ContainerProperties containerProperties =
anonymous || extendedConsumerProperties.getExtension().isAutoRebalanceEnabled() ? new ContainerProperties(destination.getName())
anonymous || properties.getExtension().isAutoRebalanceEnabled() ? new ContainerProperties(name)
: new ContainerProperties(topicPartitionInitialOffsets);
int concurrency = Math.min(extendedConsumerProperties.getConcurrency(), listenedPartitions.size());
int concurrency = Math.min(properties.getConcurrency(), listenedPartitions.size());
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer =
new ConcurrentMessageListenerContainer(
consumerFactory, containerProperties) {
@@ -252,8 +323,8 @@ public class KafkaMessageChannelBinder extends
}
};
messageListenerContainer.setConcurrency(concurrency);
messageListenerContainer.getContainerProperties().setAckOnError(isAutoCommitOnError(extendedConsumerProperties));
if (!extendedConsumerProperties.getExtension().isAutoCommitOffset()) {
messageListenerContainer.getContainerProperties().setAckOnError(isAutoCommitOnError(properties));
if (!properties.getExtension().isAutoCommitOffset()) {
messageListenerContainer.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
}
if (this.logger.isDebugEnabled()) {
@@ -268,11 +339,11 @@ public class KafkaMessageChannelBinder extends
new KafkaMessageDrivenChannelAdapter<>(
messageListenerContainer);
kafkaMessageDrivenChannelAdapter.setBeanFactory(this.getBeanFactory());
final RetryTemplate retryTemplate = buildRetryTemplate(extendedConsumerProperties);
final RetryTemplate retryTemplate = buildRetryTemplate(properties);
kafkaMessageDrivenChannelAdapter.setRetryTemplate(retryTemplate);
if (extendedConsumerProperties.getExtension().isEnableDlq()) {
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = getProducerFactory(new ExtendedProducerProperties<>(new KafkaProducerProperties()));
final KafkaTemplate<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFactory);
if (properties.getExtension().isEnableDlq()) {
final String dlqTopic = "error." + name + "." + group;
initDlqProducer();
messageListenerContainer.getContainerProperties().setErrorHandler(new ErrorHandler() {
@Override
@@ -281,55 +352,49 @@ public class KafkaMessageChannelBinder extends
: null;
final byte[] payload = message.value() != null
? Utils.toArray(ByteBuffer.wrap((byte[]) message.value())) : null;
ListenableFuture<SendResult<byte[], byte[]>> sentDlq = kafkaTemplate.send("error." + destination.getName() + "." + group,
message.partition(), key, payload);
sentDlq.addCallback(new ListenableFutureCallback<SendResult<byte[], byte[]>>() {
StringBuilder sb = new StringBuilder().append(" a message with key='")
.append(toDisplayString(ObjectUtils.nullSafeToString(key), 50)).append("'")
.append(" and payload='")
.append(toDisplayString(ObjectUtils.nullSafeToString(payload), 50))
.append("'").append(" received from ")
.append(message.partition());
KafkaMessageChannelBinder.this.dlqProducer.send(new ProducerRecord<>(dlqTopic, key, payload),
new Callback() {
@Override
public void onFailure(Throwable ex) {
KafkaMessageChannelBinder.this.logger.error(
"Error sending to DLQ" + sb.toString(), ex);
}
@Override
public void onSuccess(SendResult<byte[], byte[]> result) {
if (KafkaMessageChannelBinder.this.logger.isDebugEnabled()) {
KafkaMessageChannelBinder.this.logger.debug(
"Sent to DLQ " + sb.toString());
}
}
});
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
StringBuffer messageLog = new StringBuffer();
messageLog.append(" a message with key='"
+ toDisplayString(ObjectUtils.nullSafeToString(key), 50) + "'");
messageLog.append(" and payload='"
+ toDisplayString(ObjectUtils.nullSafeToString(payload), 50) + "'");
messageLog.append(" received from " + message.partition());
if (exception != null) {
KafkaMessageChannelBinder.this.logger.error(
"Error sending to DLQ" + messageLog.toString(), exception);
}
else {
if (KafkaMessageChannelBinder.this.logger.isDebugEnabled()) {
KafkaMessageChannelBinder.this.logger.debug(
"Sent to DLQ " + messageLog.toString());
}
}
}
});
}
});
}
return kafkaMessageDrivenChannelAdapter;
}
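
Records that exhaust the retry template are published to a topic named error.<destination>.<group>. A minimal sketch, assuming a broker on localhost:9092 and a hypothetical binding named "orders" with group "audit", of how a separate application could drain that DLQ with the plain Kafka consumer API:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class DlqDrainSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // assumption: local broker
        props.put("group.id", "dlq-inspector");                    // hypothetical inspection group
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("error.orders.audit")); // error.<destination>.<group>
            for (int i = 0; i < 10; i++) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    System.out.println("dead letter from partition " + record.partition());
                }
            }
        }
    }
}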
private ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup,
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
private Map<String, Object> getConsumerConfig(boolean anonymous, String consumerGroup) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, anonymous ? "latest" : "earliest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
props.putAll(configurationProperties.getConfiguration());
}
if (ObjectUtils.isEmpty(props.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
}
if (!ObjectUtils.isEmpty(consumerProperties.getExtension().getConfiguration())) {
props.putAll(consumerProperties.getExtension().getConfiguration());
}
return new DefaultKafkaConsumerFactory<>(props);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
anonymous ? "latest" : "earliest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
return props;
}
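
The anonymous flag only changes auto.offset.reset: anonymous (group-less) bindings start from the latest offset, while named groups replay from the earliest available offset so that records produced before the group first connects are not lost. A trivial sketch mirroring that rule (getConsumerConfig itself is private):

public class OffsetResetSketch {
    static String autoOffsetReset(boolean anonymous) {
        // mirrors the rule in getConsumerConfig above
        return anonymous ? "latest" : "earliest";
    }

    public static void main(String[] args) {
        System.out.println(autoOffsetReset(true));  // latest   -> anonymous.<uuid> groups
        System.out.println(autoOffsetReset(false)); // earliest -> durable, named groups
    }
}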
private boolean isAutoCommitOnError(ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
@@ -351,6 +416,146 @@ public class KafkaMessageChannelBinder extends
return topicPartitionInitialOffsets;
}
private void createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(final String topicName, final int partitionCount) {
if (this.configurationProperties.isAutoCreateTopics() && adminUtilsOperation != null) {
createTopicAndPartitions(topicName, partitionCount);
}
else if (this.configurationProperties.isAutoCreateTopics() && adminUtilsOperation == null) {
this.logger.warn("Auto creation of topics is enabled, but Kafka AdminUtils class is not present on the classpath. " +
"No topic will be created by the binder");
}
else if (!this.configurationProperties.isAutoCreateTopics()) {
this.logger.info("Auto creation of topics is disabled.");
}
}
/**
* Creates a Kafka topic if needed, or try to increase its partition count to the
* desired number.
*/
private void createTopicAndPartitions(final String topicName, final int partitionCount) {
final ZkUtils zkUtils = ZkUtils.apply(this.configurationProperties.getZkConnectionString(),
this.configurationProperties.getZkSessionTimeout(),
this.configurationProperties.getZkConnectionTimeout(),
JaasUtils.isZkSecurityEnabled());
try {
short errorCode = adminUtilsOperation.errorCodeFromTopicMetadata(topicName, zkUtils);
if (errorCode == ErrorMapping.NoError()) {
// only consider minPartitionCount for resizing if autoAddPartitions is true
int effectivePartitionCount = this.configurationProperties.isAutoAddPartitions()
? Math.max(this.configurationProperties.getMinPartitionCount(), partitionCount)
: partitionCount;
int partitionSize = adminUtilsOperation.partitionSize(topicName, zkUtils);
if (partitionSize < effectivePartitionCount) {
if (this.configurationProperties.isAutoAddPartitions()) {
adminUtilsOperation.invokeAddPartitions(zkUtils, topicName, effectivePartitionCount, null, false);
}
else {
throw new BinderException("The number of expected partitions was: " + partitionCount + ", but "
+ partitionSize + (partitionSize > 1 ? " have " : " has ") + "been found instead. "
+ "Consider either increasing the partition count of the topic or enabling " +
"`autoAddPartitions`");
}
}
}
else if (errorCode == ErrorMapping.UnknownTopicOrPartitionCode()) {
// always consider minPartitionCount for topic creation
final int effectivePartitionCount = Math.max(this.configurationProperties.getMinPartitionCount(),
partitionCount);
this.metadataRetryOperations.execute(new RetryCallback<Object, RuntimeException>() {
@Override
public Object doWithRetry(RetryContext context) throws RuntimeException {
try {
adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount,
configurationProperties.getReplicationFactor(), new Properties());
}
catch (Exception e) {
String exceptionClass = e.getClass().getName();
if (exceptionClass.equals("kafka.common.TopicExistsException") ||
exceptionClass.equals("org.apache.kafka.common.errors.TopicExistsException")){
if (logger.isWarnEnabled()) {
logger.warn("Attempt to create topic: " + topicName + ". Topic already exists.");
}
}
else {
throw e;
}
}
return null;
}
});
}
else {
throw new BinderException("Error fetching Kafka topic metadata: ",
ErrorMapping.exceptionFor(errorCode));
}
}
finally {
zkUtils.close();
}
}
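
The effective partition count differs between resizing and creation: for an existing topic, minPartitionCount is only honoured when autoAddPartitions is true, while for a new topic minPartitionCount always applies. For example, with minPartitionCount=4, instanceCount=2 and concurrency=3 the requested count is 6, so a new topic is created with max(4, 6) = 6 partitions, and an existing 2-partition topic is only grown to 6 if autoAddPartitions is enabled. A small sketch of that arithmetic:

public class EffectivePartitionCountSketch {
    public static void main(String[] args) {
        int minPartitionCount = 4;
        int requested = 2 * 3; // instanceCount * concurrency
        boolean autoAddPartitions = true;

        // topic creation: minPartitionCount is always considered
        int onCreate = Math.max(minPartitionCount, requested);

        // resizing an existing topic: minPartitionCount only if autoAddPartitions is enabled
        int onResize = autoAddPartitions ? Math.max(minPartitionCount, requested) : requested;

        System.out.println(onCreate + " / " + onResize); // 6 / 6
    }
}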
private Collection<PartitionInfo> getPartitionsForTopic(final String topicName, final int partitionCount) {
try {
return this.metadataRetryOperations
.execute(new RetryCallback<Collection<PartitionInfo>, Exception>() {
@Override
public Collection<PartitionInfo> doWithRetry(RetryContext context) throws Exception {
Collection<PartitionInfo> partitions =
getProducerFactory(
new ExtendedProducerProperties<>(new KafkaProducerProperties()))
.createProducer().partitionsFor(topicName);
// do a sanity check on the partition set
if (partitions.size() < partitionCount) {
throw new IllegalStateException("The number of expected partitions was: "
+ partitionCount + ", but " + partitions.size()
+ (partitions.size() > 1 ? " have " : " has ") + "been found instead");
}
return partitions;
}
});
}
catch (Exception e) {
this.logger.error("Cannot initialize Binder", e);
throw new BinderException("Cannot initialize binder:", e);
}
}
private synchronized void initDlqProducer() {
try {
if (this.dlqProducer == null) {
synchronized (this) {
if (this.dlqProducer == null) {
// we can use the producer defaults as we do not need to tune
// performance
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.configurationProperties.getKafkaConnectionString());
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
DefaultKafkaProducerFactory<byte[], byte[]> defaultKafkaProducerFactory =
new DefaultKafkaProducerFactory<>(props);
this.dlqProducer = defaultKafkaProducerFactory.createProducer();
}
}
}
}
catch (Exception e) {
throw new RuntimeException("Cannot initialize DLQ producer:", e);
}
}
private String toDisplayString(String original, int maxCharacters) {
if (original.length() <= maxCharacters) {
return original;
@@ -366,14 +571,14 @@ public class KafkaMessageChannelBinder extends
private final DefaultKafkaProducerFactory<byte[], byte[]> producerFactory;
private ProducerConfigurationMessageHandler(KafkaTemplate<byte[], byte[]> kafkaTemplate, String topic,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory) {
ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory) {
super(kafkaTemplate);
setTopicExpression(new LiteralExpression(topic));
setBeanFactory(KafkaMessageChannelBinder.this.getBeanFactory());
if (producerProperties.isPartitioned()) {
SpelExpressionParser parser = new SpelExpressionParser();
setPartitionIdExpression(parser.parseExpression("headers." + BinderHeaders.PARTITION_HEADER));
setPartitionIdExpression(parser.parseExpression("headers.partition"));
}
if (producerProperties.getExtension().isSync()) {
setSync(true);

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.properties;
package org.springframework.cloud.stream.binder.kafka;
import java.util.HashMap;
import java.util.Map;

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.utils;
package org.springframework.cloud.stream.binder.kafka;
import java.io.UnsupportedEncodingException;

View File

@@ -24,7 +24,7 @@ import kafka.utils.ZkUtils;
* API around {@link kafka.admin.AdminUtils} to support
* various versions of Kafka brokers.
*
* Note: Implementations that support Kafka brokers other than 0.10 need to use
* Note: Implementations that support Kafka brokers other than 0.9 need to use
* a possible strategy that involves reflection around {@link kafka.admin.AdminUtils}.
*
* @author Soby Chacko

View File

@@ -19,36 +19,33 @@ package org.springframework.cloud.stream.binder.kafka.admin;
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.api.TopicMetadata;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.requests.MetadataResponse;
/**
* @author Soby Chacko
*/
public class Kafka10AdminUtilsOperation implements AdminUtilsOperation {
public class Kafka09AdminUtilsOperation implements AdminUtilsOperation {
public void invokeAddPartitions(ZkUtils zkUtils, String topic, int numPartitions,
String replicaAssignmentStr, boolean checkBrokerAvailable) {
AdminUtils.addPartitions(zkUtils, topic, numPartitions, replicaAssignmentStr, checkBrokerAvailable, null);
AdminUtils.addPartitions(zkUtils, topic, numPartitions,
replicaAssignmentStr, checkBrokerAvailable);
}
public short errorCodeFromTopicMetadata(String topic, ZkUtils zkUtils) {
MetadataResponse.TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
return topicMetadata.error().code();
TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
return topicMetadata.errorCode();
}
@SuppressWarnings("unchecked")
public int partitionSize(String topic, ZkUtils zkUtils) {
MetadataResponse.TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
return topicMetadata.partitionMetadata().size();
TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils);
return topicMetadata.partitionsMetadata().size();
}
public void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions,
int replicationFactor, Properties topicConfig) {
int replicationFactor, Properties topicConfig) {
AdminUtils.createTopic(zkUtils, topic, partitions, replicationFactor,
topicConfig, null);
topicConfig);
}
}
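
Both implementations expose the same operations, so callers such as the binder can create or resize topics without caring which broker version is on the classpath. A minimal sketch, assuming a ZooKeeper ensemble at localhost:2181 and a hypothetical topic name, of driving the 0.9 implementation directly:

import java.util.Properties;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation;

public class AdminUtilsOperationSketch {
    public static void main(String[] args) {
        ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 10000, 10000, JaasUtils.isZkSecurityEnabled());
        try {
            AdminUtilsOperation operation = new Kafka09AdminUtilsOperation();
            // create a hypothetical 3-partition topic with replication factor 1, then read its size back
            operation.invokeCreateTopic(zkUtils, "orders", 3, 1, new Properties());
            System.out.println("partitions: " + operation.partitionSize("orders", zkUtils));
        }
        finally {
            zkUtils.close();
        }
    }
}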

View File

@@ -18,8 +18,10 @@ package org.springframework.cloud.stream.binder.kafka.admin;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Properties;
import kafka.api.PartitionMetadata;
import kafka.utils.ZkUtils;
import org.springframework.util.ClassUtils;
@@ -28,7 +30,7 @@ import org.springframework.util.ReflectionUtils;
/**
* @author Soby Chacko
*/
public class Kafka09AdminUtilsOperation implements AdminUtilsOperation {
public class Kafka10AdminUtilsOperation implements AdminUtilsOperation {
private static Class<?> ADMIN_UTIL_CLASS;
@@ -51,9 +53,10 @@ public class Kafka09AdminUtilsOperation implements AdminUtilsOperation {
addPartitions = m;
}
}
if (addPartitions != null) {
addPartitions.invoke(null, zkUtils, topic, numPartitions,
replicaAssignmentStr, checkBrokerAvailable);
replicaAssignmentStr, checkBrokerAvailable, null);
}
else {
throw new InvocationTargetException(
@@ -66,16 +69,20 @@ public class Kafka09AdminUtilsOperation implements AdminUtilsOperation {
catch (IllegalAccessException e) {
ReflectionUtils.handleReflectionException(e);
}
}
public short errorCodeFromTopicMetadata(String topic, ZkUtils zkUtils) {
try {
Method fetchTopicMetadataFromZk = ReflectionUtils.findMethod(ADMIN_UTIL_CLASS, "fetchTopicMetadataFromZk", String.class, ZkUtils.class);
Object result = fetchTopicMetadataFromZk.invoke(null, topic, zkUtils);
Class<?> topicMetadataClass = ClassUtils.forName("kafka.api.TopicMetadata", null);
Method errorCodeMethod = ReflectionUtils.findMethod(topicMetadataClass, "errorCode");
return (short) errorCodeMethod.invoke(result);
Class<?> topicMetadataClass = ClassUtils.forName("org.apache.kafka.common.requests.MetadataResponse$TopicMetadata", null);
Method errorCodeMethod = ReflectionUtils.findMethod(topicMetadataClass, "error");
Object obj = errorCodeMethod.invoke(result);
Method code = ReflectionUtils.findMethod(obj.getClass(), "code");
return (short) code.invoke(obj);
}
catch (ClassNotFoundException e) {
throw new IllegalStateException("AdminUtils class not found", e);
@@ -87,6 +94,7 @@ public class Kafka09AdminUtilsOperation implements AdminUtilsOperation {
ReflectionUtils.handleReflectionException(e);
}
return 0;
}
@SuppressWarnings("unchecked")
@@ -94,13 +102,11 @@ public class Kafka09AdminUtilsOperation implements AdminUtilsOperation {
try {
Method fetchTopicMetadataFromZk = ReflectionUtils.findMethod(ADMIN_UTIL_CLASS, "fetchTopicMetadataFromZk", String.class, ZkUtils.class);
Object result = fetchTopicMetadataFromZk.invoke(null, topic, zkUtils);
Class<?> topicMetadataClass = ClassUtils.forName("kafka.api.TopicMetadata", null);
Class<?> topicMetadataClass = ClassUtils.forName("org.apache.kafka.common.requests.MetadataResponse$TopicMetadata", null);
Method partitionsMetadata = ReflectionUtils.findMethod(topicMetadataClass, "partitionsMetadata");
scala.collection.Seq<kafka.api.PartitionMetadata> partitionSize =
(scala.collection.Seq<kafka.api.PartitionMetadata>)partitionsMetadata.invoke(result);
return partitionSize.size();
Method partitionsMetadata = ReflectionUtils.findMethod(topicMetadataClass, "partitionMetadata");
List<PartitionMetadata> foo = (List<PartitionMetadata>) partitionsMetadata.invoke(result);
return foo.size();
}
catch (ClassNotFoundException e) {
throw new IllegalStateException("AdminUtils class not found", e);
@@ -112,23 +118,22 @@ public class Kafka09AdminUtilsOperation implements AdminUtilsOperation {
ReflectionUtils.handleReflectionException(e);
}
return 0;
}
public void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions,
int replicationFactor, Properties topicConfig) {
int replicationFactor, Properties topicConfig) {
try {
Method[] declaredMethods = ADMIN_UTIL_CLASS.getDeclaredMethods();
Method createTopic = null;
for (Method m : declaredMethods) {
if (m.getName().equals("createTopic")) {
if (m.getName().equals("createTopic") && m.getParameterTypes()[m.getParameterTypes().length - 1].getName().endsWith("RackAwareMode")) {
createTopic = m;
break;
}
}
if (createTopic != null) {
createTopic.invoke(null, zkUtils, topic, partitions,
replicationFactor, topicConfig);
replicationFactor, topicConfig, null);
}
else {
throw new InvocationTargetException(
@@ -142,4 +147,4 @@ public class Kafka09AdminUtilsOperation implements AdminUtilsOperation {
ReflectionUtils.handleReflectionException(e);
}
}
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.properties;
package org.springframework.cloud.stream.binder.kafka.config;
import java.util.HashMap;
import java.util.Map;

View File

@@ -16,36 +16,29 @@
package org.springframework.cloud.stream.binder.kafka.config;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderJaasInitializerListener;
import org.springframework.cloud.stream.binder.kafka.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.properties.JaasLoginModuleConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.config.codec.kryo.KryoCodecAutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
@@ -72,7 +65,7 @@ import org.springframework.util.ObjectUtils;
*/
@Configuration
@ConditionalOnMissingBean(Binder.class)
@Import({KryoCodecAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class, KafkaBinderConfiguration.KafkaPropertiesConfiguration.class})
@Import({KryoCodecAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class})
@EnableConfigurationProperties({KafkaBinderConfigurationProperties.class, KafkaExtendedBindingProperties.class})
public class KafkaBinderConfiguration {
@@ -96,18 +89,14 @@ public class KafkaBinderConfiguration {
@Autowired (required = false)
private AdminUtilsOperation adminUtilsOperation;
@Bean
KafkaTopicProvisioner provisioningProvider() {
return new KafkaTopicProvisioner(this.configurationProperties, this.adminUtilsOperation);
}
@Bean
KafkaMessageChannelBinder kafkaMessageChannelBinder() {
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
this.configurationProperties, provisioningProvider());
this.configurationProperties);
kafkaMessageChannelBinder.setCodec(this.codec);
kafkaMessageChannelBinder.setProducerListener(producerListener);
kafkaMessageChannelBinder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
kafkaMessageChannelBinder.setAdminUtilsOperation(adminUtilsOperation);
return kafkaMessageChannelBinder;
}
@@ -173,47 +162,4 @@ public class KafkaBinderConfiguration {
private JaasLoginModuleConfiguration zookeeper;
}
@ConditionalOnClass(name = "org.springframework.boot.autoconfigure.kafka.KafkaProperties")
public static class KafkaPropertiesConfiguration {
// KafkaProperties can still be unavailable if KafkaAutoConfiguration is disabled.
@Autowired(required = false)
private KafkaProperties kafkaProperties;
@Autowired
private KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties;
@PostConstruct
public void init() {
Map<String, Object> configuration = this.kafkaBinderConfigurationProperties.getConfiguration();
if (this.kafkaProperties != null) {
for (Map.Entry<String, String> properties : this.kafkaProperties.getProperties().entrySet()) {
if (!configuration.containsKey(properties.getKey())) {
configuration.put(properties.getKey(), properties.getValue());
}
}
for (Map.Entry<String, Object> producerProperties : this.kafkaProperties.buildProducerProperties().entrySet()) {
if (!configuration.containsKey(producerProperties.getKey())) {
configuration.put(producerProperties.getKey(), producerProperties.getValue());
}
}
for (Map.Entry<String, Object> consumerProperties : this.kafkaProperties.buildConsumerProperties().entrySet()) {
if (!configuration.containsKey(consumerProperties.getKey())) {
configuration.put(consumerProperties.getKey(), consumerProperties.getValue());
}
}
if (ObjectUtils.isEmpty(configuration.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))) {
configuration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBinderConfigurationProperties.getKafkaConnectionString());
}
else {
@SuppressWarnings("unchecked")
List<String> bootStrapServers = (List<String>) configuration.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
if (bootStrapServers.size() == 1 && bootStrapServers.get(0).equals("localhost:9092")) {
configuration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBinderConfigurationProperties.getKafkaConnectionString());
}
}
}
}
}
}
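
The KafkaPropertiesConfiguration shown in this hunk copies Boot's spring.kafka.* settings into the binder configuration map, but only for keys the binder configuration does not already define, and it replaces a bootstrap.servers entry that is still the localhost:9092 default. A stand-alone sketch of that containsKey-guarded merge, with hypothetical maps in place of KafkaProperties:

import java.util.HashMap;
import java.util.Map;

public class BootPropertyMergeSketch {
    public static void main(String[] args) {
        Map<String, Object> binderConfiguration = new HashMap<>();  // spring.cloud.stream.kafka.binder.configuration.*
        binderConfiguration.put("security.protocol", "SASL_PLAINTEXT");

        Map<String, Object> bootProperties = new HashMap<>();       // stand-in for KafkaProperties.buildConsumerProperties()
        bootProperties.put("security.protocol", "PLAINTEXT");
        bootProperties.put("client.id", "my-app");

        // only keys the binder configuration does not already define are copied across
        for (Map.Entry<String, Object> entry : bootProperties.entrySet()) {
            if (!binderConfiguration.containsKey(entry.getKey())) {
                binderConfiguration.put(entry.getKey(), entry.getValue());
            }
        }
        System.out.println(binderConfiguration); // security.protocol=SASL_PLAINTEXT, client.id=my-app
    }
}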

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.properties;
package org.springframework.cloud.stream.binder.kafka.config;
import java.util.HashMap;
import java.util.Map;
@@ -33,7 +33,7 @@ public class KafkaBinderConfigurationProperties {
private String[] zkNodes = new String[] {"localhost"};
private Map<String, Object> configuration = new HashMap<>();
private Map<String, String> configuration = new HashMap<>();
private String defaultZkPort = "2181";
@@ -249,11 +249,11 @@ public class KafkaBinderConfigurationProperties {
this.socketBufferSize = socketBufferSize;
}
public Map<String, Object> getConfiguration() {
public Map<String, String> getConfiguration() {
return configuration;
}
public void setConfiguration(Map<String, Object> configuration) {
public void setConfiguration(Map<String, String> configuration) {
this.configuration = configuration;
}

View File

@@ -1,2 +0,0 @@
org.springframework.boot.env.EnvironmentPostProcessor=\
org.springframework.cloud.stream.binder.kafka.KafkaBinderEnvironmentPostProcessor

View File

@@ -23,8 +23,6 @@ import com.esotericsoftware.kryo.Registration;
import org.springframework.cloud.stream.binder.AbstractTestBinder;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.integration.codec.Codec;
import org.springframework.integration.codec.kryo.KryoRegistrar;
import org.springframework.integration.codec.kryo.PojoCodec;

View File

@@ -34,12 +34,13 @@ import org.junit.ClassRule;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.Spy;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.retry.RetryOperations;
/**
* Integration tests for the {@link KafkaMessageChannelBinder}.
@@ -49,7 +50,7 @@ import org.springframework.kafka.test.rule.KafkaEmbedded;
* @author Mark Fisher
* @author Ilayaperumal Gopinathan
*/
public class Kafka_09_BinderTests extends KafkaBinderTests {
public class Kafka09BinderTests extends KafkaBinderTests {
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();
@@ -92,6 +93,11 @@ public class Kafka_09_BinderTests extends KafkaBinderTests {
return consumerFactory().createConsumer().partitionsFor(topic).size();
}
@Override
protected void setMetadataRetryOperations(Binder binder, RetryOperations retryOperations) {
((Kafka09TestBinder) binder).getBinder().setMetadataRetryOperations(retryOperations);
}
@Override
protected ZkUtils getZkUtils(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
final ZkClient zkClient = new ZkClient(kafkaBinderConfigurationProperties.getZkConnectionString(),

View File

@@ -16,10 +16,8 @@
package org.springframework.cloud.stream.binder.kafka;
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
@@ -37,18 +35,14 @@ public class Kafka09TestBinder extends AbstractKafkaTestBinder {
public Kafka09TestBinder(KafkaBinderConfigurationProperties binderConfiguration) {
try {
AdminUtilsOperation adminUtilsOperation = new Kafka09AdminUtilsOperation();
KafkaTopicProvisioner provisioningProvider =
new KafkaTopicProvisioner(binderConfiguration, adminUtilsOperation);
provisioningProvider.afterPropertiesSet();
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfiguration, provisioningProvider);
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfiguration);
binder.setCodec(getCodec());
ProducerListener producerListener = new LoggingProducerListener();
binder.setProducerListener(producerListener);
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
binder.setApplicationContext(context);
binder.setAdminUtilsOperation(new Kafka09AdminUtilsOperation());
binder.afterPropertiesSet();
this.setBinder(binder);
}

View File

@@ -1,95 +0,0 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.util.ReflectionUtils;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* @author Ilayaperumal Gopinathan
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = {KafkaBinderAutoConfigurationPropertiesTest.KafkaBinderConfigProperties.class, KafkaBinderConfiguration.class})
@TestPropertySource(locations = "classpath:binder-config-autoconfig.properties")
public class KafkaBinderAutoConfigurationPropertiesTest {
@Autowired
private KafkaMessageChannelBinder kafkaMessageChannelBinder;
@Test
public void testKafkaBinderConfigurationWithKafkaProperties() throws Exception {
assertNotNull(this.kafkaMessageChannelBinder);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(new KafkaProducerProperties());
Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory", ExtendedProducerProperties.class);
getProducerFactoryMethod.setAccessible(true);
DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod.invoke(this.kafkaMessageChannelBinder, producerProperties);
Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs", Map.class);
ReflectionUtils.makeAccessible(producerFactoryConfigField);
Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField, producerFactory);
assertTrue(producerConfigs.get("batch.size").equals(10));
assertTrue(producerConfigs.get("key.serializer").equals(LongSerializer.class));
assertTrue(producerConfigs.get("value.serializer").equals(LongSerializer.class));
assertTrue(producerConfigs.get("compression.type").equals("snappy"));
List<String> bootstrapServers = new ArrayList<>();
bootstrapServers.add("10.98.09.199:9092");
bootstrapServers.add("10.98.09.196:9092");
assertTrue((((List<String>) producerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));
Method createKafkaConsumerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("createKafkaConsumerFactory", boolean.class, String.class, ExtendedConsumerProperties.class);
createKafkaConsumerFactoryMethod.setAccessible(true);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<>(new KafkaConsumerProperties());
DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) createKafkaConsumerFactoryMethod.invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties);
Field consumerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaConsumerFactory.class, "configs", Map.class);
ReflectionUtils.makeAccessible(consumerFactoryConfigField);
Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField, consumerFactory);
assertTrue(consumerConfigs.get("key.deserializer").equals(LongDeserializer.class));
assertTrue(consumerConfigs.get("value.deserializer").equals(LongDeserializer.class));
assertTrue((((List<String>) consumerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));
}
public static class KafkaBinderConfigProperties {
@Bean
KafkaProperties kafkaProperties() {
return new KafkaProperties();
}
}
}

View File

@@ -1,89 +0,0 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.util.ReflectionUtils;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* @author Ilayaperumal Gopinathan
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = {KafkaBinderConfiguration.class})
@TestPropertySource(locations = "classpath:binder-config.properties")
public class KafkaBinderConfigurationPropertiesTest {
@Autowired
private KafkaMessageChannelBinder kafkaMessageChannelBinder;
@Test
public void testKafkaBinderConfigurationProperties() throws Exception {
assertNotNull(this.kafkaMessageChannelBinder);
KafkaProducerProperties kafkaProducerProperties = new KafkaProducerProperties();
kafkaProducerProperties.setBufferSize(12345);
kafkaProducerProperties.setBatchTimeout(100);
kafkaProducerProperties.setCompressionType(KafkaProducerProperties.CompressionType.gzip);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(kafkaProducerProperties);
Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory", ExtendedProducerProperties.class);
getProducerFactoryMethod.setAccessible(true);
DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod.invoke(this.kafkaMessageChannelBinder, producerProperties);
Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs", Map.class);
ReflectionUtils.makeAccessible(producerFactoryConfigField);
Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField, producerFactory);
assertTrue(producerConfigs.get("batch.size").equals("12345"));
assertTrue(producerConfigs.get("linger.ms").equals("100"));
assertTrue(producerConfigs.get("key.serializer").equals(ByteArraySerializer.class));
assertTrue(producerConfigs.get("value.serializer").equals(ByteArraySerializer.class));
assertTrue(producerConfigs.get("compression.type").equals("gzip"));
List<String> bootstrapServers = new ArrayList<>();
bootstrapServers.add("10.98.09.199:9082");
assertTrue((((String) producerConfigs.get("bootstrap.servers")).contains("10.98.09.199:9082")));
Method createKafkaConsumerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("createKafkaConsumerFactory", boolean.class, String.class, ExtendedConsumerProperties.class);
createKafkaConsumerFactoryMethod.setAccessible(true);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<>(new KafkaConsumerProperties());
DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) createKafkaConsumerFactoryMethod.invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties);
Field consumerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaConsumerFactory.class, "configs", Map.class);
ReflectionUtils.makeAccessible(consumerFactoryConfigField);
Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField, consumerFactory);
assertTrue(consumerConfigs.get("key.deserializer").equals(ByteArrayDeserializer.class));
assertTrue(consumerConfigs.get("value.deserializer").equals(ByteArrayDeserializer.class));
assertTrue((((String) consumerConfigs.get("bootstrap.servers")).contains("10.98.09.199:9082")));
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016-2017 the original author or authors.
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,6 +15,8 @@
*/
package org.springframework.cloud.stream.binder.kafka;
import static org.junit.Assert.assertNotNull;
import java.lang.reflect.Field;
import org.junit.Test;
@@ -27,8 +29,6 @@ import org.springframework.kafka.support.ProducerListener;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.util.ReflectionUtils;
import static org.junit.Assert.assertNotNull;
/**
* @author Ilayaperumal Gopinathan
*/

View File

@@ -19,7 +19,6 @@ package org.springframework.cloud.stream.binder.kafka;
import javax.security.auth.login.AppConfigurationEntry;
import com.sun.security.auth.login.ConfigFile;
import org.apache.kafka.common.security.JaasUtils;
import org.junit.Test;

View File

@@ -38,20 +38,16 @@ import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.DefaultBinding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.PartitionCapableBinderTests;
import org.springframework.cloud.stream.binder.PartitionTestSupport;
import org.springframework.cloud.stream.binder.TestUtils;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.utils.KafkaTopicUtils;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
@@ -70,6 +66,7 @@ import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
@@ -110,6 +107,8 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
protected abstract int partitionSize(String topic);
protected abstract void setMetadataRetryOperations(Binder binder, RetryOperations retryOperations);
protected abstract ZkUtils getZkUtils(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties);
protected abstract void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions,
@@ -285,6 +284,7 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
metatadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
setMetadataRetryOperations(binder, metatadataRetrievalRetryOperations);
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
String testTopicName = "nonexisting" + System.currentTimeMillis();
@@ -1013,6 +1013,7 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
metatadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
setMetadataRetryOperations(binder, metatadataRetrievalRetryOperations);
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
String testTopicName = "createdByBroker-" + System.currentTimeMillis();
@@ -1068,6 +1069,7 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
metatadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
setMetadataRetryOperations(binder, metatadataRetrievalRetryOperations);
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
Binding<?> binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
@@ -1104,7 +1106,7 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
}
catch (Exception e) {
assertThat(e).isInstanceOf(ProvisioningException.class);
assertThat(e).isInstanceOf(BinderException.class);
assertThat(e)
.hasMessageContaining("The number of expected partitions was: 3, but 1 has been found instead");
}
@@ -1137,6 +1139,7 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
metatadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
setMetadataRetryOperations(binder, metatadataRetrievalRetryOperations);
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
// this consumer must consume from partition 2
@@ -1185,6 +1188,7 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1000);
metatadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
setMetadataRetryOperations(binder, metatadataRetrievalRetryOperations);
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
Binding<?> binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
@@ -1224,7 +1228,7 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
Binding<?> binding = null;
try {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
Map<String, Object> propertiesToOverride = configurationProperties.getConfiguration();
Map<String, String> propertiesToOverride = configurationProperties.getConfiguration();
propertiesToOverride.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
propertiesToOverride.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
configurationProperties.setConfiguration(propertiesToOverride);
@@ -1346,221 +1350,6 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
}
}
@Test
@SuppressWarnings("unchecked")
public void testPartitionedModuleJavaWithRawMode() throws Exception {
Binder binder = getBinder();
ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
properties.setHeaderMode(HeaderMode.raw);
properties.setPartitionKeyExtractorClass(RawKafkaPartitionTestSupport.class);
properties.setPartitionSelectorClass(RawKafkaPartitionTestSupport.class);
properties.setPartitionCount(6);
DirectChannel output = createBindableChannel("output", createProducerBindingProperties(properties));
output.setBeanName("test.output");
Binding<MessageChannel> outputBinding = binder.bindProducer("partJ.0", output, properties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setConcurrency(2);
consumerProperties.setInstanceCount(3);
consumerProperties.setInstanceIndex(0);
consumerProperties.setPartitioned(true);
consumerProperties.setHeaderMode(HeaderMode.raw);
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
QueueChannel input0 = new QueueChannel();
input0.setBeanName("test.input0J");
Binding<MessageChannel> input0Binding = binder.bindConsumer("partJ.0", "test", input0, consumerProperties);
consumerProperties.setInstanceIndex(1);
QueueChannel input1 = new QueueChannel();
input1.setBeanName("test.input1J");
Binding<MessageChannel> input1Binding = binder.bindConsumer("partJ.0", "test", input1, consumerProperties);
consumerProperties.setInstanceIndex(2);
QueueChannel input2 = new QueueChannel();
input2.setBeanName("test.input2J");
Binding<MessageChannel> input2Binding = binder.bindConsumer("partJ.0", "test", input2, consumerProperties);
output.send(new GenericMessage<>(new byte[] {(byte) 0}));
output.send(new GenericMessage<>(new byte[] {(byte) 1}));
output.send(new GenericMessage<>(new byte[] {(byte) 2}));
Message<?> receive0 = receive(input0);
assertThat(receive0).isNotNull();
Message<?> receive1 = receive(input1);
assertThat(receive1).isNotNull();
Message<?> receive2 = receive(input2);
assertThat(receive2).isNotNull();
assertThat(Arrays.asList(((byte[]) receive0.getPayload())[0], ((byte[]) receive1.getPayload())[0],
((byte[]) receive2.getPayload())[0])).containsExactlyInAnyOrder((byte) 0, (byte) 1, (byte) 2);
input0Binding.unbind();
input1Binding.unbind();
input2Binding.unbind();
outputBinding.unbind();
}
@Test
@SuppressWarnings("unchecked")
public void testPartitionedModuleSpELWithRawMode() throws Exception {
Binder binder = getBinder();
ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
properties.setPartitionKeyExpression(spelExpressionParser.parseExpression("payload[0]"));
properties.setPartitionSelectorExpression(spelExpressionParser.parseExpression("hashCode()"));
properties.setPartitionCount(6);
properties.setHeaderMode(HeaderMode.raw);
DirectChannel output = createBindableChannel("output", createProducerBindingProperties(properties));
output.setBeanName("test.output");
Binding<MessageChannel> outputBinding = binder.bindProducer("part.0", output, properties);
try {
Object endpoint = extractEndpoint(outputBinding);
assertThat(getEndpointRouting(endpoint))
.contains(getExpectedRoutingBaseDestination("part.0", "test") + "-' + headers['partition']");
}
catch (UnsupportedOperationException ignored) {
}
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setConcurrency(2);
consumerProperties.setInstanceIndex(0);
consumerProperties.setInstanceCount(3);
consumerProperties.setPartitioned(true);
consumerProperties.setHeaderMode(HeaderMode.raw);
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
QueueChannel input0 = new QueueChannel();
input0.setBeanName("test.input0S");
Binding<MessageChannel> input0Binding = binder.bindConsumer("part.0", "test", input0, consumerProperties);
consumerProperties.setInstanceIndex(1);
QueueChannel input1 = new QueueChannel();
input1.setBeanName("test.input1S");
Binding<MessageChannel> input1Binding = binder.bindConsumer("part.0", "test", input1, consumerProperties);
consumerProperties.setInstanceIndex(2);
QueueChannel input2 = new QueueChannel();
input2.setBeanName("test.input2S");
Binding<MessageChannel> input2Binding = binder.bindConsumer("part.0", "test", input2, consumerProperties);
Message<byte[]> message2 = org.springframework.integration.support.MessageBuilder.withPayload(new byte[] {2})
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "kafkaBinderTestCommonsDelegate")
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 42)
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 43).build();
output.send(message2);
output.send(new GenericMessage<>(new byte[] {1}));
output.send(new GenericMessage<>(new byte[] {0}));
Message<?> receive0 = receive(input0);
assertThat(receive0).isNotNull();
Message<?> receive1 = receive(input1);
assertThat(receive1).isNotNull();
Message<?> receive2 = receive(input2);
assertThat(receive2).isNotNull();
assertThat(Arrays.asList(((byte[]) receive0.getPayload())[0], ((byte[]) receive1.getPayload())[0],
((byte[]) receive2.getPayload())[0])).containsExactlyInAnyOrder((byte) 0, (byte) 1, (byte) 2);
input0Binding.unbind();
input1Binding.unbind();
input2Binding.unbind();
outputBinding.unbind();
}
@Test
@SuppressWarnings("unchecked")
public void testSendAndReceiveWithRawMode() throws Exception {
Binder binder = getBinder();
DirectChannel moduleOutputChannel = new DirectChannel();
QueueChannel moduleInputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setHeaderMode(HeaderMode.raw);
Binding<MessageChannel> producerBinding = binder.bindProducer("0", moduleOutputChannel,
producerProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setHeaderMode(HeaderMode.raw);
Binding<MessageChannel> consumerBinding = binder.bindConsumer("0", "test", moduleInputChannel,
consumerProperties);
Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload("kafkaBinderTestCommonsDelegate".getBytes()).build();
// Let the consumer actually bind to the producer before sending a msg
binderBindUnbindLatency();
moduleOutputChannel.send(message);
Message<?> inbound = receive(moduleInputChannel);
assertThat(inbound).isNotNull();
assertThat(new String((byte[]) inbound.getPayload())).isEqualTo("kafkaBinderTestCommonsDelegate");
producerBinding.unbind();
consumerBinding.unbind();
}
@Test
@SuppressWarnings("unchecked")
public void testSendAndReceiveWithExplicitConsumerGroupWithRawMode() throws Exception {
Binder binder = getBinder();
DirectChannel moduleOutputChannel = new DirectChannel();
// Test pub/sub by emulating how StreamPlugin handles taps
QueueChannel module1InputChannel = new QueueChannel();
QueueChannel module2InputChannel = new QueueChannel();
QueueChannel module3InputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setHeaderMode(HeaderMode.raw);
Binding<MessageChannel> producerBinding = binder.bindProducer("baz.0", moduleOutputChannel,
producerProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setHeaderMode(HeaderMode.raw);
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
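// with auto-rebalance disabled, the binder assigns partitions to the consumer directly instead of relying on Kafka's group rebalancing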
Binding<MessageChannel> input1Binding = binder.bindConsumer("baz.0", "test", module1InputChannel,
consumerProperties);
// A new module is using the tap as an input channel
String fooTapName = "baz.0";
Binding<MessageChannel> input2Binding = binder.bindConsumer(fooTapName, "tap1", module2InputChannel,
consumerProperties);
// Another new module is using the tap as an input channel
String barTapName = "baz.0";
Binding<MessageChannel> input3Binding = binder.bindConsumer(barTapName, "tap2", module3InputChannel,
consumerProperties);
Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload("kafkaBinderTestCommonsDelegate".getBytes()).build();
boolean success = false;
boolean retried = false;
while (!success) {
moduleOutputChannel.send(message);
Message<?> inbound = receive(module1InputChannel);
assertThat(inbound).isNotNull();
assertThat(new String((byte[]) inbound.getPayload())).isEqualTo("kafkaBinderTestCommonsDelegate");
Message<?> tapped1 = receive(module2InputChannel);
Message<?> tapped2 = receive(module3InputChannel);
if (tapped1 == null || tapped2 == null) {
// listener may not have started
assertThat(retried).withFailMessage("Failed to receive tap after retry").isFalse();
retried = true;
continue;
}
success = true;
assertThat(new String((byte[]) tapped1.getPayload())).isEqualTo("kafkaBinderTestCommonsDelegate");
assertThat(new String((byte[]) tapped2.getPayload())).isEqualTo("kafkaBinderTestCommonsDelegate");
}
// unbind one tap to verify that a deleted tap stops receiving
input3Binding.unbind();
Message<?> message2 = org.springframework.integration.support.MessageBuilder.withPayload("bar".getBytes()).build();
moduleOutputChannel.send(message2);
// other tap still receives messages
Message<?> tapped = receive(module2InputChannel);
assertThat(tapped).isNotNull();
// removed tap does not
assertThat(receive(module3InputChannel)).isNull();
// re-subscribed tap does receive the message
input3Binding = binder.bindConsumer(barTapName, "tap2", module3InputChannel, createConsumerProperties());
assertThat(receive(module3InputChannel)).isNotNull();
// clean up
input1Binding.unbind();
input2Binding.unbind();
input3Binding.unbind();
producerBinding.unbind();
assertThat(extractEndpoint(input1Binding).isRunning()).isFalse();
assertThat(extractEndpoint(input2Binding).isRunning()).isFalse();
assertThat(extractEndpoint(input3Binding).isRunning()).isFalse();
assertThat(extractEndpoint(producerBinding).isRunning()).isFalse();
}
@Override
protected void binderBindUnbindLatency() throws InterruptedException {
Thread.sleep(500);

View File

@@ -0,0 +1,258 @@
/*
* Copyright 2015-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.util.Arrays;
import org.junit.Test;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Marius Bogoevici
* @author David Turanski
* @author Gary Russell
* @author Mark Fisher
*/
public class RawModeKafka09BinderTests extends Kafka09BinderTests {
@Test
@Override
public void testPartitionedModuleJava() throws Exception {
Kafka09TestBinder binder = getBinder();
ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
properties.setHeaderMode(HeaderMode.raw);
properties.setPartitionKeyExtractorClass(RawKafkaPartitionTestSupport.class);
properties.setPartitionSelectorClass(RawKafkaPartitionTestSupport.class);
properties.setPartitionCount(6);
DirectChannel output = createBindableChannel("output", createProducerBindingProperties(properties));
output.setBeanName("test.output");
Binding<MessageChannel> outputBinding = binder.bindProducer("partJ.0", output, properties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setConcurrency(2);
consumerProperties.setInstanceCount(3);
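// concurrency 2 x instanceCount 3 gives six consumers in total, matching the six producer partitions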
consumerProperties.setInstanceIndex(0);
consumerProperties.setPartitioned(true);
consumerProperties.setHeaderMode(HeaderMode.raw);
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
QueueChannel input0 = new QueueChannel();
input0.setBeanName("test.input0J");
Binding<MessageChannel> input0Binding = binder.bindConsumer("partJ.0", "test", input0, consumerProperties);
consumerProperties.setInstanceIndex(1);
QueueChannel input1 = new QueueChannel();
input1.setBeanName("test.input1J");
Binding<MessageChannel> input1Binding = binder.bindConsumer("partJ.0", "test", input1, consumerProperties);
consumerProperties.setInstanceIndex(2);
QueueChannel input2 = new QueueChannel();
input2.setBeanName("test.input2J");
Binding<MessageChannel> input2Binding = binder.bindConsumer("partJ.0", "test", input2, consumerProperties);
output.send(new GenericMessage<>(new byte[] {(byte) 0}));
output.send(new GenericMessage<>(new byte[] {(byte) 1}));
output.send(new GenericMessage<>(new byte[] {(byte) 2}));
Message<?> receive0 = receive(input0);
assertThat(receive0).isNotNull();
Message<?> receive1 = receive(input1);
assertThat(receive1).isNotNull();
Message<?> receive2 = receive(input2);
assertThat(receive2).isNotNull();
assertThat(Arrays.asList(((byte[]) receive0.getPayload())[0], ((byte[]) receive1.getPayload())[0],
((byte[]) receive2.getPayload())[0])).containsExactlyInAnyOrder((byte) 0, (byte) 1, (byte) 2);
input0Binding.unbind();
input1Binding.unbind();
input2Binding.unbind();
outputBinding.unbind();
}
@Test
@Override
public void testPartitionedModuleSpEL() throws Exception {
Kafka09TestBinder binder = getBinder();
ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
properties.setPartitionKeyExpression(spelExpressionParser.parseExpression("payload[0]"));
properties.setPartitionSelectorExpression(spelExpressionParser.parseExpression("hashCode()"));
properties.setPartitionCount(6);
properties.setHeaderMode(HeaderMode.raw);
DirectChannel output = createBindableChannel("output", createProducerBindingProperties(properties));
output.setBeanName("test.output");
Binding<MessageChannel> outputBinding = binder.bindProducer("part.0", output, properties);
try {
Object endpoint = extractEndpoint(outputBinding);
assertThat(getEndpointRouting(endpoint))
.contains(getExpectedRoutingBaseDestination("part.0", "test") + "-' + headers['partition']");
}
catch (UnsupportedOperationException ignored) {
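// binders that do not expose endpoint routing information simply skip this check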
}
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setConcurrency(2);
consumerProperties.setInstanceIndex(0);
consumerProperties.setInstanceCount(3);
consumerProperties.setPartitioned(true);
consumerProperties.setHeaderMode(HeaderMode.raw);
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
QueueChannel input0 = new QueueChannel();
input0.setBeanName("test.input0S");
Binding<MessageChannel> input0Binding = binder.bindConsumer("part.0", "test", input0, consumerProperties);
consumerProperties.setInstanceIndex(1);
QueueChannel input1 = new QueueChannel();
input1.setBeanName("test.input1S");
Binding<MessageChannel> input1Binding = binder.bindConsumer("part.0", "test", input1, consumerProperties);
consumerProperties.setInstanceIndex(2);
QueueChannel input2 = new QueueChannel();
input2.setBeanName("test.input2S");
Binding<MessageChannel> input2Binding = binder.bindConsumer("part.0", "test", input2, consumerProperties);
Message<byte[]> message2 = MessageBuilder.withPayload(new byte[] {2})
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "foo")
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 42)
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 43).build();
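// in raw mode these headers are not transmitted with the record; only the payload byte is carried and asserted below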
output.send(message2);
output.send(new GenericMessage<>(new byte[] {1}));
output.send(new GenericMessage<>(new byte[] {0}));
Message<?> receive0 = receive(input0);
assertThat(receive0).isNotNull();
Message<?> receive1 = receive(input1);
assertThat(receive1).isNotNull();
Message<?> receive2 = receive(input2);
assertThat(receive2).isNotNull();
assertThat(Arrays.asList(((byte[]) receive0.getPayload())[0], ((byte[]) receive1.getPayload())[0],
((byte[]) receive2.getPayload())[0])).containsExactlyInAnyOrder((byte) 0, (byte) 1, (byte) 2);
input0Binding.unbind();
input1Binding.unbind();
input2Binding.unbind();
outputBinding.unbind();
}
@Test
@Override
public void testSendAndReceive() throws Exception {
Kafka09TestBinder binder = getBinder();
DirectChannel moduleOutputChannel = new DirectChannel();
QueueChannel moduleInputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setHeaderMode(HeaderMode.raw);
Binding<MessageChannel> producerBinding = binder.bindProducer("foo.0", moduleOutputChannel,
producerProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setHeaderMode(HeaderMode.raw);
Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo.0", "test", moduleInputChannel,
consumerProperties);
Message<?> message = MessageBuilder.withPayload("foo".getBytes()).build();
// Let the consumer actually bind to the producer before sending a msg
binderBindUnbindLatency();
moduleOutputChannel.send(message);
Message<?> inbound = receive(moduleInputChannel);
assertThat(inbound).isNotNull();
assertThat(new String((byte[]) inbound.getPayload())).isEqualTo("foo");
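// the payload arrives byte-for-byte as sent: raw mode adds no header envelope that would need stripping on the consumer side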
producerBinding.unbind();
consumerBinding.unbind();
}
@Test
public void testSendAndReceiveWithExplicitConsumerGroup() {
Kafka09TestBinder binder = getBinder();
DirectChannel moduleOutputChannel = new DirectChannel();
// Test pub/sub by emulating how StreamPlugin handles taps
QueueChannel module1InputChannel = new QueueChannel();
QueueChannel module2InputChannel = new QueueChannel();
QueueChannel module3InputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setHeaderMode(HeaderMode.raw);
Binding<MessageChannel> producerBinding = binder.bindProducer("baz.0", moduleOutputChannel,
producerProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setHeaderMode(HeaderMode.raw);
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
Binding<MessageChannel> input1Binding = binder.bindConsumer("baz.0", "test", module1InputChannel,
consumerProperties);
// A new module is using the tap as an input channel
String fooTapName = "baz.0";
Binding<MessageChannel> input2Binding = binder.bindConsumer(fooTapName, "tap1", module2InputChannel,
consumerProperties);
// Another new module is using the tap as an input channel
String barTapName = "baz.0";
Binding<MessageChannel> input3Binding = binder.bindConsumer(barTapName, "tap2", module3InputChannel,
consumerProperties);
Message<?> message = MessageBuilder.withPayload("foo".getBytes()).build();
boolean success = false;
boolean retried = false;
while (!success) {
moduleOutputChannel.send(message);
Message<?> inbound = receive(module1InputChannel);
assertThat(inbound).isNotNull();
assertThat(new String((byte[]) inbound.getPayload())).isEqualTo("foo");
Message<?> tapped1 = receive(module2InputChannel);
Message<?> tapped2 = receive(module3InputChannel);
if (tapped1 == null || tapped2 == null) {
// listener may not have started
assertThat(retried).withFailMessage("Failed to receive tap after retry").isFalse();
retried = true;
continue;
}
success = true;
assertThat(new String((byte[]) tapped1.getPayload())).isEqualTo("foo");
assertThat(new String((byte[]) tapped2.getPayload())).isEqualTo("foo");
}
// unbind one tap to verify that a deleted tap stops receiving
input3Binding.unbind();
Message<?> message2 = MessageBuilder.withPayload("bar".getBytes()).build();
moduleOutputChannel.send(message2);
// other tap still receives messages
Message<?> tapped = receive(module2InputChannel);
assertThat(tapped).isNotNull();
// removed tap does not
assertThat(receive(module3InputChannel)).isNull();
// re-subscribed tap does receive the message
input3Binding = binder.bindConsumer(barTapName, "tap2", module3InputChannel, createConsumerProperties());
assertThat(receive(module3InputChannel)).isNotNull();
// clean up
input1Binding.unbind();
input2Binding.unbind();
input3Binding.unbind();
producerBinding.unbind();
assertThat(extractEndpoint(input1Binding).isRunning()).isFalse();
assertThat(extractEndpoint(input2Binding).isRunning()).isFalse();
assertThat(extractEndpoint(input3Binding).isRunning()).isFalse();
assertThat(extractEndpoint(producerBinding).isRunning()).isFalse();
}
}

View File

@@ -1,47 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.bootstrap;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.test.rule.KafkaEmbedded;
/**
* @author Marius Bogoevici
*/
public class KafkaBinderBootstrapTest {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
@Test
public void testKafkaBinderConfiguration() throws Exception {
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(SimpleApplication.class)
.web(false)
.run("--spring.cloud.stream.kafka.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
applicationContext.close();
}
@SpringBootApplication
static class SimpleApplication {
}
}

View File

@@ -1,7 +0,0 @@
spring.kafka.producer.keySerializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.producer.valueSerializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.consumer.keyDeserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.valueDeserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.producer.batchSize=10
spring.kafka.bootstrapServers=10.98.09.199:9092,10.98.09.196:9092
spring.kafka.producer.compressionType=snappy

View File

@@ -1 +0,0 @@
spring.cloud.stream.kafka.binder.brokers=10.98.09.199:9082