Compare commits
23 Commits
v1.2.0.REL
...
v2.0.0.M1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9471976ddd | ||
|
|
4c4873b3c1 | ||
|
|
971bc96962 | ||
|
|
47a225d02a | ||
|
|
143b96f79d | ||
|
|
a0f386f06f | ||
|
|
9ad04882c8 | ||
|
|
705213efe8 | ||
|
|
7355ada461 | ||
|
|
d4aaf78089 | ||
|
|
e440378e44 | ||
|
|
53e38902c9 | ||
|
|
a7a0a132ea | ||
|
|
ee4f3935ec | ||
|
|
4e0107b4d2 | ||
|
|
8f8a1e8709 | ||
|
|
c68ea8c570 | ||
|
|
61e7936978 | ||
|
|
5c70e2df43 | ||
|
|
f280edc9ce | ||
|
|
2c9afde8c6 | ||
|
|
6a8c0cd0c6 | ||
|
|
ec73f2785d |
23
pom.xml
23
pom.xml
@@ -2,29 +2,27 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.2.0.RELEASE</version>
|
||||
<version>2.0.0.M1</version>
|
||||
<packaging>pom</packaging>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-build</artifactId>
|
||||
<version>1.3.1.RELEASE</version>
|
||||
<version>2.0.0.M1</version>
|
||||
<relativePath />
|
||||
</parent>
|
||||
<properties>
|
||||
<java.version>1.7</java.version>
|
||||
<kafka.version>0.10.1.1</kafka.version>
|
||||
<spring-kafka.version>1.1.2.RELEASE</spring-kafka.version>
|
||||
<spring-integration-kafka.version>2.1.0.RELEASE</spring-integration-kafka.version>
|
||||
<spring-cloud-stream.version>1.2.0.RELEASE</spring-cloud-stream.version>
|
||||
<kafka.version>0.10.2.0</kafka.version>
|
||||
<spring-kafka.version>2.0.0.M2</spring-kafka.version>
|
||||
<spring-integration-kafka.version>3.0.0.M1</spring-integration-kafka.version>
|
||||
<spring-cloud-stream.version>2.0.0.M1</spring-cloud-stream.version>
|
||||
</properties>
|
||||
<modules>
|
||||
<module>spring-cloud-stream-binder-kafka</module>
|
||||
<module>spring-cloud-starter-stream-kafka</module>
|
||||
<module>spring-cloud-stream-binder-kafka-docs</module>
|
||||
<module>spring-cloud-stream-binder-kafka-0.9-test</module>
|
||||
<module>spring-cloud-stream-binder-kafka-0.10.0-test</module>
|
||||
<module>spring-cloud-stream-binder-kafka-0.10.1-test</module>
|
||||
<module>spring-cloud-stream-binder-kafka-core</module>
|
||||
<module>spring-cloud-stream-binder-kafka-test-support</module>
|
||||
</modules>
|
||||
|
||||
<dependencyManagement>
|
||||
@@ -63,11 +61,6 @@
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>${kafka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<version>${spring-kafka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
@@ -138,7 +131,7 @@
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-build-tools</artifactId>
|
||||
<version>1.3.1.RELEASE</version>
|
||||
<version>2.0.0.M1</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<executions>
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.2.0.RELEASE</version>
|
||||
<version>2.0.0.M1</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
<description>Spring Cloud Starter Stream Kafka</description>
|
||||
|
||||
@@ -1,108 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.2.0.RELEASE</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-0.10.0-test</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder 0.10.0 Tests</description>
|
||||
<url>http://projects.spring.io/spring-cloud</url>
|
||||
<organization>
|
||||
<name>Pivotal Software, Inc.</name>
|
||||
<url>http://www.spring.io</url>
|
||||
</organization>
|
||||
<properties>
|
||||
<main.basedir>${basedir}/../..</main.basedir>
|
||||
<!--
|
||||
Override Kafka dependencies to Kafka 0.9.0.1 and supporting Spring Kafka and
|
||||
Spring Integration Kafka versions
|
||||
-->
|
||||
<kafka.version>0.10.0.1</kafka.version>
|
||||
<spring-kafka.version>1.1.2.RELEASE</spring-kafka.version>
|
||||
<spring-integration-kafka.version>2.1.0.RELEASE</spring-integration-kafka.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
<version>1.2.0.RELEASE</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-schema</artifactId>
|
||||
<version>${spring-cloud-stream.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.confluent</groupId>
|
||||
<artifactId>kafka-avro-serializer</artifactId>
|
||||
<version>3.0.1</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.confluent</groupId>
|
||||
<artifactId>kafka-schema-registry</artifactId>
|
||||
<version>3.0.1</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>confluent</id>
|
||||
<url>http://packages.confluent.io/maven/</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
</project>
|
||||
@@ -1,28 +0,0 @@
|
||||
/*
|
||||
* Copyright 2014-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
/**
|
||||
* Integration tests for the {@link KafkaMessageChannelBinder}.
|
||||
*
|
||||
* This test specifically tests for the 0.10.0.1 version of Kafka.
|
||||
*
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class Kafka_0_10_0_BinderTests extends Kafka10BinderTests {
|
||||
|
||||
}
|
||||
120
spring-cloud-stream-binder-kafka-0.10.1-test/pom.xml
Normal file
120
spring-cloud-stream-binder-kafka-0.10.1-test/pom.xml
Normal file
@@ -0,0 +1,120 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.0.0.M1</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-0.10.1-test</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder 0.10.1 Tests</description>
|
||||
<url>http://projects.spring.io/spring-cloud</url>
|
||||
<organization>
|
||||
<name>Pivotal Software, Inc.</name>
|
||||
<url>http://www.spring.io</url>
|
||||
</organization>
|
||||
<properties>
|
||||
<main.basedir>${basedir}/../..</main.basedir>
|
||||
<kafka.version>0.10.1.1</kafka.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<version>1.1.6.RELEASE</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-schema</artifactId>
|
||||
<version>${spring-cloud-stream.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.confluent</groupId>
|
||||
<artifactId>kafka-avro-serializer</artifactId>
|
||||
<version>3.1.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.confluent</groupId>
|
||||
<artifactId>kafka-schema-registry</artifactId>
|
||||
<version>3.1.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>confluent</id>
|
||||
<url>http://packages.confluent.io/maven/</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<version>3.0.2</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>test-jar</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
@@ -43,7 +43,7 @@ public class Kafka10TestBinder extends AbstractKafkaTestBinder {
|
||||
|
||||
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfiguration, provisioningProvider);
|
||||
|
||||
binder.setCodec(getCodec());
|
||||
binder.setCodec(AbstractKafkaTestBinder.getCodec());
|
||||
ProducerListener producerListener = new LoggingProducerListener();
|
||||
binder.setProducerListener(producerListener);
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
@@ -58,8 +58,6 @@ import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.fail;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
@@ -70,7 +68,7 @@ import static org.junit.Assert.assertTrue;
|
||||
* @author Mark Fisher
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
public class Kafka10BinderTests extends KafkaBinderTests {
|
||||
public class Kafka_0_10_1_BinderTests extends KafkaBinderTests {
|
||||
|
||||
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();
|
||||
|
||||
@@ -201,7 +199,7 @@ public class Kafka10BinderTests extends KafkaBinderTests {
|
||||
break;
|
||||
}
|
||||
else if (System.currentTimeMillis() > endTime) {
|
||||
fail("Kafka Schema Registry Server failed to start");
|
||||
Assertions.fail("Kafka Schema Registry Server failed to start");
|
||||
}
|
||||
}
|
||||
User1 firstOutboundFoo = new User1();
|
||||
@@ -230,7 +228,7 @@ public class Kafka10BinderTests extends KafkaBinderTests {
|
||||
binderBindUnbindLatency();
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(moduleInputChannel);
|
||||
assertThat(inbound).isNotNull();
|
||||
Assertions.assertThat(inbound).isNotNull();
|
||||
assertTrue(message.getPayload() instanceof User1);
|
||||
User1 receivedUser = (User1) message.getPayload();
|
||||
Assertions.assertThat(receivedUser.getName()).isEqualTo(userName1);
|
||||
@@ -1,78 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.2.0.RELEASE</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-0.9</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder 0.9 Tests</description>
|
||||
<url>http://projects.spring.io/spring-cloud</url>
|
||||
<organization>
|
||||
<name>Pivotal Software, Inc.</name>
|
||||
<url>http://www.spring.io</url>
|
||||
</organization>
|
||||
<properties>
|
||||
<main.basedir>${basedir}/../..</main.basedir>
|
||||
<!--
|
||||
Override Kafka dependencies to Kafka 0.9.0.1 and supporting Spring Kafka and
|
||||
Spring Integration Kafka versions
|
||||
-->
|
||||
<kafka.version>0.9.0.1</kafka.version>
|
||||
<spring-kafka.version>1.0.5.RELEASE</spring-kafka.version>
|
||||
<spring-integration-kafka.version>2.0.1.RELEASE</spring-integration-kafka.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
@@ -1,60 +0,0 @@
|
||||
/*
|
||||
* Copyright 2015-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
|
||||
import org.springframework.context.support.GenericApplicationContext;
|
||||
import org.springframework.kafka.support.LoggingProducerListener;
|
||||
import org.springframework.kafka.support.ProducerListener;
|
||||
|
||||
/**
|
||||
* Test support class for {@link KafkaMessageChannelBinder}. Creates a binder that uses
|
||||
* an embedded Kafka cluster.
|
||||
* @author Eric Bottard
|
||||
* @author Marius Bogoevici
|
||||
* @author David Turanski
|
||||
* @author Gary Russell
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class Kafka09TestBinder extends AbstractKafkaTestBinder {
|
||||
|
||||
public Kafka09TestBinder(KafkaBinderConfigurationProperties binderConfiguration) {
|
||||
try {
|
||||
AdminUtilsOperation adminUtilsOperation = new Kafka09AdminUtilsOperation();
|
||||
KafkaTopicProvisioner provisioningProvider =
|
||||
new KafkaTopicProvisioner(binderConfiguration, adminUtilsOperation);
|
||||
provisioningProvider.afterPropertiesSet();
|
||||
|
||||
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfiguration, provisioningProvider);
|
||||
binder.setCodec(getCodec());
|
||||
ProducerListener producerListener = new LoggingProducerListener();
|
||||
binder.setProducerListener(producerListener);
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
binder.setApplicationContext(context);
|
||||
binder.afterPropertiesSet();
|
||||
this.setBinder(binder);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,158 +0,0 @@
|
||||
/*
|
||||
* Copyright 2014-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import kafka.utils.ZKStringSerializer$;
|
||||
import kafka.utils.ZkUtils;
|
||||
import org.I0Itec.zkclient.ZkClient;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.binder.Spy;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.kafka.test.core.BrokerAddress;
|
||||
import org.springframework.kafka.test.rule.KafkaEmbedded;
|
||||
|
||||
/**
|
||||
* Integration tests for the {@link KafkaMessageChannelBinder}.
|
||||
*
|
||||
* @author Eric Bottard
|
||||
* @author Marius Bogoevici
|
||||
* @author Mark Fisher
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
public class Kafka_09_BinderTests extends KafkaBinderTests {
|
||||
|
||||
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();
|
||||
|
||||
@ClassRule
|
||||
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
|
||||
|
||||
private Kafka09TestBinder binder;
|
||||
|
||||
private Kafka09AdminUtilsOperation adminUtilsOperation = new Kafka09AdminUtilsOperation();
|
||||
|
||||
@Override
|
||||
protected void binderBindUnbindLatency() throws InterruptedException {
|
||||
Thread.sleep(500);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Kafka09TestBinder getBinder() {
|
||||
if (binder == null) {
|
||||
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
|
||||
binder = new Kafka09TestBinder(binderConfiguration);
|
||||
}
|
||||
return binder;
|
||||
}
|
||||
|
||||
protected KafkaBinderConfigurationProperties createConfigurationProperties() {
|
||||
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
|
||||
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
|
||||
List<String> bAddresses = new ArrayList<>();
|
||||
for (BrokerAddress bAddress : brokerAddresses) {
|
||||
bAddresses.add(bAddress.toString());
|
||||
}
|
||||
String[] foo = new String[bAddresses.size()];
|
||||
binderConfiguration.setBrokers(bAddresses.toArray(foo));
|
||||
binderConfiguration.setZkNodes(embeddedKafka.getZookeeperConnectionString());
|
||||
return binderConfiguration;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int partitionSize(String topic) {
|
||||
return consumerFactory().createConsumer().partitionsFor(topic).size();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ZkUtils getZkUtils(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
|
||||
final ZkClient zkClient = new ZkClient(kafkaBinderConfigurationProperties.getZkConnectionString(),
|
||||
kafkaBinderConfigurationProperties.getZkSessionTimeout(), kafkaBinderConfigurationProperties.getZkConnectionTimeout(),
|
||||
ZKStringSerializer$.MODULE$);
|
||||
|
||||
return new ZkUtils(zkClient, null, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions, int replicationFactor, Properties topicConfig) {
|
||||
adminUtilsOperation.invokeCreateTopic(zkUtils, topic, partitions, replicationFactor, new Properties());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int invokePartitionSize(String topic, ZkUtils zkUtils) {
|
||||
return adminUtilsOperation.partitionSize(topic, zkUtils);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKafkaOffsetHeaderKey() {
|
||||
return KafkaHeaders.OFFSET;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Binder getBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
|
||||
return new Kafka09TestBinder(kafkaBinderConfigurationProperties);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
String multiplier = System.getenv("KAFKA_TIMEOUT_MULTIPLIER");
|
||||
if (multiplier != null) {
|
||||
timeoutMultiplier = Double.parseDouble(multiplier);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean usesExplicitRouting() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getClassUnderTestName() {
|
||||
return CLASS_UNDER_TEST_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Spy spyOn(final String name) {
|
||||
throw new UnsupportedOperationException("'spyOn' is not used by Kafka tests");
|
||||
}
|
||||
|
||||
private ConsumerFactory<byte[], byte[]> consumerFactory() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
|
||||
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
|
||||
Deserializer<byte[]> valueDecoder = new ByteArrayDeserializer();
|
||||
Deserializer<byte[]> keyDecoder = new ByteArrayDeserializer();
|
||||
|
||||
return new DefaultKafkaConsumerFactory<>(props, keyDecoder, valueDecoder);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.2.0.RELEASE</version>
|
||||
<version>2.0.0.M1</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder Core</description>
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2016 the original author or authors.
|
||||
* Copyright 2016-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -16,6 +16,8 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.properties;
|
||||
|
||||
import org.springframework.expression.Expression;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@@ -23,6 +25,7 @@ import javax.validation.constraints.NotNull;
|
||||
|
||||
/**
|
||||
* @author Marius Bogoevici
|
||||
* @author Henryk Konsek
|
||||
*/
|
||||
public class KafkaProducerProperties {
|
||||
|
||||
@@ -34,6 +37,8 @@ public class KafkaProducerProperties {
|
||||
|
||||
private int batchTimeout;
|
||||
|
||||
private Expression messageKeyExpression;
|
||||
|
||||
private Map<String, String> configuration = new HashMap<>();
|
||||
|
||||
public int getBufferSize() {
|
||||
@@ -69,6 +74,14 @@ public class KafkaProducerProperties {
|
||||
this.batchTimeout = batchTimeout;
|
||||
}
|
||||
|
||||
public Expression getMessageKeyExpression() {
|
||||
return messageKeyExpression;
|
||||
}
|
||||
|
||||
public void setMessageKeyExpression(Expression messageKeyExpression) {
|
||||
this.messageKeyExpression = messageKeyExpression;
|
||||
}
|
||||
|
||||
public Map<String, String> getConfiguration() {
|
||||
return this.configuration;
|
||||
}
|
||||
|
||||
@@ -20,6 +20,8 @@ import java.util.Collection;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import kafka.common.ErrorMapping;
|
||||
import kafka.utils.ZkUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.kafka.common.PartitionInfo;
|
||||
@@ -47,15 +49,13 @@ import org.springframework.retry.support.RetryTemplate;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import kafka.common.ErrorMapping;
|
||||
import kafka.utils.ZkUtils;
|
||||
|
||||
/**
|
||||
* Kafka implementation for {@link ProvisioningProvider}
|
||||
*
|
||||
* @author Soby Chacko
|
||||
* @author Gary Russell
|
||||
* @author Ilayaperumal Gopinathan
|
||||
* @author Simon Flandergan
|
||||
*/
|
||||
public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsumerProperties<KafkaConsumerProperties>,
|
||||
ExtendedProducerProperties<KafkaProducerProperties>>, InitializingBean {
|
||||
@@ -75,7 +75,6 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param metadataRetryOperations the retry configuration
|
||||
*/
|
||||
public void setMetadataRetryOperations(RetryOperations metadataRetryOperations) {
|
||||
@@ -106,7 +105,7 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
this.logger.info("Using kafka topic for outbound: " + name);
|
||||
}
|
||||
KafkaTopicUtils.validateTopicName(name);
|
||||
createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(name, properties.getPartitionCount());
|
||||
createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(name, properties.getPartitionCount(), false);
|
||||
if (this.configurationProperties.isAutoCreateTopics() && adminUtilsOperation != null) {
|
||||
final ZkUtils zkUtils = ZkUtils.apply(this.configurationProperties.getZkConnectionString(),
|
||||
this.configurationProperties.getZkSessionTimeout(),
|
||||
@@ -130,7 +129,7 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
throw new IllegalArgumentException("Instance count cannot be zero");
|
||||
}
|
||||
int partitionCount = properties.getInstanceCount() * properties.getConcurrency();
|
||||
createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(name, partitionCount);
|
||||
createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(name, partitionCount, properties.getExtension().isAutoRebalanceEnabled());
|
||||
if (this.configurationProperties.isAutoCreateTopics() && adminUtilsOperation != null) {
|
||||
final ZkUtils zkUtils = ZkUtils.apply(this.configurationProperties.getZkConnectionString(),
|
||||
this.configurationProperties.getZkSessionTimeout(),
|
||||
@@ -140,7 +139,7 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
if (properties.getExtension().isEnableDlq() && !anonymous) {
|
||||
String dlqTopic = StringUtils.hasText(properties.getExtension().getDlqName()) ?
|
||||
properties.getExtension().getDlqName() : "error." + name + "." + group;
|
||||
createTopicAndPartitions(dlqTopic, partitions);
|
||||
createTopicAndPartitions(dlqTopic, partitions, properties.getExtension().isAutoRebalanceEnabled());
|
||||
return new KafkaConsumerDestination(name, partitions, dlqTopic);
|
||||
}
|
||||
return new KafkaConsumerDestination(name, partitions);
|
||||
@@ -148,9 +147,10 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
return new KafkaConsumerDestination(name);
|
||||
}
|
||||
|
||||
private void createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(final String topicName, final int partitionCount) {
|
||||
private void createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(final String topicName, final int partitionCount,
|
||||
boolean tolerateLowerPartitionsOnBroker) {
|
||||
if (this.configurationProperties.isAutoCreateTopics() && adminUtilsOperation != null) {
|
||||
createTopicAndPartitions(topicName, partitionCount);
|
||||
createTopicAndPartitions(topicName, partitionCount, tolerateLowerPartitionsOnBroker);
|
||||
}
|
||||
else if (this.configurationProperties.isAutoCreateTopics() && adminUtilsOperation == null) {
|
||||
this.logger.warn("Auto creation of topics is enabled, but Kafka AdminUtils class is not present on the classpath. " +
|
||||
@@ -165,7 +165,9 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
* Creates a Kafka topic if needed, or try to increase its partition count to the
|
||||
* desired number.
|
||||
*/
|
||||
private void createTopicAndPartitions(final String topicName, final int partitionCount) {
|
||||
private void createTopicAndPartitions(final String topicName, final int partitionCount,
|
||||
boolean tolerateLowerPartitionsOnBroker) {
|
||||
|
||||
final ZkUtils zkUtils = ZkUtils.apply(this.configurationProperties.getZkConnectionString(),
|
||||
this.configurationProperties.getZkSessionTimeout(),
|
||||
this.configurationProperties.getZkConnectionTimeout(),
|
||||
@@ -183,6 +185,11 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
if (this.configurationProperties.isAutoAddPartitions()) {
|
||||
adminUtilsOperation.invokeAddPartitions(zkUtils, topicName, effectivePartitionCount, null, false);
|
||||
}
|
||||
else if (tolerateLowerPartitionsOnBroker) {
|
||||
logger.warn("The number of expected partitions was: " + partitionCount + ", but "
|
||||
+ partitionSize + (partitionSize > 1 ? " have " : " has ") + "been found instead."
|
||||
+ "There will be " + (effectivePartitionCount - partitionSize) + " idle consumers");
|
||||
}
|
||||
else {
|
||||
throw new ProvisioningException("The number of expected partitions was: " + partitionCount + ", but "
|
||||
+ partitionSize + (partitionSize > 1 ? " have " : " has ") + "been found instead."
|
||||
@@ -231,7 +238,9 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
}
|
||||
}
|
||||
|
||||
public Collection<PartitionInfo> getPartitionsForTopic(final int partitionCount, final Callable<Collection<PartitionInfo>> callable) {
|
||||
public Collection<PartitionInfo> getPartitionsForTopic(final int partitionCount,
|
||||
final boolean tolerateLowerPartitionsOnBroker,
|
||||
final Callable<Collection<PartitionInfo>> callable) {
|
||||
try {
|
||||
return this.metadataRetryOperations
|
||||
.execute(new RetryCallback<Collection<PartitionInfo>, Exception>() {
|
||||
@@ -240,10 +249,18 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
public Collection<PartitionInfo> doWithRetry(RetryContext context) throws Exception {
|
||||
Collection<PartitionInfo> partitions = callable.call();
|
||||
// do a sanity check on the partition set
|
||||
if (partitions.size() < partitionCount) {
|
||||
throw new IllegalStateException("The number of expected partitions was: "
|
||||
+ partitionCount + ", but " + partitions.size()
|
||||
+ (partitions.size() > 1 ? " have " : " has ") + "been found instead");
|
||||
int partitionSize = partitions.size();
|
||||
if (partitionSize < partitionCount) {
|
||||
if (tolerateLowerPartitionsOnBroker) {
|
||||
logger.warn("The number of expected partitions was: " + partitionCount + ", but "
|
||||
+ partitionSize + (partitionSize > 1 ? " have " : " has ") + "been found instead."
|
||||
+ "There will be " + (partitionCount - partitionSize) + " idle consumers");
|
||||
}
|
||||
else {
|
||||
throw new IllegalStateException("The number of expected partitions was: "
|
||||
+ partitionCount + ", but " + partitionSize
|
||||
+ (partitionSize > 1 ? " have " : " has ") + "been found instead");
|
||||
}
|
||||
}
|
||||
return partitions;
|
||||
}
|
||||
@@ -324,7 +341,5 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
", dlqName='" + dlqName + '\'' +
|
||||
'}';
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -37,7 +37,8 @@ public final class KafkaTopicUtils {
|
||||
if (!((b >= 'a') && (b <= 'z') || (b >= 'A') && (b <= 'Z') || (b >= '0') && (b <= '9') || (b == '.')
|
||||
|| (b == '-') || (b == '_'))) {
|
||||
throw new IllegalArgumentException(
|
||||
"Topic name can only have ASCII alphanumerics, '.', '_' and '-'");
|
||||
"Topic name can only have ASCII alphanumerics, '.', '_' and '-', but was: '" + topicName
|
||||
+ "'");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.2.0.RELEASE</version>
|
||||
<version>2.0.0.M1</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[[spring-cloud-stream-binder-kafka-reference]]
|
||||
= Spring Cloud Stream Kafka Binder Reference Guide
|
||||
Sabby Anandan, Marius Bogoevici, Eric Bottard, Mark Fisher, Ilayaperumal Gopinathan, Gunnar Hillert, Mark Pollack, Patrick Peralta, Glenn Renfro, Thomas Risberg, Dave Syer, David Turanski, Janne Valkealahti, Benjamin Klein
|
||||
Sabby Anandan, Marius Bogoevici, Eric Bottard, Mark Fisher, Ilayaperumal Gopinathan, Gunnar Hillert, Mark Pollack, Patrick Peralta, Glenn Renfro, Thomas Risberg, Dave Syer, David Turanski, Janne Valkealahti, Benjamin Klein, Henryk Konsek
|
||||
:doctype: book
|
||||
:toc:
|
||||
:toclevels: 4
|
||||
@@ -24,6 +24,7 @@ Sabby Anandan, Marius Bogoevici, Eric Bottard, Mark Fisher, Ilayaperumal Gopinat
|
||||
= Reference Guide
|
||||
include::overview.adoc[]
|
||||
include::dlq.adoc[]
|
||||
include::metrics.adoc[]
|
||||
|
||||
= Appendices
|
||||
[appendix]
|
||||
|
||||
@@ -0,0 +1,10 @@
|
||||
[[kafka-metrics]]
|
||||
== Kafka metrics
|
||||
|
||||
Kafka binder module exposes the following metrics:
|
||||
|
||||
`spring.cloud.stream.binder.kafka.someGroup.someTopic.lag` - this metric indicates how many messages
|
||||
have not been yet consumed from given binder's topic (`someTopic`) by given consumer group (`someGroup`).
|
||||
For example if the value of the metric `spring.cloud.stream.binder.kafka.myGroup.myTopic.lag` is `1000`, then
|
||||
consumer group `myGroup` has `1000` messages to waiting to be consumed from topic `myTopic`. This metric is
|
||||
particularly useful to provide auto-scaling feedback to PaaS platform of your choice.
|
||||
@@ -190,6 +190,11 @@ batchTimeout::
|
||||
(Normally the producer does not wait at all, and simply sends all the messages that accumulated while the previous send was in progress.) A non-zero value may increase throughput at the expense of latency.
|
||||
+
|
||||
Default: `0`.
|
||||
messageKeyExpression::
|
||||
A SpEL expression evaluated against the outgoing message used to populate the key of the produced Kafka message.
|
||||
For example `headers.key` or `payload.myKey`.
|
||||
+
|
||||
Default: `none`.
|
||||
configuration::
|
||||
Map with a key/value pair containing generic Kafka producer properties.
|
||||
+
|
||||
|
||||
@@ -1,25 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.2.0.RELEASE</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-test-support</artifactId>
|
||||
<description>Kafka related test classes</description>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<version>${spring-kafka.version}</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
@@ -10,14 +10,14 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.2.0.RELEASE</version>
|
||||
<version>2.0.0.M1</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
<version>1.2.0.RELEASE</version>
|
||||
<version>2.0.0.M1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
@@ -81,33 +81,8 @@
|
||||
<artifactId>spring-cloud-stream-binder-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-schema</artifactId>
|
||||
<version>${spring-cloud-stream.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.confluent</groupId>
|
||||
<artifactId>kafka-avro-serializer</artifactId>
|
||||
<version>3.1.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.confluent</groupId>
|
||||
<artifactId>kafka-schema-registry</artifactId>
|
||||
<version>3.1.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>confluent</id>
|
||||
<url>http://packages.confluent.io/maven/</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
|
||||
@@ -55,7 +55,6 @@ public class KafkaBinderEnvironmentPostProcessor implements EnvironmentPostProce
|
||||
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
|
||||
if (!environment.getPropertySources().contains(KAFKA_BINDER_DEFAULT_PROPERTIES)) {
|
||||
Map<String, Object> kafkaBinderDefaultProperties = new HashMap<>();
|
||||
kafkaBinderDefaultProperties.put("logging.pattern.console", "%d{ISO8601} %5p %t %c{2}:%L - %m%n");
|
||||
kafkaBinderDefaultProperties.put("logging.level.org.I0Itec.zkclient", "ERROR");
|
||||
kafkaBinderDefaultProperties.put("logging.level.kafka.server.KafkaConfig", "ERROR");
|
||||
kafkaBinderDefaultProperties.put("logging.level.kafka.admin.AdminClient.AdminConfig", "ERROR");
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2016 the original author or authors.
|
||||
* Copyright 2016-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -32,6 +32,7 @@ import org.springframework.kafka.core.ConsumerFactory;
|
||||
*
|
||||
* @author Ilayaperumal Gopinathan
|
||||
* @author Marius Bogoevici
|
||||
* @author Henryk Konsek
|
||||
*/
|
||||
public class KafkaBinderHealthIndicator implements HealthIndicator {
|
||||
|
||||
@@ -53,8 +54,9 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
|
||||
for (String topic : this.binder.getTopicsInUse().keySet()) {
|
||||
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
|
||||
for (PartitionInfo partitionInfo : partitionInfos) {
|
||||
if (this.binder.getTopicsInUse().get(topic).contains(partitionInfo) && partitionInfo.leader()
|
||||
.id() == -1) {
|
||||
if (this.binder.getTopicsInUse().get(topic).getPartitionInfos().contains(partitionInfo)
|
||||
&& partitionInfo.leader()
|
||||
.id() == -1) {
|
||||
downMessages.add(partitionInfo.toString());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,126 @@
|
||||
/*
|
||||
* Copyright 2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
import org.apache.kafka.common.PartitionInfo;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.springframework.boot.actuate.endpoint.PublicMetrics;
|
||||
import org.springframework.boot.actuate.metrics.Metric;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
/**
|
||||
* Metrics for Kafka binder.
|
||||
*
|
||||
* @author Henryk Konsek
|
||||
*/
|
||||
public class KafkaBinderMetrics implements PublicMetrics {
|
||||
|
||||
private final static Logger LOG = LoggerFactory.getLogger(KafkaBinderMetrics.class);
|
||||
|
||||
static final String METRIC_PREFIX = "spring.cloud.stream.binder.kafka";
|
||||
|
||||
private final KafkaMessageChannelBinder binder;
|
||||
|
||||
private final KafkaBinderConfigurationProperties binderConfigurationProperties;
|
||||
|
||||
private ConsumerFactory<?, ?> defaultConsumerFactory;
|
||||
|
||||
public KafkaBinderMetrics(KafkaMessageChannelBinder binder,
|
||||
KafkaBinderConfigurationProperties binderConfigurationProperties,
|
||||
ConsumerFactory<?, ?> defaultConsumerFactory) {
|
||||
this.binder = binder;
|
||||
this.binderConfigurationProperties = binderConfigurationProperties;
|
||||
this.defaultConsumerFactory = defaultConsumerFactory;
|
||||
}
|
||||
|
||||
public KafkaBinderMetrics(KafkaMessageChannelBinder binder,
|
||||
KafkaBinderConfigurationProperties binderConfigurationProperties) {
|
||||
this(binder, binderConfigurationProperties, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<Metric<?>> metrics() {
|
||||
List<Metric<?>> metrics = new LinkedList<>();
|
||||
for (Map.Entry<String, KafkaMessageChannelBinder.TopicInformation> topicInfo : this.binder.getTopicsInUse()
|
||||
.entrySet()) {
|
||||
if (!topicInfo.getValue().isConsumerTopic()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String topic = topicInfo.getKey();
|
||||
String group = topicInfo.getValue().getConsumerGroup();
|
||||
|
||||
try (Consumer<?, ?> metadataConsumer = createConsumerFactory(group).createConsumer()) {
|
||||
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
|
||||
List<TopicPartition> topicPartitions = new LinkedList<>();
|
||||
for (PartitionInfo partitionInfo : partitionInfos) {
|
||||
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
|
||||
}
|
||||
Map<TopicPartition, Long> endOffsets = metadataConsumer.endOffsets(topicPartitions);
|
||||
long lag = 0;
|
||||
for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {
|
||||
OffsetAndMetadata current = metadataConsumer.committed(endOffset.getKey());
|
||||
if (current != null) {
|
||||
lag += endOffset.getValue() - current.offset();
|
||||
}
|
||||
else {
|
||||
lag += endOffset.getValue();
|
||||
}
|
||||
}
|
||||
metrics.add(new Metric<>(String.format("%s.%s.%s.lag", METRIC_PREFIX, group, topic), lag));
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOG.debug("Cannot generate metric for topic: " + topic, e);
|
||||
}
|
||||
}
|
||||
return metrics;
|
||||
}
|
||||
|
||||
private ConsumerFactory<?, ?> createConsumerFactory(String group) {
|
||||
if (defaultConsumerFactory != null) {
|
||||
return defaultConsumerFactory;
|
||||
}
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
if (!ObjectUtils.isEmpty(binderConfigurationProperties.getConfiguration())) {
|
||||
props.putAll(binderConfigurationProperties.getConfiguration());
|
||||
}
|
||||
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
|
||||
this.binderConfigurationProperties.getKafkaConnectionString());
|
||||
}
|
||||
props.put("group.id", group);
|
||||
return new DefaultKafkaConsumerFactory<>(props);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -83,10 +83,11 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
|
||||
* @author Gary Russell
|
||||
* @author Mark Fisher
|
||||
* @author Soby Chacko
|
||||
* @author Henryk Konsek
|
||||
* @author Doug Saus
|
||||
*/
|
||||
public class KafkaMessageChannelBinder extends
|
||||
AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>,
|
||||
ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
|
||||
AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
|
||||
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {
|
||||
|
||||
private final KafkaBinderConfigurationProperties configurationProperties;
|
||||
@@ -95,7 +96,7 @@ public class KafkaMessageChannelBinder extends
|
||||
|
||||
private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
|
||||
|
||||
private final Map<String, Collection<PartitionInfo>> topicsInUse = new HashMap<>();
|
||||
private final Map<String, TopicInformation> topicsInUse = new HashMap<>();
|
||||
|
||||
public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
|
||||
KafkaTopicProvisioner provisioningProvider) {
|
||||
@@ -127,7 +128,7 @@ public class KafkaMessageChannelBinder extends
|
||||
this.producerListener = producerListener;
|
||||
}
|
||||
|
||||
Map<String, Collection<PartitionInfo>> getTopicsInUse() {
|
||||
Map<String, TopicInformation> getTopicsInUse() {
|
||||
return this.topicsInUse;
|
||||
}
|
||||
|
||||
@@ -145,14 +146,16 @@ public class KafkaMessageChannelBinder extends
|
||||
protected MessageHandler createProducerMessageHandler(final ProducerDestination destination,
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception {
|
||||
final DefaultKafkaProducerFactory<byte[], byte[]> producerFB = getProducerFactory(producerProperties);
|
||||
Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(producerProperties.getPartitionCount(),
|
||||
Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(
|
||||
producerProperties.getPartitionCount(),
|
||||
false,
|
||||
new Callable<Collection<PartitionInfo>>() {
|
||||
@Override
|
||||
public Collection<PartitionInfo> call() throws Exception {
|
||||
return producerFB.createProducer().partitionsFor(destination.getName());
|
||||
}
|
||||
});
|
||||
this.topicsInUse.put(destination.getName(), partitions);
|
||||
this.topicsInUse.put(destination.getName(), new TopicInformation(null, partitions));
|
||||
if (producerProperties.getPartitionCount() < partitions.size()) {
|
||||
if (this.logger.isInfoEnabled()) {
|
||||
this.logger.info("The `partitionCount` of the producer for topic " + destination.getName() + " is "
|
||||
@@ -165,7 +168,8 @@ public class KafkaMessageChannelBinder extends
|
||||
if (this.producerListener != null) {
|
||||
kafkaTemplate.setProducerListener(this.producerListener);
|
||||
}
|
||||
return new ProducerConfigurationMessageHandler(kafkaTemplate, destination.getName(), producerProperties, producerFB);
|
||||
return new ProducerConfigurationMessageHandler(kafkaTemplate, destination.getName(), producerProperties,
|
||||
producerFB);
|
||||
}
|
||||
|
||||
private DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(
|
||||
@@ -183,10 +187,12 @@ public class KafkaMessageChannelBinder extends
|
||||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
|
||||
}
|
||||
if (ObjectUtils.isEmpty(props.get(ProducerConfig.BATCH_SIZE_CONFIG))) {
|
||||
props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(producerProperties.getExtension().getBufferSize()));
|
||||
props.put(ProducerConfig.BATCH_SIZE_CONFIG,
|
||||
String.valueOf(producerProperties.getExtension().getBufferSize()));
|
||||
}
|
||||
if (ObjectUtils.isEmpty(props.get(ProducerConfig.LINGER_MS_CONFIG))) {
|
||||
props.put(ProducerConfig.LINGER_MS_CONFIG, String.valueOf(producerProperties.getExtension().getBatchTimeout()));
|
||||
props.put(ProducerConfig.LINGER_MS_CONFIG,
|
||||
String.valueOf(producerProperties.getExtension().getBatchTimeout()));
|
||||
}
|
||||
if (ObjectUtils.isEmpty(props.get(ProducerConfig.COMPRESSION_TYPE_CONFIG))) {
|
||||
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
|
||||
@@ -207,10 +213,13 @@ public class KafkaMessageChannelBinder extends
|
||||
Assert.isTrue(!anonymous || !extendedConsumerProperties.getExtension().isEnableDlq(),
|
||||
"DLQ support is not available for anonymous subscriptions");
|
||||
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
|
||||
final ConsumerFactory<?, ?> consumerFactory = createKafkaConsumerFactory(anonymous, consumerGroup, extendedConsumerProperties);
|
||||
int partitionCount = extendedConsumerProperties.getInstanceCount() * extendedConsumerProperties.getConcurrency();
|
||||
final ConsumerFactory<?, ?> consumerFactory = createKafkaConsumerFactory(anonymous, consumerGroup,
|
||||
extendedConsumerProperties);
|
||||
int partitionCount = extendedConsumerProperties.getInstanceCount()
|
||||
* extendedConsumerProperties.getConcurrency();
|
||||
|
||||
Collection<PartitionInfo> allPartitions = provisioningProvider.getPartitionsForTopic(partitionCount,
|
||||
extendedConsumerProperties.getExtension().isAutoRebalanceEnabled(),
|
||||
new Callable<Collection<PartitionInfo>>() {
|
||||
@Override
|
||||
public Collection<PartitionInfo> call() throws Exception {
|
||||
@@ -228,33 +237,37 @@ public class KafkaMessageChannelBinder extends
|
||||
listenedPartitions = new ArrayList<>();
|
||||
for (PartitionInfo partition : allPartitions) {
|
||||
// divide partitions across modules
|
||||
if ((partition.partition() % extendedConsumerProperties.getInstanceCount()) == extendedConsumerProperties.getInstanceIndex()) {
|
||||
if ((partition.partition()
|
||||
% extendedConsumerProperties.getInstanceCount()) == extendedConsumerProperties
|
||||
.getInstanceIndex()) {
|
||||
listenedPartitions.add(partition);
|
||||
}
|
||||
}
|
||||
}
|
||||
this.topicsInUse.put(destination.getName(), listenedPartitions);
|
||||
this.topicsInUse.put(destination.getName(), new TopicInformation(group, listenedPartitions));
|
||||
|
||||
Assert.isTrue(!CollectionUtils.isEmpty(listenedPartitions), "A list of partitions must be provided");
|
||||
final TopicPartitionInitialOffset[] topicPartitionInitialOffsets = getTopicPartitionInitialOffsets(
|
||||
listenedPartitions);
|
||||
final ContainerProperties containerProperties =
|
||||
anonymous || extendedConsumerProperties.getExtension().isAutoRebalanceEnabled() ? new ContainerProperties(destination.getName())
|
||||
final ContainerProperties containerProperties = anonymous
|
||||
|| extendedConsumerProperties.getExtension().isAutoRebalanceEnabled()
|
||||
? new ContainerProperties(destination.getName())
|
||||
: new ContainerProperties(topicPartitionInitialOffsets);
|
||||
int concurrency = Math.min(extendedConsumerProperties.getConcurrency(), listenedPartitions.size());
|
||||
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer =
|
||||
new ConcurrentMessageListenerContainer(
|
||||
consumerFactory, containerProperties) {
|
||||
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer = new ConcurrentMessageListenerContainer(
|
||||
consumerFactory, containerProperties) {
|
||||
|
||||
@Override
|
||||
public void stop(Runnable callback) {
|
||||
super.stop(callback);
|
||||
}
|
||||
};
|
||||
@Override
|
||||
public void stop(Runnable callback) {
|
||||
super.stop(callback);
|
||||
}
|
||||
};
|
||||
messageListenerContainer.setConcurrency(concurrency);
|
||||
messageListenerContainer.getContainerProperties().setAckOnError(isAutoCommitOnError(extendedConsumerProperties));
|
||||
messageListenerContainer.getContainerProperties()
|
||||
.setAckOnError(isAutoCommitOnError(extendedConsumerProperties));
|
||||
if (!extendedConsumerProperties.getExtension().isAutoCommitOffset()) {
|
||||
messageListenerContainer.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
|
||||
messageListenerContainer.getContainerProperties()
|
||||
.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
|
||||
}
|
||||
if (this.logger.isDebugEnabled()) {
|
||||
this.logger.debug(
|
||||
@@ -264,14 +277,14 @@ public class KafkaMessageChannelBinder extends
|
||||
this.logger.debug(
|
||||
"Listened partitions: " + StringUtils.collectionToCommaDelimitedString(listenedPartitions));
|
||||
}
|
||||
final KafkaMessageDrivenChannelAdapter<?, ?> kafkaMessageDrivenChannelAdapter =
|
||||
new KafkaMessageDrivenChannelAdapter<>(
|
||||
messageListenerContainer);
|
||||
final KafkaMessageDrivenChannelAdapter<?, ?> kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter<>(
|
||||
messageListenerContainer);
|
||||
kafkaMessageDrivenChannelAdapter.setBeanFactory(this.getBeanFactory());
|
||||
final RetryTemplate retryTemplate = buildRetryTemplate(extendedConsumerProperties);
|
||||
kafkaMessageDrivenChannelAdapter.setRetryTemplate(retryTemplate);
|
||||
if (extendedConsumerProperties.getExtension().isEnableDlq()) {
|
||||
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = getProducerFactory(new ExtendedProducerProperties<>(new KafkaProducerProperties()));
|
||||
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = getProducerFactory(
|
||||
new ExtendedProducerProperties<>(new KafkaProducerProperties()));
|
||||
final KafkaTemplate<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFactory);
|
||||
messageListenerContainer.getContainerProperties().setErrorHandler(new ErrorHandler() {
|
||||
|
||||
@@ -281,9 +294,11 @@ public class KafkaMessageChannelBinder extends
|
||||
: null;
|
||||
final byte[] payload = message.value() != null
|
||||
? Utils.toArray(ByteBuffer.wrap((byte[]) message.value())) : null;
|
||||
String dlqName = StringUtils.hasText(extendedConsumerProperties.getExtension().getDlqName()) ?
|
||||
extendedConsumerProperties.getExtension().getDlqName() : "error." + destination.getName() + "." + group;
|
||||
ListenableFuture<SendResult<byte[], byte[]>> sentDlq = kafkaTemplate.send(dlqName, message.partition(), key, payload);
|
||||
String dlqName = StringUtils.hasText(extendedConsumerProperties.getExtension().getDlqName())
|
||||
? extendedConsumerProperties.getExtension().getDlqName()
|
||||
: "error." + destination.getName() + "." + group;
|
||||
ListenableFuture<SendResult<byte[], byte[]>> sentDlq = kafkaTemplate.send(dlqName,
|
||||
message.partition(), key, payload);
|
||||
sentDlq.addCallback(new ListenableFutureCallback<SendResult<byte[], byte[]>>() {
|
||||
StringBuilder sb = new StringBuilder().append(" a message with key='")
|
||||
.append(toDisplayString(ObjectUtils.nullSafeToString(key), 50)).append("'")
|
||||
@@ -318,9 +333,10 @@ public class KafkaMessageChannelBinder extends
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
|
||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, anonymous ? "latest" : "earliest");
|
||||
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
|
||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, anonymous ? "latest" : "earliest");
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
|
||||
|
||||
if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
|
||||
props.putAll(configurationProperties.getConfiguration());
|
||||
}
|
||||
@@ -330,6 +346,11 @@ public class KafkaMessageChannelBinder extends
|
||||
if (!ObjectUtils.isEmpty(consumerProperties.getExtension().getConfiguration())) {
|
||||
props.putAll(consumerProperties.getExtension().getConfiguration());
|
||||
}
|
||||
if (!ObjectUtils.isEmpty(consumerProperties.getExtension().getStartOffset())) {
|
||||
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
|
||||
consumerProperties.getExtension().getStartOffset().name());
|
||||
}
|
||||
|
||||
return new DefaultKafkaConsumerFactory<>(props);
|
||||
}
|
||||
|
||||
@@ -341,8 +362,8 @@ public class KafkaMessageChannelBinder extends
|
||||
|
||||
private TopicPartitionInitialOffset[] getTopicPartitionInitialOffsets(
|
||||
Collection<PartitionInfo> listenedPartitions) {
|
||||
final TopicPartitionInitialOffset[] topicPartitionInitialOffsets =
|
||||
new TopicPartitionInitialOffset[listenedPartitions.size()];
|
||||
final TopicPartitionInitialOffset[] topicPartitionInitialOffsets = new TopicPartitionInitialOffset[listenedPartitions
|
||||
.size()];
|
||||
int i = 0;
|
||||
for (PartitionInfo partition : listenedPartitions) {
|
||||
|
||||
@@ -371,6 +392,7 @@ public class KafkaMessageChannelBinder extends
|
||||
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory) {
|
||||
super(kafkaTemplate);
|
||||
setTopicExpression(new LiteralExpression(topic));
|
||||
setMessageKeyExpression(producerProperties.getExtension().getMessageKeyExpression());
|
||||
setBeanFactory(KafkaMessageChannelBinder.this.getBeanFactory());
|
||||
if (producerProperties.isPartitioned()) {
|
||||
SpelExpressionParser parser = new SpelExpressionParser();
|
||||
@@ -404,4 +426,30 @@ public class KafkaMessageChannelBinder extends
|
||||
return this.running;
|
||||
}
|
||||
}
|
||||
|
||||
public static class TopicInformation {
|
||||
|
||||
private final String consumerGroup;
|
||||
|
||||
private final Collection<PartitionInfo> partitionInfos;
|
||||
|
||||
public TopicInformation(String consumerGroup, Collection<PartitionInfo> partitionInfos) {
|
||||
this.consumerGroup = consumerGroup;
|
||||
this.partitionInfos = partitionInfos;
|
||||
}
|
||||
|
||||
public String getConsumerGroup() {
|
||||
return consumerGroup;
|
||||
}
|
||||
|
||||
public boolean isConsumerTopic() {
|
||||
return consumerGroup != null;
|
||||
}
|
||||
|
||||
public Collection<PartitionInfo> getPartitionInfos() {
|
||||
return partitionInfos;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2015-2016 the original author or authors.
|
||||
* Copyright 2015-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -16,12 +16,13 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.config;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
@@ -30,6 +31,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.utils.AppInfoParser;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.actuate.endpoint.PublicMetrics;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
|
||||
@@ -38,6 +40,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaBinderJaasInitializerListener;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation;
|
||||
@@ -69,11 +72,13 @@ import org.springframework.util.ObjectUtils;
|
||||
* @author Soby Chacko
|
||||
* @author Mark Fisher
|
||||
* @author Ilayaperumal Gopinathan
|
||||
* @author Henryk Konsek
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnMissingBean(Binder.class)
|
||||
@Import({KryoCodecAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class, KafkaBinderConfiguration.KafkaPropertiesConfiguration.class})
|
||||
@EnableConfigurationProperties({KafkaBinderConfigurationProperties.class, KafkaExtendedBindingProperties.class})
|
||||
@Import({ KryoCodecAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class,
|
||||
KafkaBinderConfiguration.KafkaPropertiesConfiguration.class })
|
||||
@EnableConfigurationProperties({ KafkaBinderConfigurationProperties.class, KafkaExtendedBindingProperties.class })
|
||||
public class KafkaBinderConfiguration {
|
||||
|
||||
protected static final Log logger = LogFactory.getLog(KafkaBinderConfiguration.class);
|
||||
@@ -93,7 +98,7 @@ public class KafkaBinderConfiguration {
|
||||
@Autowired
|
||||
private ApplicationContext context;
|
||||
|
||||
@Autowired (required = false)
|
||||
@Autowired(required = false)
|
||||
private AdminUtilsOperation adminUtilsOperation;
|
||||
|
||||
@Bean
|
||||
@@ -125,11 +130,18 @@ public class KafkaBinderConfiguration {
|
||||
if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
|
||||
props.putAll(configurationProperties.getConfiguration());
|
||||
}
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
|
||||
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
|
||||
}
|
||||
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
|
||||
return new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, consumerFactory);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public PublicMetrics kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder) {
|
||||
return new KafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties);
|
||||
}
|
||||
|
||||
@Bean(name = "adminUtilsOperation")
|
||||
@Conditional(Kafka09Present.class)
|
||||
@ConditionalOnClass(name = "kafka.admin.AdminUtils")
|
||||
@@ -158,7 +170,7 @@ public class KafkaBinderConfiguration {
|
||||
return AppInfoParser.getVersion().startsWith("0.10");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static class Kafka09Present implements Condition {
|
||||
|
||||
@Override
|
||||
@@ -193,24 +205,29 @@ public class KafkaBinderConfiguration {
|
||||
configuration.put(properties.getKey(), properties.getValue());
|
||||
}
|
||||
}
|
||||
for (Map.Entry<String, Object> producerProperties : this.kafkaProperties.buildProducerProperties().entrySet()) {
|
||||
for (Map.Entry<String, Object> producerProperties : this.kafkaProperties.buildProducerProperties()
|
||||
.entrySet()) {
|
||||
if (!configuration.containsKey(producerProperties.getKey())) {
|
||||
configuration.put(producerProperties.getKey(), producerProperties.getValue());
|
||||
}
|
||||
}
|
||||
for (Map.Entry<String, Object> consumerProperties : this.kafkaProperties.buildConsumerProperties().entrySet()) {
|
||||
for (Map.Entry<String, Object> consumerProperties : this.kafkaProperties.buildConsumerProperties()
|
||||
.entrySet()) {
|
||||
if (!configuration.containsKey(consumerProperties.getKey())) {
|
||||
configuration.put(consumerProperties.getKey(), consumerProperties.getValue());
|
||||
}
|
||||
}
|
||||
if (ObjectUtils.isEmpty(configuration.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))) {
|
||||
configuration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBinderConfigurationProperties.getKafkaConnectionString());
|
||||
configuration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
|
||||
kafkaBinderConfigurationProperties.getKafkaConnectionString());
|
||||
}
|
||||
else {
|
||||
@SuppressWarnings("unchecked")
|
||||
List<String> bootStrapServers = (List<String>) configuration.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
|
||||
List<String> bootStrapServers = (List<String>) configuration
|
||||
.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
|
||||
if (bootStrapServers.size() == 1 && bootStrapServers.get(0).equals("localhost:9092")) {
|
||||
configuration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBinderConfigurationProperties.getKafkaConnectionString());
|
||||
configuration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
|
||||
kafkaBinderConfigurationProperties.getKafkaConnectionString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,6 +35,7 @@ import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigura
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
@@ -48,23 +49,32 @@ import static org.junit.Assert.assertTrue;
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@SpringBootTest(classes = {KafkaBinderAutoConfigurationPropertiesTest.KafkaBinderConfigProperties.class, KafkaBinderConfiguration.class})
|
||||
@SpringBootTest(classes = { KafkaBinderAutoConfigurationPropertiesTest.KafkaBinderConfigProperties.class,
|
||||
KafkaBinderConfiguration.class })
|
||||
@TestPropertySource(locations = "classpath:binder-config-autoconfig.properties")
|
||||
public class KafkaBinderAutoConfigurationPropertiesTest {
|
||||
|
||||
@Autowired
|
||||
private KafkaMessageChannelBinder kafkaMessageChannelBinder;
|
||||
|
||||
@Autowired
|
||||
private KafkaBinderHealthIndicator kafkaBinderHealthIndicator;
|
||||
|
||||
@Test
|
||||
public void testKafkaBinderConfigurationWithKafkaProperties() throws Exception {
|
||||
assertNotNull(this.kafkaMessageChannelBinder);
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(new KafkaProducerProperties());
|
||||
Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory", ExtendedProducerProperties.class);
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(
|
||||
new KafkaProducerProperties());
|
||||
Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory",
|
||||
ExtendedProducerProperties.class);
|
||||
getProducerFactoryMethod.setAccessible(true);
|
||||
DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod.invoke(this.kafkaMessageChannelBinder, producerProperties);
|
||||
Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs", Map.class);
|
||||
DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod
|
||||
.invoke(this.kafkaMessageChannelBinder, producerProperties);
|
||||
Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs",
|
||||
Map.class);
|
||||
ReflectionUtils.makeAccessible(producerFactoryConfigField);
|
||||
Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField, producerFactory);
|
||||
Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField,
|
||||
producerFactory);
|
||||
assertTrue(producerConfigs.get("batch.size").equals(10));
|
||||
assertTrue(producerConfigs.get("key.serializer").equals(LongSerializer.class));
|
||||
assertTrue(producerConfigs.get("value.serializer").equals(LongSerializer.class));
|
||||
@@ -73,23 +83,49 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
|
||||
bootstrapServers.add("10.98.09.199:9092");
|
||||
bootstrapServers.add("10.98.09.196:9092");
|
||||
assertTrue((((List<String>) producerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));
|
||||
Method createKafkaConsumerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("createKafkaConsumerFactory", boolean.class, String.class, ExtendedConsumerProperties.class);
|
||||
Method createKafkaConsumerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod(
|
||||
"createKafkaConsumerFactory", boolean.class, String.class, ExtendedConsumerProperties.class);
|
||||
createKafkaConsumerFactoryMethod.setAccessible(true);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<>(new KafkaConsumerProperties());
|
||||
DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) createKafkaConsumerFactoryMethod.invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties);
|
||||
Field consumerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaConsumerFactory.class, "configs", Map.class);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<>(
|
||||
new KafkaConsumerProperties());
|
||||
DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) createKafkaConsumerFactoryMethod
|
||||
.invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties);
|
||||
Field consumerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaConsumerFactory.class, "configs",
|
||||
Map.class);
|
||||
ReflectionUtils.makeAccessible(consumerFactoryConfigField);
|
||||
Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField, consumerFactory);
|
||||
Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField,
|
||||
consumerFactory);
|
||||
assertTrue(consumerConfigs.get("key.deserializer").equals(LongDeserializer.class));
|
||||
assertTrue(consumerConfigs.get("value.deserializer").equals(LongDeserializer.class));
|
||||
assertTrue(consumerConfigs.get("group.id").equals("groupIdFromBootConfig"));
|
||||
assertTrue(consumerConfigs.get("auto.offset.reset").equals("earliest"));
|
||||
assertTrue((((List<String>) consumerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKafkaHealthIndicatorProperties() {
|
||||
assertNotNull(this.kafkaBinderHealthIndicator);
|
||||
Field consumerFactoryField = ReflectionUtils.findField(KafkaBinderHealthIndicator.class, "consumerFactory",
|
||||
ConsumerFactory.class);
|
||||
ReflectionUtils.makeAccessible(consumerFactoryField);
|
||||
DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) ReflectionUtils.getField(
|
||||
consumerFactoryField, this.kafkaBinderHealthIndicator);
|
||||
Field configField = ReflectionUtils.findField(DefaultKafkaConsumerFactory.class, "configs", Map.class);
|
||||
ReflectionUtils.makeAccessible(configField);
|
||||
Map<String, Object> configs = (Map<String, Object>) ReflectionUtils.getField(configField, consumerFactory);
|
||||
assertTrue(configs.containsKey("bootstrap.servers"));
|
||||
List<String> bootstrapServers = new ArrayList<>();
|
||||
bootstrapServers.add("10.98.09.199:9092");
|
||||
bootstrapServers.add("10.98.09.196:9092");
|
||||
assertTrue(((List<String>) configs.get("bootstrap.servers")).containsAll(bootstrapServers));
|
||||
}
|
||||
|
||||
public static class KafkaBinderConfigProperties {
|
||||
|
||||
@Bean
|
||||
KafkaProperties kafkaProperties() {
|
||||
return new KafkaProperties();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,7 +46,7 @@ import static org.junit.Assert.assertTrue;
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@SpringBootTest(classes = {KafkaBinderConfiguration.class})
|
||||
@SpringBootTest(classes = { KafkaBinderConfiguration.class, KafkaBinderConfigurationPropertiesTest.class })
|
||||
@TestPropertySource(locations = "classpath:binder-config.properties")
|
||||
public class KafkaBinderConfigurationPropertiesTest {
|
||||
|
||||
@@ -60,13 +60,18 @@ public class KafkaBinderConfigurationPropertiesTest {
|
||||
kafkaProducerProperties.setBufferSize(12345);
|
||||
kafkaProducerProperties.setBatchTimeout(100);
|
||||
kafkaProducerProperties.setCompressionType(KafkaProducerProperties.CompressionType.gzip);
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(kafkaProducerProperties);
|
||||
Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory", ExtendedProducerProperties.class);
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(
|
||||
kafkaProducerProperties);
|
||||
Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory",
|
||||
ExtendedProducerProperties.class);
|
||||
getProducerFactoryMethod.setAccessible(true);
|
||||
DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod.invoke(this.kafkaMessageChannelBinder, producerProperties);
|
||||
Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs", Map.class);
|
||||
DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod
|
||||
.invoke(this.kafkaMessageChannelBinder, producerProperties);
|
||||
Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs",
|
||||
Map.class);
|
||||
ReflectionUtils.makeAccessible(producerFactoryConfigField);
|
||||
Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField, producerFactory);
|
||||
Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField,
|
||||
producerFactory);
|
||||
assertTrue(producerConfigs.get("batch.size").equals("12345"));
|
||||
assertTrue(producerConfigs.get("linger.ms").equals("100"));
|
||||
assertTrue(producerConfigs.get("key.serializer").equals(ByteArraySerializer.class));
|
||||
@@ -75,15 +80,21 @@ public class KafkaBinderConfigurationPropertiesTest {
|
||||
List<String> bootstrapServers = new ArrayList<>();
|
||||
bootstrapServers.add("10.98.09.199:9082");
|
||||
assertTrue((((String) producerConfigs.get("bootstrap.servers")).contains("10.98.09.199:9082")));
|
||||
Method createKafkaConsumerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("createKafkaConsumerFactory", boolean.class, String.class, ExtendedConsumerProperties.class);
|
||||
Method createKafkaConsumerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod(
|
||||
"createKafkaConsumerFactory", boolean.class, String.class, ExtendedConsumerProperties.class);
|
||||
createKafkaConsumerFactoryMethod.setAccessible(true);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<>(new KafkaConsumerProperties());
|
||||
DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) createKafkaConsumerFactoryMethod.invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties);
|
||||
Field consumerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaConsumerFactory.class, "configs", Map.class);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<>(
|
||||
new KafkaConsumerProperties());
|
||||
DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) createKafkaConsumerFactoryMethod
|
||||
.invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties);
|
||||
Field consumerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaConsumerFactory.class, "configs",
|
||||
Map.class);
|
||||
ReflectionUtils.makeAccessible(consumerFactoryConfigField);
|
||||
Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField, consumerFactory);
|
||||
Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField,
|
||||
consumerFactory);
|
||||
assertTrue(consumerConfigs.get("key.deserializer").equals(ByteArrayDeserializer.class));
|
||||
assertTrue(consumerConfigs.get("value.deserializer").equals(ByteArrayDeserializer.class));
|
||||
assertTrue((((String) consumerConfigs.get("bootstrap.servers")).contains("10.98.09.199:9082")));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ import static org.junit.Assert.assertNotNull;
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@SpringBootTest(classes = KafkaBinderConfiguration.class)
|
||||
@SpringBootTest(classes = { KafkaBinderConfiguration.class, KafkaBinderConfigurationTest.class })
|
||||
public class KafkaBinderConfigurationTest {
|
||||
|
||||
@Autowired
|
||||
@@ -50,4 +50,5 @@ public class KafkaBinderConfigurationTest {
|
||||
producerListenerField, this.kafkaMessageChannelBinder);
|
||||
assertNotNull(producerListener);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -15,14 +15,11 @@
|
||||
*/
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.BDDMockito.given;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.common.Node;
|
||||
import org.apache.kafka.common.PartitionInfo;
|
||||
@@ -30,16 +27,21 @@ import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
import org.springframework.boot.actuate.health.Health;
|
||||
import org.springframework.boot.actuate.health.Status;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.BDDMockito.given;
|
||||
|
||||
/**
|
||||
* @author Barry Commins
|
||||
*/
|
||||
public class KafkaBinderHealthIndicatorTest {
|
||||
|
||||
private static final String TEST_TOPIC = "test";
|
||||
|
||||
private KafkaBinderHealthIndicator indicator;
|
||||
|
||||
@Mock
|
||||
@@ -51,7 +53,7 @@ public class KafkaBinderHealthIndicatorTest {
|
||||
@Mock
|
||||
private KafkaMessageChannelBinder binder;
|
||||
|
||||
private Map<String, Collection<PartitionInfo>> topicsInUse = new HashMap<>();
|
||||
private Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse = new HashMap<>();
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
@@ -64,7 +66,7 @@ public class KafkaBinderHealthIndicatorTest {
|
||||
@Test
|
||||
public void kafkaBinderIsUp() {
|
||||
final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
|
||||
topicsInUse.put(TEST_TOPIC, partitions);
|
||||
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group", partitions));
|
||||
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
|
||||
Health health = indicator.health();
|
||||
assertThat(health.getStatus()).isEqualTo(Status.UP);
|
||||
@@ -73,7 +75,7 @@ public class KafkaBinderHealthIndicatorTest {
|
||||
@Test
|
||||
public void kafkaBinderIsDown() {
|
||||
final List<PartitionInfo> partitions = partitions(new Node(-1, null, 0));
|
||||
topicsInUse.put(TEST_TOPIC, partitions);
|
||||
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group", partitions));
|
||||
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
|
||||
Health health = indicator.health();
|
||||
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
|
||||
|
||||
@@ -0,0 +1,137 @@
|
||||
/*
|
||||
* Copyright 2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
import org.apache.kafka.common.Node;
|
||||
import org.apache.kafka.common.PartitionInfo;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
import org.springframework.boot.actuate.metrics.Metric;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.TopicInformation;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.BDDMockito.given;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyCollectionOf;
|
||||
import static org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics.METRIC_PREFIX;
|
||||
|
||||
/**
|
||||
* @author Henryk Konsek
|
||||
*/
|
||||
public class KafkaBinderMetricsTest {
|
||||
|
||||
private static final String TEST_TOPIC = "test";
|
||||
|
||||
private KafkaBinderMetrics metrics;
|
||||
|
||||
@Mock
|
||||
private DefaultKafkaConsumerFactory consumerFactory;
|
||||
|
||||
@Mock
|
||||
private KafkaConsumer consumer;
|
||||
|
||||
@Mock
|
||||
private KafkaMessageChannelBinder binder;
|
||||
|
||||
private Map<String, TopicInformation> topicsInUse = new HashMap<>();
|
||||
|
||||
@Mock
|
||||
private KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
given(consumerFactory.createConsumer()).willReturn(consumer);
|
||||
given(binder.getTopicsInUse()).willReturn(topicsInUse);
|
||||
metrics = new KafkaBinderMetrics(binder, kafkaBinderConfigurationProperties, consumerFactory);
|
||||
given(consumer.endOffsets(anyCollectionOf(TopicPartition.class)))
|
||||
.willReturn(singletonMap(new TopicPartition(TEST_TOPIC, 0), 1000L));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldIndicateLag() {
|
||||
given(consumer.committed(any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
|
||||
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
|
||||
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
|
||||
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
|
||||
Collection<Metric<?>> collectedMetrics = metrics.metrics();
|
||||
assertThat(collectedMetrics).hasSize(1);
|
||||
assertThat(collectedMetrics.iterator().next().getName())
|
||||
.isEqualTo(String.format("%s.%s.%s.lag", METRIC_PREFIX, "group", TEST_TOPIC));
|
||||
assertThat(collectedMetrics.iterator().next().getValue()).isEqualTo(500L);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldSumUpPartitionsLags() {
|
||||
Map<TopicPartition, Long> endOffsets = new HashMap<>();
|
||||
endOffsets.put(new TopicPartition(TEST_TOPIC, 0), 1000L);
|
||||
endOffsets.put(new TopicPartition(TEST_TOPIC, 1), 1000L);
|
||||
given(consumer.endOffsets(anyCollectionOf(TopicPartition.class))).willReturn(endOffsets);
|
||||
given(consumer.committed(any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
|
||||
List<PartitionInfo> partitions = partitions(new Node(0, null, 0), new Node(0, null, 0));
|
||||
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
|
||||
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
|
||||
Collection<Metric<?>> collectedMetrics = metrics.metrics();
|
||||
assertThat(collectedMetrics).hasSize(1);
|
||||
assertThat(collectedMetrics.iterator().next().getName())
|
||||
.isEqualTo(String.format("%s.%s.%s.lag", METRIC_PREFIX, "group", TEST_TOPIC));
|
||||
assertThat(collectedMetrics.iterator().next().getValue()).isEqualTo(1000L);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldIndicateFullLagForNotCommittedGroups() {
|
||||
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
|
||||
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
|
||||
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
|
||||
Collection<Metric<?>> collectedMetrics = metrics.metrics();
|
||||
assertThat(collectedMetrics).hasSize(1);
|
||||
assertThat(collectedMetrics.iterator().next().getName())
|
||||
.isEqualTo(String.format("%s.%s.%s.lag", METRIC_PREFIX, "group", TEST_TOPIC));
|
||||
assertThat(collectedMetrics.iterator().next().getValue()).isEqualTo(1000L);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldNotCalculateLagForProducerTopics() {
|
||||
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
|
||||
topicsInUse.put(TEST_TOPIC, new TopicInformation(null, partitions));
|
||||
Collection<Metric<?>> collectedMetrics = metrics.metrics();
|
||||
assertThat(collectedMetrics).isEmpty();
|
||||
}
|
||||
|
||||
private List<PartitionInfo> partitions(Node... nodes) {
|
||||
List<PartitionInfo> partitions = new ArrayList<>();
|
||||
for (int i = 0; i < nodes.length; i++) {
|
||||
partitions.add(new PartitionInfo(TEST_TOPIC, i, nodes[i], null, null));
|
||||
}
|
||||
return partitions;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -32,9 +32,12 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.serialization.LongDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.assertj.core.api.Condition;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
@@ -81,9 +84,13 @@ import static org.junit.Assert.assertTrue;
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
* @author Ilayaperumal Gopinathan
|
||||
* @author Henryk Konsek
|
||||
*/
|
||||
public abstract class KafkaBinderTests extends PartitionCapableBinderTests<AbstractKafkaTestBinder, ExtendedConsumerProperties<KafkaConsumerProperties>,
|
||||
ExtendedProducerProperties<KafkaProducerProperties>> {
|
||||
ExtendedProducerProperties<KafkaProducerProperties>> {
|
||||
|
||||
@Rule
|
||||
public ExpectedException expectedProvisioningException = ExpectedException.none();
|
||||
|
||||
@Override
|
||||
protected ExtendedConsumerProperties<KafkaConsumerProperties> createConsumerProperties() {
|
||||
@@ -113,10 +120,10 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
protected abstract ZkUtils getZkUtils(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties);
|
||||
|
||||
protected abstract void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions,
|
||||
int replicationFactor, Properties topicConfig);
|
||||
int replicationFactor, Properties topicConfig);
|
||||
|
||||
protected abstract int invokePartitionSize(String topic,
|
||||
ZkUtils zkUtils);
|
||||
ZkUtils zkUtils);
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -464,6 +471,35 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testDynamicKeyExpression() throws Exception {
|
||||
Binder binder = getBinder(createConfigurationProperties());
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
producerProperties.getExtension().getConfiguration().put("key.serializer", StringSerializer.class.getName());
|
||||
producerProperties.getExtension().setMessageKeyExpression(spelExpressionParser.parseExpression("headers.key"));
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
String uniqueBindingId = UUID.randomUUID().toString();
|
||||
DirectChannel moduleOutputChannel = createBindableChannel("output",
|
||||
createProducerBindingProperties(producerProperties));
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer("foo" + uniqueBindingId + ".0",
|
||||
moduleOutputChannel, producerProperties);
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo" + uniqueBindingId + ".0", null,
|
||||
moduleInputChannel, consumerProperties);
|
||||
Thread.sleep(1000);
|
||||
Message<?> message = MessageBuilder.withPayload("somePayload").setHeader("key", "myDynamicKey").build();
|
||||
// Let the consumer actually bind to the producer before sending a msg
|
||||
binderBindUnbindLatency();
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(moduleInputChannel);
|
||||
assertThat(inbound).isNotNull();
|
||||
String receivedKey = new String(inbound.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, byte[].class));
|
||||
assertThat(receivedKey).isEqualTo("myDynamicKey");
|
||||
producerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testCustomPartitionCountOverridesPartitioningIfLarger() throws Exception {
|
||||
@@ -1139,7 +1175,7 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testAutoAddPartitionsDisabledFailsIfTopicUnderpartitioned() throws Exception {
|
||||
public void testAutoAddPartitionsDisabledSucceedsIfTopicUnderPartitionedAndAutoRebalanceEnabled() throws Exception {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
|
||||
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
|
||||
@@ -1154,26 +1190,46 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
// binder.setApplicationContext(context);
|
||||
// binder.afterPropertiesSet();
|
||||
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
// this consumer must consume from partition 2
|
||||
consumerProperties.setInstanceCount(3);
|
||||
consumerProperties.setInstanceIndex(2);
|
||||
Binding binding = null;
|
||||
try {
|
||||
binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
}
|
||||
catch (Exception e) {
|
||||
assertThat(e).isInstanceOf(ProvisioningException.class);
|
||||
assertThat(e)
|
||||
.hasMessageContaining("The number of expected partitions was: 3, but 1 has been found instead");
|
||||
}
|
||||
finally {
|
||||
if (binding != null) {
|
||||
binding.unbind();
|
||||
}
|
||||
Binding binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
binding.unbind();
|
||||
assertThat(invokePartitionSize(testTopicName, zkUtils)).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testAutoAddPartitionsDisabledFailsIfTopicUnderPartitionedAndAutoRebalanceDisabled() throws Exception {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
|
||||
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
|
||||
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
|
||||
ZKStringSerializer$.MODULE$);
|
||||
|
||||
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
|
||||
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
invokeCreateTopic(zkUtils, testTopicName, 1, 1, new Properties());
|
||||
configurationProperties.setAutoAddPartitions(false);
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
// this consumer must consume from partition 2
|
||||
consumerProperties.setInstanceCount(3);
|
||||
consumerProperties.setInstanceIndex(2);
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
expectedProvisioningException.expect(ProvisioningException.class);
|
||||
expectedProvisioningException.expectMessage("The number of expected partitions was: 3, but 1 has been found instead");
|
||||
Binding binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
if (binding != null) {
|
||||
binding.unbind();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1310,11 +1366,11 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
}
|
||||
|
||||
private KafkaConsumer getKafkaConsumer(Binding binding) {
|
||||
DirectFieldAccessor bindingAccessor = new DirectFieldAccessor((DefaultBinding)binding);
|
||||
DirectFieldAccessor bindingAccessor = new DirectFieldAccessor((DefaultBinding) binding);
|
||||
KafkaMessageDrivenChannelAdapter adapter = (KafkaMessageDrivenChannelAdapter) bindingAccessor.getPropertyValue("lifecycle");
|
||||
DirectFieldAccessor adapterAccessor = new DirectFieldAccessor(adapter);
|
||||
ConcurrentMessageListenerContainer messageListenerContainer = (ConcurrentMessageListenerContainer) adapterAccessor.getPropertyValue("messageListenerContainer");
|
||||
DirectFieldAccessor containerAccessor = new DirectFieldAccessor((ConcurrentMessageListenerContainer)messageListenerContainer);
|
||||
DirectFieldAccessor containerAccessor = new DirectFieldAccessor((ConcurrentMessageListenerContainer) messageListenerContainer);
|
||||
DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) containerAccessor.getPropertyValue("consumerFactory");
|
||||
return (KafkaConsumer) consumerFactory.createConsumer();
|
||||
}
|
||||
@@ -1441,9 +1497,9 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
input2.setBeanName("test.input2J");
|
||||
Binding<MessageChannel> input2Binding = binder.bindConsumer("partJ.raw.0", "test", input2, consumerProperties);
|
||||
|
||||
output.send(new GenericMessage<>(new byte[] {(byte) 0}));
|
||||
output.send(new GenericMessage<>(new byte[] {(byte) 1}));
|
||||
output.send(new GenericMessage<>(new byte[] {(byte) 2}));
|
||||
output.send(new GenericMessage<>(new byte[]{(byte) 0}));
|
||||
output.send(new GenericMessage<>(new byte[]{(byte) 1}));
|
||||
output.send(new GenericMessage<>(new byte[]{(byte) 2}));
|
||||
|
||||
Message<?> receive0 = receive(input0);
|
||||
assertThat(receive0).isNotNull();
|
||||
@@ -1502,13 +1558,13 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
input2.setBeanName("test.input2S");
|
||||
Binding<MessageChannel> input2Binding = binder.bindConsumer("part.raw.0", "test", input2, consumerProperties);
|
||||
|
||||
Message<byte[]> message2 = org.springframework.integration.support.MessageBuilder.withPayload(new byte[] {2})
|
||||
Message<byte[]> message2 = org.springframework.integration.support.MessageBuilder.withPayload(new byte[]{2})
|
||||
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "kafkaBinderTestCommonsDelegate")
|
||||
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 42)
|
||||
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 43).build();
|
||||
output.send(message2);
|
||||
output.send(new GenericMessage<>(new byte[] {1}));
|
||||
output.send(new GenericMessage<>(new byte[] {0}));
|
||||
output.send(new GenericMessage<>(new byte[]{1}));
|
||||
output.send(new GenericMessage<>(new byte[]{0}));
|
||||
Message<?> receive0 = receive(input0);
|
||||
assertThat(receive0).isNotNull();
|
||||
Message<?> receive1 = receive(input1);
|
||||
|
||||
@@ -0,0 +1,82 @@
|
||||
/*
|
||||
* Copyright 2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
|
||||
import org.springframework.integration.test.util.TestUtils;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
/**
|
||||
* @author Gary Russell
|
||||
* @since 1.2.2
|
||||
*
|
||||
*/
|
||||
public class KafkaBinderUnitTests {
|
||||
|
||||
@Test
|
||||
public void testPropertyOverrides() throws Exception {
|
||||
KafkaBinderConfigurationProperties binderConfigurationProperties = new KafkaBinderConfigurationProperties();
|
||||
AdminUtilsOperation adminUtilsOperation = mock(AdminUtilsOperation.class);
|
||||
KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(binderConfigurationProperties,
|
||||
adminUtilsOperation);
|
||||
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfigurationProperties,
|
||||
provisioningProvider);
|
||||
KafkaConsumerProperties consumerProps = new KafkaConsumerProperties();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> ecp =
|
||||
new ExtendedConsumerProperties<KafkaConsumerProperties>(consumerProps);
|
||||
Method method = KafkaMessageChannelBinder.class.getDeclaredMethod("createKafkaConsumerFactory", boolean.class,
|
||||
String.class, ExtendedConsumerProperties.class);
|
||||
method.setAccessible(true);
|
||||
|
||||
// test default for anon
|
||||
Object factory = method.invoke(binder, true, "foo", ecp);
|
||||
Map<?, ?> configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
|
||||
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("latest");
|
||||
|
||||
// test default for named
|
||||
factory = method.invoke(binder, false, "foo", ecp);
|
||||
configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
|
||||
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");
|
||||
|
||||
// binder level setting
|
||||
binderConfigurationProperties.setConfiguration(
|
||||
Collections.<String, Object>singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"));
|
||||
factory = method.invoke(binder, false, "foo", ecp);
|
||||
configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
|
||||
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("latest");
|
||||
|
||||
// consumer level setting
|
||||
consumerProps.setConfiguration(Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
|
||||
factory = method.invoke(binder, false, "foo", ecp);
|
||||
configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
|
||||
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");
|
||||
}
|
||||
|
||||
}
|
||||
@@ -5,3 +5,6 @@ spring.kafka.consumer.valueDeserializer=org.apache.kafka.common.serialization.Lo
|
||||
spring.kafka.producer.batchSize=10
|
||||
spring.kafka.bootstrapServers=10.98.09.199:9092,10.98.09.196:9092
|
||||
spring.kafka.producer.compressionType=snappy
|
||||
# Test consumer properties
|
||||
spring.kafka.consumer.auto-offset-reset=earliest
|
||||
spring.kafka.consumer.group-id=groupIdFromBootConfig
|
||||
|
||||
Reference in New Issue
Block a user