Compare commits

..

13 Commits

Author SHA1 Message Date
Marius Bogoevici
9471976ddd Release 2.0.0.M1 2017-06-23 17:03:39 -04:00
Marius Bogoevici
4c4873b3c1 Re-add Spring Kafka version 2017-06-23 13:17:59 -04:00
Marius Bogoevici
971bc96962 Use Spring Boot and dependencies provided by Spring Cloud Build 2017-06-23 13:12:46 -04:00
Marius Bogoevici
47a225d02a Merge branch 'master' into 2.0.x 2017-06-23 12:44:14 -04:00
Ilayaperumal Gopinathan
143b96f79d Allow consumer group.id to be overridden
- This change will allow consumer's group.id to be overridden from possible options (Spring Boot Kafka properties, binder configuration properties etc.,)

Resolves #149
2017-06-21 17:27:16 -04:00
Marius Bogoevici
a0f386f06f Log faulty topic name during validation
Fix #157
2017-06-18 11:21:01 -04:00
Henryk Konsek
9ad04882c8 Added Kafka binder lag metrics.
Fix #152

Metrics should be divided by group ID of the binding.

TopicInformation should carry optional group of the consumer.

Improved tests coverage.

Added lag metric documentation.
2017-06-07 23:55:53 -04:00
Gary Russell
705213efe8 Add Tests for Propery Override Hierarchy 2017-05-26 15:33:52 -04:00
Doug Saus
7355ada461 Fix ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
Move set of ConsumerConfig.AUTO_OFFSET_RESET_CONFIG based to before setting of custom kafka properties.  This allows users to override this behavior via spring.cloud.stream.kafka.binder.configuration

Updated fix to also allow the spring.cloud.stream.kafka.bindings.<channel>.consumer.startOffset value to override the anonymous-consumer-based value if set

Moved setting of auto.offset.reset based on binder configuration below setting of kafka properties so that it has higher preceence.

Trailing Spaces
2017-05-26 15:15:47 -04:00
Soby Chacko
d4aaf78089 Kafka binder test modules refactoring
Confluent repository needed for tests is moved to spring-cloud-stream-binder-kafka-test-support

Following dependencies are moved to spring-cloud-stream-binder-kafka-test-support

 *spring-cloud-stream-schema
 *kafka-avro-serializer
 *kafka-schema-registry

spring-cloud-stream-binder-kafka-test-support is brought into spring-cloud-stream-binder-kafka only in test scope

Fixes #121

Kafka binder test modules restructuring

New module for kafka - 0.10.1.1 integration tests
0.10.0.1 tests extend from 0.10.1.1 base class for tests
Removing the empty spring-cloud-stream-binder-kafka-test-support module
2017-05-26 15:07:59 +05:30
Marius Bogoevici
e440378e44 Update version to 2.0.0.BUILD-SNAPSHOT
- Update Spring Boot to version 2.0.0.BUILD-SNAPSHOT
- Set Kafka version to 0.10.2
- Remove tests for Kafka 0.9 and 0.10.0
2017-05-21 11:17:58 -04:00
Soby Chacko
53e38902c9 Update to next major release line: 1.3.0.BUILD-SNAPSHOT
Fix #139
2017-05-17 11:24:06 -04:00
Soby Chacko
a7a0a132ea Next update: 1.2.2.BUILD-SNAPSHOT 2017-05-16 16:09:33 -04:00
29 changed files with 668 additions and 595 deletions

23
pom.xml
View File

@@ -2,29 +2,27 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.2.1.RELEASE</version>
<version>2.0.0.M1</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>1.3.2.RELEASE</version>
<version>2.0.0.M1</version>
<relativePath />
</parent>
<properties>
<java.version>1.7</java.version>
<kafka.version>0.10.1.1</kafka.version>
<spring-kafka.version>1.1.2.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>2.1.0.RELEASE</spring-integration-kafka.version>
<spring-cloud-stream.version>1.2.2.RELEASE</spring-cloud-stream.version>
<kafka.version>0.10.2.0</kafka.version>
<spring-kafka.version>2.0.0.M2</spring-kafka.version>
<spring-integration-kafka.version>3.0.0.M1</spring-integration-kafka.version>
<spring-cloud-stream.version>2.0.0.M1</spring-cloud-stream.version>
</properties>
<modules>
<module>spring-cloud-stream-binder-kafka</module>
<module>spring-cloud-starter-stream-kafka</module>
<module>spring-cloud-stream-binder-kafka-docs</module>
<module>spring-cloud-stream-binder-kafka-0.9-test</module>
<module>spring-cloud-stream-binder-kafka-0.10.0-test</module>
<module>spring-cloud-stream-binder-kafka-0.10.1-test</module>
<module>spring-cloud-stream-binder-kafka-core</module>
<module>spring-cloud-stream-binder-kafka-test-support</module>
</modules>
<dependencyManagement>
@@ -63,11 +61,6 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
@@ -138,7 +131,7 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build-tools</artifactId>
<version>1.3.2.RELEASE</version>
<version>2.0.0.M1</version>
</dependency>
</dependencies>
<executions>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.2.1.RELEASE</version>
<version>2.0.0.M1</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>

View File

@@ -1,108 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.2.1.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-0.10.0-test</artifactId>
<description>Spring Cloud Stream Kafka Binder 0.10.0 Tests</description>
<url>http://projects.spring.io/spring-cloud</url>
<organization>
<name>Pivotal Software, Inc.</name>
<url>http://www.spring.io</url>
</organization>
<properties>
<main.basedir>${basedir}/../..</main.basedir>
<!--
Override Kafka dependencies to Kafka 0.9.0.1 and supporting Spring Kafka and
Spring Integration Kafka versions
-->
<kafka.version>0.10.0.1</kafka.version>
<spring-kafka.version>1.1.2.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>2.1.0.RELEASE</spring-integration-kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<version>1.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
<version>${spring-cloud-stream.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>3.0.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry</artifactId>
<version>3.0.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
</project>

View File

@@ -1,28 +0,0 @@
/*
* Copyright 2014-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
/**
* Integration tests for the {@link KafkaMessageChannelBinder}.
*
* This test specifically tests for the 0.10.0.1 version of Kafka.
*
* @author Soby Chacko
*/
public class Kafka_0_10_0_BinderTests extends Kafka10BinderTests {
}

View File

@@ -0,0 +1,120 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.M1</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-0.10.1-test</artifactId>
<description>Spring Cloud Stream Kafka Binder 0.10.1 Tests</description>
<url>http://projects.spring.io/spring-cloud</url>
<organization>
<name>Pivotal Software, Inc.</name>
<url>http://www.spring.io</url>
</organization>
<properties>
<main.basedir>${basedir}/../..</main.basedir>
<kafka.version>0.10.1.1</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>1.1.6.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
<version>${spring-cloud-stream.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>3.1.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry</artifactId>
<version>3.1.2</version>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -43,7 +43,7 @@ public class Kafka10TestBinder extends AbstractKafkaTestBinder {
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfiguration, provisioningProvider);
binder.setCodec(getCodec());
binder.setCodec(AbstractKafkaTestBinder.getCodec());
ProducerListener producerListener = new LoggingProducerListener();
binder.setProducerListener(producerListener);
GenericApplicationContext context = new GenericApplicationContext();

View File

@@ -58,8 +58,6 @@ import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.Assert.assertTrue;
/**
@@ -70,7 +68,7 @@ import static org.junit.Assert.assertTrue;
* @author Mark Fisher
* @author Ilayaperumal Gopinathan
*/
public class Kafka10BinderTests extends KafkaBinderTests {
public class Kafka_0_10_1_BinderTests extends KafkaBinderTests {
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();
@@ -201,7 +199,7 @@ public class Kafka10BinderTests extends KafkaBinderTests {
break;
}
else if (System.currentTimeMillis() > endTime) {
fail("Kafka Schema Registry Server failed to start");
Assertions.fail("Kafka Schema Registry Server failed to start");
}
}
User1 firstOutboundFoo = new User1();
@@ -230,7 +228,7 @@ public class Kafka10BinderTests extends KafkaBinderTests {
binderBindUnbindLatency();
moduleOutputChannel.send(message);
Message<?> inbound = receive(moduleInputChannel);
assertThat(inbound).isNotNull();
Assertions.assertThat(inbound).isNotNull();
assertTrue(message.getPayload() instanceof User1);
User1 receivedUser = (User1) message.getPayload();
Assertions.assertThat(receivedUser.getName()).isEqualTo(userName1);

View File

@@ -1,78 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.2.1.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-0.9</artifactId>
<description>Spring Cloud Stream Kafka Binder 0.9 Tests</description>
<url>http://projects.spring.io/spring-cloud</url>
<organization>
<name>Pivotal Software, Inc.</name>
<url>http://www.spring.io</url>
</organization>
<properties>
<main.basedir>${basedir}/../..</main.basedir>
<!--
Override Kafka dependencies to Kafka 0.9.0.1 and supporting Spring Kafka and
Spring Integration Kafka versions
-->
<kafka.version>0.9.0.1</kafka.version>
<spring-kafka.version>1.0.5.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>2.0.1.RELEASE</spring-integration-kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -1,60 +0,0 @@
/*
* Copyright 2015-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
/**
* Test support class for {@link KafkaMessageChannelBinder}. Creates a binder that uses
* an embedded Kafka cluster.
* @author Eric Bottard
* @author Marius Bogoevici
* @author David Turanski
* @author Gary Russell
* @author Soby Chacko
*/
public class Kafka09TestBinder extends AbstractKafkaTestBinder {
public Kafka09TestBinder(KafkaBinderConfigurationProperties binderConfiguration) {
try {
AdminUtilsOperation adminUtilsOperation = new Kafka09AdminUtilsOperation();
KafkaTopicProvisioner provisioningProvider =
new KafkaTopicProvisioner(binderConfiguration, adminUtilsOperation);
provisioningProvider.afterPropertiesSet();
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfiguration, provisioningProvider);
binder.setCodec(getCodec());
ProducerListener producerListener = new LoggingProducerListener();
binder.setProducerListener(producerListener);
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
binder.setApplicationContext(context);
binder.afterPropertiesSet();
this.setBinder(binder);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -1,158 +0,0 @@
/*
* Copyright 2014-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.junit.Before;
import org.junit.ClassRule;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.Spy;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.kafka.test.rule.KafkaEmbedded;
/**
* Integration tests for the {@link KafkaMessageChannelBinder}.
*
* @author Eric Bottard
* @author Marius Bogoevici
* @author Mark Fisher
* @author Ilayaperumal Gopinathan
*/
public class Kafka_09_BinderTests extends KafkaBinderTests {
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
private Kafka09TestBinder binder;
private Kafka09AdminUtilsOperation adminUtilsOperation = new Kafka09AdminUtilsOperation();
@Override
protected void binderBindUnbindLatency() throws InterruptedException {
Thread.sleep(500);
}
@Override
protected Kafka09TestBinder getBinder() {
if (binder == null) {
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binder = new Kafka09TestBinder(binderConfiguration);
}
return binder;
}
protected KafkaBinderConfigurationProperties createConfigurationProperties() {
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
List<String> bAddresses = new ArrayList<>();
for (BrokerAddress bAddress : brokerAddresses) {
bAddresses.add(bAddress.toString());
}
String[] foo = new String[bAddresses.size()];
binderConfiguration.setBrokers(bAddresses.toArray(foo));
binderConfiguration.setZkNodes(embeddedKafka.getZookeeperConnectionString());
return binderConfiguration;
}
@Override
protected int partitionSize(String topic) {
return consumerFactory().createConsumer().partitionsFor(topic).size();
}
@Override
protected ZkUtils getZkUtils(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
final ZkClient zkClient = new ZkClient(kafkaBinderConfigurationProperties.getZkConnectionString(),
kafkaBinderConfigurationProperties.getZkSessionTimeout(), kafkaBinderConfigurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
return new ZkUtils(zkClient, null, false);
}
@Override
protected void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions, int replicationFactor, Properties topicConfig) {
adminUtilsOperation.invokeCreateTopic(zkUtils, topic, partitions, replicationFactor, new Properties());
}
@Override
protected int invokePartitionSize(String topic, ZkUtils zkUtils) {
return adminUtilsOperation.partitionSize(topic, zkUtils);
}
@Override
public String getKafkaOffsetHeaderKey() {
return KafkaHeaders.OFFSET;
}
@Override
protected Binder getBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
return new Kafka09TestBinder(kafkaBinderConfigurationProperties);
}
@Before
public void init() {
String multiplier = System.getenv("KAFKA_TIMEOUT_MULTIPLIER");
if (multiplier != null) {
timeoutMultiplier = Double.parseDouble(multiplier);
}
}
@Override
protected boolean usesExplicitRouting() {
return false;
}
@Override
protected String getClassUnderTestName() {
return CLASS_UNDER_TEST_NAME;
}
@Override
public Spy spyOn(final String name) {
throw new UnsupportedOperationException("'spyOn' is not used by Kafka tests");
}
private ConsumerFactory<byte[], byte[]> consumerFactory() {
Map<String, Object> props = new HashMap<>();
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Deserializer<byte[]> valueDecoder = new ByteArrayDeserializer();
Deserializer<byte[]> keyDecoder = new ByteArrayDeserializer();
return new DefaultKafkaConsumerFactory<>(props, keyDecoder, valueDecoder);
}
}

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.2.1.RELEASE</version>
<version>2.0.0.M1</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>

View File

@@ -37,7 +37,8 @@ public final class KafkaTopicUtils {
if (!((b >= 'a') && (b <= 'z') || (b >= 'A') && (b <= 'Z') || (b >= '0') && (b <= '9') || (b == '.')
|| (b == '-') || (b == '_'))) {
throw new IllegalArgumentException(
"Topic name can only have ASCII alphanumerics, '.', '_' and '-'");
"Topic name can only have ASCII alphanumerics, '.', '_' and '-', but was: '" + topicName
+ "'");
}
}
}

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.2.1.RELEASE</version>
<version>2.0.0.M1</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>

View File

@@ -1,6 +1,6 @@
[[spring-cloud-stream-binder-kafka-reference]]
= Spring Cloud Stream Kafka Binder Reference Guide
Sabby Anandan, Marius Bogoevici, Eric Bottard, Mark Fisher, Ilayaperumal Gopinathan, Gunnar Hillert, Mark Pollack, Patrick Peralta, Glenn Renfro, Thomas Risberg, Dave Syer, David Turanski, Janne Valkealahti, Benjamin Klein
Sabby Anandan, Marius Bogoevici, Eric Bottard, Mark Fisher, Ilayaperumal Gopinathan, Gunnar Hillert, Mark Pollack, Patrick Peralta, Glenn Renfro, Thomas Risberg, Dave Syer, David Turanski, Janne Valkealahti, Benjamin Klein, Henryk Konsek
:doctype: book
:toc:
:toclevels: 4
@@ -24,6 +24,7 @@ Sabby Anandan, Marius Bogoevici, Eric Bottard, Mark Fisher, Ilayaperumal Gopinat
= Reference Guide
include::overview.adoc[]
include::dlq.adoc[]
include::metrics.adoc[]
= Appendices
[appendix]

View File

@@ -0,0 +1,10 @@
[[kafka-metrics]]
== Kafka metrics
Kafka binder module exposes the following metrics:
`spring.cloud.stream.binder.kafka.someGroup.someTopic.lag` - this metric indicates how many messages
have not been yet consumed from given binder's topic (`someTopic`) by given consumer group (`someGroup`).
For example if the value of the metric `spring.cloud.stream.binder.kafka.myGroup.myTopic.lag` is `1000`, then
consumer group `myGroup` has `1000` messages to waiting to be consumed from topic `myTopic`. This metric is
particularly useful to provide auto-scaling feedback to PaaS platform of your choice.

View File

@@ -1,25 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.2.1.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-test-support</artifactId>
<description>Kafka related test classes</description>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>${spring-kafka.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@@ -10,14 +10,14 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.2.1.RELEASE</version>
<version>2.0.0.M1</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<version>1.2.1.RELEASE</version>
<version>2.0.0.M1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
@@ -81,33 +81,8 @@
<artifactId>spring-cloud-stream-binder-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
<version>${spring-cloud-stream.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>3.1.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry</artifactId>
<version>3.1.2</version>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -32,6 +32,7 @@ import org.springframework.kafka.core.ConsumerFactory;
*
* @author Ilayaperumal Gopinathan
* @author Marius Bogoevici
* @author Henryk Konsek
*/
public class KafkaBinderHealthIndicator implements HealthIndicator {
@@ -53,8 +54,9 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
for (String topic : this.binder.getTopicsInUse().keySet()) {
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
if (this.binder.getTopicsInUse().get(topic).contains(partitionInfo) && partitionInfo.leader()
.id() == -1) {
if (this.binder.getTopicsInUse().get(topic).getPartitionInfos().contains(partitionInfo)
&& partitionInfo.leader()
.id() == -1) {
downMessages.add(partitionInfo.toString());
}
}

View File

@@ -0,0 +1,126 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.endpoint.PublicMetrics;
import org.springframework.boot.actuate.metrics.Metric;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.util.ObjectUtils;
/**
* Metrics for Kafka binder.
*
* @author Henryk Konsek
*/
public class KafkaBinderMetrics implements PublicMetrics {
private final static Logger LOG = LoggerFactory.getLogger(KafkaBinderMetrics.class);
static final String METRIC_PREFIX = "spring.cloud.stream.binder.kafka";
private final KafkaMessageChannelBinder binder;
private final KafkaBinderConfigurationProperties binderConfigurationProperties;
private ConsumerFactory<?, ?> defaultConsumerFactory;
public KafkaBinderMetrics(KafkaMessageChannelBinder binder,
KafkaBinderConfigurationProperties binderConfigurationProperties,
ConsumerFactory<?, ?> defaultConsumerFactory) {
this.binder = binder;
this.binderConfigurationProperties = binderConfigurationProperties;
this.defaultConsumerFactory = defaultConsumerFactory;
}
public KafkaBinderMetrics(KafkaMessageChannelBinder binder,
KafkaBinderConfigurationProperties binderConfigurationProperties) {
this(binder, binderConfigurationProperties, null);
}
@Override
public Collection<Metric<?>> metrics() {
List<Metric<?>> metrics = new LinkedList<>();
for (Map.Entry<String, KafkaMessageChannelBinder.TopicInformation> topicInfo : this.binder.getTopicsInUse()
.entrySet()) {
if (!topicInfo.getValue().isConsumerTopic()) {
continue;
}
String topic = topicInfo.getKey();
String group = topicInfo.getValue().getConsumerGroup();
try (Consumer<?, ?> metadataConsumer = createConsumerFactory(group).createConsumer()) {
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
List<TopicPartition> topicPartitions = new LinkedList<>();
for (PartitionInfo partitionInfo : partitionInfos) {
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
Map<TopicPartition, Long> endOffsets = metadataConsumer.endOffsets(topicPartitions);
long lag = 0;
for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {
OffsetAndMetadata current = metadataConsumer.committed(endOffset.getKey());
if (current != null) {
lag += endOffset.getValue() - current.offset();
}
else {
lag += endOffset.getValue();
}
}
metrics.add(new Metric<>(String.format("%s.%s.%s.lag", METRIC_PREFIX, group, topic), lag));
}
catch (Exception e) {
LOG.debug("Cannot generate metric for topic: " + topic, e);
}
}
return metrics;
}
private ConsumerFactory<?, ?> createConsumerFactory(String group) {
if (defaultConsumerFactory != null) {
return defaultConsumerFactory;
}
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
if (!ObjectUtils.isEmpty(binderConfigurationProperties.getConfiguration())) {
props.putAll(binderConfigurationProperties.getConfiguration());
}
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.binderConfigurationProperties.getKafkaConnectionString());
}
props.put("group.id", group);
return new DefaultKafkaConsumerFactory<>(props);
}
}

View File

@@ -84,10 +84,10 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
* @author Mark Fisher
* @author Soby Chacko
* @author Henryk Konsek
* @author Doug Saus
*/
public class KafkaMessageChannelBinder extends
AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>,
ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {
private final KafkaBinderConfigurationProperties configurationProperties;
@@ -96,7 +96,7 @@ public class KafkaMessageChannelBinder extends
private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
private final Map<String, Collection<PartitionInfo>> topicsInUse = new HashMap<>();
private final Map<String, TopicInformation> topicsInUse = new HashMap<>();
public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider) {
@@ -128,7 +128,7 @@ public class KafkaMessageChannelBinder extends
this.producerListener = producerListener;
}
Map<String, Collection<PartitionInfo>> getTopicsInUse() {
Map<String, TopicInformation> getTopicsInUse() {
return this.topicsInUse;
}
@@ -146,7 +146,8 @@ public class KafkaMessageChannelBinder extends
protected MessageHandler createProducerMessageHandler(final ProducerDestination destination,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception {
final DefaultKafkaProducerFactory<byte[], byte[]> producerFB = getProducerFactory(producerProperties);
Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(producerProperties.getPartitionCount(),
Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(
producerProperties.getPartitionCount(),
false,
new Callable<Collection<PartitionInfo>>() {
@Override
@@ -154,7 +155,7 @@ public class KafkaMessageChannelBinder extends
return producerFB.createProducer().partitionsFor(destination.getName());
}
});
this.topicsInUse.put(destination.getName(), partitions);
this.topicsInUse.put(destination.getName(), new TopicInformation(null, partitions));
if (producerProperties.getPartitionCount() < partitions.size()) {
if (this.logger.isInfoEnabled()) {
this.logger.info("The `partitionCount` of the producer for topic " + destination.getName() + " is "
@@ -167,7 +168,8 @@ public class KafkaMessageChannelBinder extends
if (this.producerListener != null) {
kafkaTemplate.setProducerListener(this.producerListener);
}
return new ProducerConfigurationMessageHandler(kafkaTemplate, destination.getName(), producerProperties, producerFB);
return new ProducerConfigurationMessageHandler(kafkaTemplate, destination.getName(), producerProperties,
producerFB);
}
private DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(
@@ -185,10 +187,12 @@ public class KafkaMessageChannelBinder extends
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
}
if (ObjectUtils.isEmpty(props.get(ProducerConfig.BATCH_SIZE_CONFIG))) {
props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(producerProperties.getExtension().getBufferSize()));
props.put(ProducerConfig.BATCH_SIZE_CONFIG,
String.valueOf(producerProperties.getExtension().getBufferSize()));
}
if (ObjectUtils.isEmpty(props.get(ProducerConfig.LINGER_MS_CONFIG))) {
props.put(ProducerConfig.LINGER_MS_CONFIG, String.valueOf(producerProperties.getExtension().getBatchTimeout()));
props.put(ProducerConfig.LINGER_MS_CONFIG,
String.valueOf(producerProperties.getExtension().getBatchTimeout()));
}
if (ObjectUtils.isEmpty(props.get(ProducerConfig.COMPRESSION_TYPE_CONFIG))) {
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
@@ -209,8 +213,10 @@ public class KafkaMessageChannelBinder extends
Assert.isTrue(!anonymous || !extendedConsumerProperties.getExtension().isEnableDlq(),
"DLQ support is not available for anonymous subscriptions");
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
final ConsumerFactory<?, ?> consumerFactory = createKafkaConsumerFactory(anonymous, consumerGroup, extendedConsumerProperties);
int partitionCount = extendedConsumerProperties.getInstanceCount() * extendedConsumerProperties.getConcurrency();
final ConsumerFactory<?, ?> consumerFactory = createKafkaConsumerFactory(anonymous, consumerGroup,
extendedConsumerProperties);
int partitionCount = extendedConsumerProperties.getInstanceCount()
* extendedConsumerProperties.getConcurrency();
Collection<PartitionInfo> allPartitions = provisioningProvider.getPartitionsForTopic(partitionCount,
extendedConsumerProperties.getExtension().isAutoRebalanceEnabled(),
@@ -231,33 +237,37 @@ public class KafkaMessageChannelBinder extends
listenedPartitions = new ArrayList<>();
for (PartitionInfo partition : allPartitions) {
// divide partitions across modules
if ((partition.partition() % extendedConsumerProperties.getInstanceCount()) == extendedConsumerProperties.getInstanceIndex()) {
if ((partition.partition()
% extendedConsumerProperties.getInstanceCount()) == extendedConsumerProperties
.getInstanceIndex()) {
listenedPartitions.add(partition);
}
}
}
this.topicsInUse.put(destination.getName(), listenedPartitions);
this.topicsInUse.put(destination.getName(), new TopicInformation(group, listenedPartitions));
Assert.isTrue(!CollectionUtils.isEmpty(listenedPartitions), "A list of partitions must be provided");
final TopicPartitionInitialOffset[] topicPartitionInitialOffsets = getTopicPartitionInitialOffsets(
listenedPartitions);
final ContainerProperties containerProperties =
anonymous || extendedConsumerProperties.getExtension().isAutoRebalanceEnabled() ?
new ContainerProperties(destination.getName()) : new ContainerProperties(topicPartitionInitialOffsets);
final ContainerProperties containerProperties = anonymous
|| extendedConsumerProperties.getExtension().isAutoRebalanceEnabled()
? new ContainerProperties(destination.getName())
: new ContainerProperties(topicPartitionInitialOffsets);
int concurrency = Math.min(extendedConsumerProperties.getConcurrency(), listenedPartitions.size());
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer =
new ConcurrentMessageListenerContainer(
consumerFactory, containerProperties) {
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer = new ConcurrentMessageListenerContainer(
consumerFactory, containerProperties) {
@Override
public void stop(Runnable callback) {
super.stop(callback);
}
};
@Override
public void stop(Runnable callback) {
super.stop(callback);
}
};
messageListenerContainer.setConcurrency(concurrency);
messageListenerContainer.getContainerProperties().setAckOnError(isAutoCommitOnError(extendedConsumerProperties));
messageListenerContainer.getContainerProperties()
.setAckOnError(isAutoCommitOnError(extendedConsumerProperties));
if (!extendedConsumerProperties.getExtension().isAutoCommitOffset()) {
messageListenerContainer.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
messageListenerContainer.getContainerProperties()
.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
}
if (this.logger.isDebugEnabled()) {
this.logger.debug(
@@ -267,14 +277,14 @@ public class KafkaMessageChannelBinder extends
this.logger.debug(
"Listened partitions: " + StringUtils.collectionToCommaDelimitedString(listenedPartitions));
}
final KafkaMessageDrivenChannelAdapter<?, ?> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(
messageListenerContainer);
final KafkaMessageDrivenChannelAdapter<?, ?> kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter<>(
messageListenerContainer);
kafkaMessageDrivenChannelAdapter.setBeanFactory(this.getBeanFactory());
final RetryTemplate retryTemplate = buildRetryTemplate(extendedConsumerProperties);
kafkaMessageDrivenChannelAdapter.setRetryTemplate(retryTemplate);
if (extendedConsumerProperties.getExtension().isEnableDlq()) {
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = getProducerFactory(new ExtendedProducerProperties<>(new KafkaProducerProperties()));
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = getProducerFactory(
new ExtendedProducerProperties<>(new KafkaProducerProperties()));
final KafkaTemplate<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFactory);
messageListenerContainer.getContainerProperties().setErrorHandler(new ErrorHandler() {
@@ -284,9 +294,11 @@ public class KafkaMessageChannelBinder extends
: null;
final byte[] payload = message.value() != null
? Utils.toArray(ByteBuffer.wrap((byte[]) message.value())) : null;
String dlqName = StringUtils.hasText(extendedConsumerProperties.getExtension().getDlqName()) ?
extendedConsumerProperties.getExtension().getDlqName() : "error." + destination.getName() + "." + group;
ListenableFuture<SendResult<byte[], byte[]>> sentDlq = kafkaTemplate.send(dlqName, message.partition(), key, payload);
String dlqName = StringUtils.hasText(extendedConsumerProperties.getExtension().getDlqName())
? extendedConsumerProperties.getExtension().getDlqName()
: "error." + destination.getName() + "." + group;
ListenableFuture<SendResult<byte[], byte[]>> sentDlq = kafkaTemplate.send(dlqName,
message.partition(), key, payload);
sentDlq.addCallback(new ListenableFutureCallback<SendResult<byte[], byte[]>>() {
StringBuilder sb = new StringBuilder().append(" a message with key='")
.append(toDisplayString(ObjectUtils.nullSafeToString(key), 50)).append("'")
@@ -322,6 +334,9 @@ public class KafkaMessageChannelBinder extends
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, anonymous ? "latest" : "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
props.putAll(configurationProperties.getConfiguration());
}
@@ -331,8 +346,11 @@ public class KafkaMessageChannelBinder extends
if (!ObjectUtils.isEmpty(consumerProperties.getExtension().getConfiguration())) {
props.putAll(consumerProperties.getExtension().getConfiguration());
}
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, anonymous ? "latest" : "earliest");
if (!ObjectUtils.isEmpty(consumerProperties.getExtension().getStartOffset())) {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
consumerProperties.getExtension().getStartOffset().name());
}
return new DefaultKafkaConsumerFactory<>(props);
}
@@ -344,8 +362,8 @@ public class KafkaMessageChannelBinder extends
private TopicPartitionInitialOffset[] getTopicPartitionInitialOffsets(
Collection<PartitionInfo> listenedPartitions) {
final TopicPartitionInitialOffset[] topicPartitionInitialOffsets =
new TopicPartitionInitialOffset[listenedPartitions.size()];
final TopicPartitionInitialOffset[] topicPartitionInitialOffsets = new TopicPartitionInitialOffset[listenedPartitions
.size()];
int i = 0;
for (PartitionInfo partition : listenedPartitions) {
@@ -408,4 +426,30 @@ public class KafkaMessageChannelBinder extends
return this.running;
}
}
public static class TopicInformation {
private final String consumerGroup;
private final Collection<PartitionInfo> partitionInfos;
public TopicInformation(String consumerGroup, Collection<PartitionInfo> partitionInfos) {
this.consumerGroup = consumerGroup;
this.partitionInfos = partitionInfos;
}
public String getConsumerGroup() {
return consumerGroup;
}
public boolean isConsumerTopic() {
return consumerGroup != null;
}
public Collection<PartitionInfo> getPartitionInfos() {
return partitionInfos;
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015-2016 the original author or authors.
* Copyright 2015-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,12 +16,13 @@
package org.springframework.cloud.stream.binder.kafka.config;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -30,6 +31,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.endpoint.PublicMetrics;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
@@ -38,6 +40,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderJaasInitializerListener;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation;
@@ -69,11 +72,13 @@ import org.springframework.util.ObjectUtils;
* @author Soby Chacko
* @author Mark Fisher
* @author Ilayaperumal Gopinathan
* @author Henryk Konsek
*/
@Configuration
@ConditionalOnMissingBean(Binder.class)
@Import({KryoCodecAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class, KafkaBinderConfiguration.KafkaPropertiesConfiguration.class})
@EnableConfigurationProperties({KafkaBinderConfigurationProperties.class, KafkaExtendedBindingProperties.class})
@Import({ KryoCodecAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class,
KafkaBinderConfiguration.KafkaPropertiesConfiguration.class })
@EnableConfigurationProperties({ KafkaBinderConfigurationProperties.class, KafkaExtendedBindingProperties.class })
public class KafkaBinderConfiguration {
protected static final Log logger = LogFactory.getLog(KafkaBinderConfiguration.class);
@@ -93,7 +98,7 @@ public class KafkaBinderConfiguration {
@Autowired
private ApplicationContext context;
@Autowired (required = false)
@Autowired(required = false)
private AdminUtilsOperation adminUtilsOperation;
@Bean
@@ -132,6 +137,11 @@ public class KafkaBinderConfiguration {
return new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, consumerFactory);
}
@Bean
public PublicMetrics kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder) {
return new KafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties);
}
@Bean(name = "adminUtilsOperation")
@Conditional(Kafka09Present.class)
@ConditionalOnClass(name = "kafka.admin.AdminUtils")
@@ -160,7 +170,7 @@ public class KafkaBinderConfiguration {
return AppInfoParser.getVersion().startsWith("0.10");
}
}
static class Kafka09Present implements Condition {
@Override
@@ -195,24 +205,29 @@ public class KafkaBinderConfiguration {
configuration.put(properties.getKey(), properties.getValue());
}
}
for (Map.Entry<String, Object> producerProperties : this.kafkaProperties.buildProducerProperties().entrySet()) {
for (Map.Entry<String, Object> producerProperties : this.kafkaProperties.buildProducerProperties()
.entrySet()) {
if (!configuration.containsKey(producerProperties.getKey())) {
configuration.put(producerProperties.getKey(), producerProperties.getValue());
}
}
for (Map.Entry<String, Object> consumerProperties : this.kafkaProperties.buildConsumerProperties().entrySet()) {
for (Map.Entry<String, Object> consumerProperties : this.kafkaProperties.buildConsumerProperties()
.entrySet()) {
if (!configuration.containsKey(consumerProperties.getKey())) {
configuration.put(consumerProperties.getKey(), consumerProperties.getValue());
}
}
if (ObjectUtils.isEmpty(configuration.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))) {
configuration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBinderConfigurationProperties.getKafkaConnectionString());
configuration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaBinderConfigurationProperties.getKafkaConnectionString());
}
else {
@SuppressWarnings("unchecked")
List<String> bootStrapServers = (List<String>) configuration.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
List<String> bootStrapServers = (List<String>) configuration
.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
if (bootStrapServers.size() == 1 && bootStrapServers.get(0).equals("localhost:9092")) {
configuration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBinderConfigurationProperties.getKafkaConnectionString());
configuration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaBinderConfigurationProperties.getKafkaConnectionString());
}
}
}

View File

@@ -49,7 +49,8 @@ import static org.junit.Assert.assertTrue;
* @author Ilayaperumal Gopinathan
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = {KafkaBinderAutoConfigurationPropertiesTest.KafkaBinderConfigProperties.class, KafkaBinderConfiguration.class})
@SpringBootTest(classes = { KafkaBinderAutoConfigurationPropertiesTest.KafkaBinderConfigProperties.class,
KafkaBinderConfiguration.class })
@TestPropertySource(locations = "classpath:binder-config-autoconfig.properties")
public class KafkaBinderAutoConfigurationPropertiesTest {
@@ -62,13 +63,18 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
@Test
public void testKafkaBinderConfigurationWithKafkaProperties() throws Exception {
assertNotNull(this.kafkaMessageChannelBinder);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(new KafkaProducerProperties());
Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory", ExtendedProducerProperties.class);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(
new KafkaProducerProperties());
Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory",
ExtendedProducerProperties.class);
getProducerFactoryMethod.setAccessible(true);
DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod.invoke(this.kafkaMessageChannelBinder, producerProperties);
Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs", Map.class);
DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod
.invoke(this.kafkaMessageChannelBinder, producerProperties);
Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs",
Map.class);
ReflectionUtils.makeAccessible(producerFactoryConfigField);
Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField, producerFactory);
Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField,
producerFactory);
assertTrue(producerConfigs.get("batch.size").equals(10));
assertTrue(producerConfigs.get("key.serializer").equals(LongSerializer.class));
assertTrue(producerConfigs.get("value.serializer").equals(LongSerializer.class));
@@ -77,17 +83,22 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
bootstrapServers.add("10.98.09.199:9092");
bootstrapServers.add("10.98.09.196:9092");
assertTrue((((List<String>) producerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));
Method createKafkaConsumerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("createKafkaConsumerFactory", boolean.class, String.class, ExtendedConsumerProperties.class);
Method createKafkaConsumerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod(
"createKafkaConsumerFactory", boolean.class, String.class, ExtendedConsumerProperties.class);
createKafkaConsumerFactoryMethod.setAccessible(true);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<>(new KafkaConsumerProperties());
DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) createKafkaConsumerFactoryMethod.invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties);
Field consumerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaConsumerFactory.class, "configs", Map.class);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<>(
new KafkaConsumerProperties());
DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) createKafkaConsumerFactoryMethod
.invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties);
Field consumerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaConsumerFactory.class, "configs",
Map.class);
ReflectionUtils.makeAccessible(consumerFactoryConfigField);
Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField, consumerFactory);
Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField,
consumerFactory);
assertTrue(consumerConfigs.get("key.deserializer").equals(LongDeserializer.class));
assertTrue(consumerConfigs.get("value.deserializer").equals(LongDeserializer.class));
assertTrue(consumerConfigs.get("group.id").equals("test"));
assertTrue(consumerConfigs.get("auto.offset.reset").equals("latest"));
assertTrue(consumerConfigs.get("group.id").equals("groupIdFromBootConfig"));
assertTrue(consumerConfigs.get("auto.offset.reset").equals("earliest"));
assertTrue((((List<String>) consumerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));
}
@@ -106,7 +117,7 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
List<String> bootstrapServers = new ArrayList<>();
bootstrapServers.add("10.98.09.199:9092");
bootstrapServers.add("10.98.09.196:9092");
assertTrue(((List<String>)configs.get("bootstrap.servers")).containsAll(bootstrapServers));
assertTrue(((List<String>) configs.get("bootstrap.servers")).containsAll(bootstrapServers));
}
public static class KafkaBinderConfigProperties {
@@ -115,5 +126,6 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
KafkaProperties kafkaProperties() {
return new KafkaProperties();
}
}
}

View File

@@ -46,7 +46,7 @@ import static org.junit.Assert.assertTrue;
* @author Ilayaperumal Gopinathan
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = {KafkaBinderConfiguration.class})
@SpringBootTest(classes = { KafkaBinderConfiguration.class, KafkaBinderConfigurationPropertiesTest.class })
@TestPropertySource(locations = "classpath:binder-config.properties")
public class KafkaBinderConfigurationPropertiesTest {
@@ -60,13 +60,18 @@ public class KafkaBinderConfigurationPropertiesTest {
kafkaProducerProperties.setBufferSize(12345);
kafkaProducerProperties.setBatchTimeout(100);
kafkaProducerProperties.setCompressionType(KafkaProducerProperties.CompressionType.gzip);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(kafkaProducerProperties);
Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory", ExtendedProducerProperties.class);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(
kafkaProducerProperties);
Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory",
ExtendedProducerProperties.class);
getProducerFactoryMethod.setAccessible(true);
DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod.invoke(this.kafkaMessageChannelBinder, producerProperties);
Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs", Map.class);
DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod
.invoke(this.kafkaMessageChannelBinder, producerProperties);
Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs",
Map.class);
ReflectionUtils.makeAccessible(producerFactoryConfigField);
Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField, producerFactory);
Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField,
producerFactory);
assertTrue(producerConfigs.get("batch.size").equals("12345"));
assertTrue(producerConfigs.get("linger.ms").equals("100"));
assertTrue(producerConfigs.get("key.serializer").equals(ByteArraySerializer.class));
@@ -75,15 +80,21 @@ public class KafkaBinderConfigurationPropertiesTest {
List<String> bootstrapServers = new ArrayList<>();
bootstrapServers.add("10.98.09.199:9082");
assertTrue((((String) producerConfigs.get("bootstrap.servers")).contains("10.98.09.199:9082")));
Method createKafkaConsumerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("createKafkaConsumerFactory", boolean.class, String.class, ExtendedConsumerProperties.class);
Method createKafkaConsumerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod(
"createKafkaConsumerFactory", boolean.class, String.class, ExtendedConsumerProperties.class);
createKafkaConsumerFactoryMethod.setAccessible(true);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<>(new KafkaConsumerProperties());
DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) createKafkaConsumerFactoryMethod.invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties);
Field consumerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaConsumerFactory.class, "configs", Map.class);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<>(
new KafkaConsumerProperties());
DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) createKafkaConsumerFactoryMethod
.invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties);
Field consumerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaConsumerFactory.class, "configs",
Map.class);
ReflectionUtils.makeAccessible(consumerFactoryConfigField);
Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField, consumerFactory);
Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField,
consumerFactory);
assertTrue(consumerConfigs.get("key.deserializer").equals(ByteArrayDeserializer.class));
assertTrue(consumerConfigs.get("value.deserializer").equals(ByteArrayDeserializer.class));
assertTrue((((String) consumerConfigs.get("bootstrap.servers")).contains("10.98.09.199:9082")));
}
}

View File

@@ -33,7 +33,7 @@ import static org.junit.Assert.assertNotNull;
* @author Ilayaperumal Gopinathan
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = KafkaBinderConfiguration.class)
@SpringBootTest(classes = { KafkaBinderConfiguration.class, KafkaBinderConfigurationTest.class })
public class KafkaBinderConfigurationTest {
@Autowired
@@ -50,4 +50,5 @@ public class KafkaBinderConfigurationTest {
producerListenerField, this.kafkaMessageChannelBinder);
assertNotNull(producerListener);
}
}

View File

@@ -15,14 +15,11 @@
*/
package org.springframework.cloud.stream.binder.kafka;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
@@ -30,16 +27,21 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
/**
* @author Barry Commins
*/
public class KafkaBinderHealthIndicatorTest {
private static final String TEST_TOPIC = "test";
private KafkaBinderHealthIndicator indicator;
@Mock
@@ -51,7 +53,7 @@ public class KafkaBinderHealthIndicatorTest {
@Mock
private KafkaMessageChannelBinder binder;
private Map<String, Collection<PartitionInfo>> topicsInUse = new HashMap<>();
private Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse = new HashMap<>();
@Before
public void setup() {
@@ -64,7 +66,7 @@ public class KafkaBinderHealthIndicatorTest {
@Test
public void kafkaBinderIsUp() {
final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, partitions);
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group", partitions));
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
Health health = indicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);
@@ -73,7 +75,7 @@ public class KafkaBinderHealthIndicatorTest {
@Test
public void kafkaBinderIsDown() {
final List<PartitionInfo> partitions = partitions(new Node(-1, null, 0));
topicsInUse.put(TEST_TOPIC, partitions);
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group", partitions));
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
Health health = indicator.health();
assertThat(health.getStatus()).isEqualTo(Status.DOWN);

View File

@@ -0,0 +1,137 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.boot.actuate.metrics.Metric;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.TopicInformation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import static java.util.Collections.singletonMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyCollectionOf;
import static org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics.METRIC_PREFIX;
/**
* @author Henryk Konsek
*/
public class KafkaBinderMetricsTest {
private static final String TEST_TOPIC = "test";
private KafkaBinderMetrics metrics;
@Mock
private DefaultKafkaConsumerFactory consumerFactory;
@Mock
private KafkaConsumer consumer;
@Mock
private KafkaMessageChannelBinder binder;
private Map<String, TopicInformation> topicsInUse = new HashMap<>();
@Mock
private KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties;
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
given(consumerFactory.createConsumer()).willReturn(consumer);
given(binder.getTopicsInUse()).willReturn(topicsInUse);
metrics = new KafkaBinderMetrics(binder, kafkaBinderConfigurationProperties, consumerFactory);
given(consumer.endOffsets(anyCollectionOf(TopicPartition.class)))
.willReturn(singletonMap(new TopicPartition(TEST_TOPIC, 0), 1000L));
}
@Test
public void shouldIndicateLag() {
given(consumer.committed(any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
Collection<Metric<?>> collectedMetrics = metrics.metrics();
assertThat(collectedMetrics).hasSize(1);
assertThat(collectedMetrics.iterator().next().getName())
.isEqualTo(String.format("%s.%s.%s.lag", METRIC_PREFIX, "group", TEST_TOPIC));
assertThat(collectedMetrics.iterator().next().getValue()).isEqualTo(500L);
}
@Test
public void shouldSumUpPartitionsLags() {
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(new TopicPartition(TEST_TOPIC, 0), 1000L);
endOffsets.put(new TopicPartition(TEST_TOPIC, 1), 1000L);
given(consumer.endOffsets(anyCollectionOf(TopicPartition.class))).willReturn(endOffsets);
given(consumer.committed(any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
List<PartitionInfo> partitions = partitions(new Node(0, null, 0), new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
Collection<Metric<?>> collectedMetrics = metrics.metrics();
assertThat(collectedMetrics).hasSize(1);
assertThat(collectedMetrics.iterator().next().getName())
.isEqualTo(String.format("%s.%s.%s.lag", METRIC_PREFIX, "group", TEST_TOPIC));
assertThat(collectedMetrics.iterator().next().getValue()).isEqualTo(1000L);
}
@Test
public void shouldIndicateFullLagForNotCommittedGroups() {
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
Collection<Metric<?>> collectedMetrics = metrics.metrics();
assertThat(collectedMetrics).hasSize(1);
assertThat(collectedMetrics.iterator().next().getName())
.isEqualTo(String.format("%s.%s.%s.lag", METRIC_PREFIX, "group", TEST_TOPIC));
assertThat(collectedMetrics.iterator().next().getValue()).isEqualTo(1000L);
}
@Test
public void shouldNotCalculateLagForProducerTopics() {
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation(null, partitions));
Collection<Metric<?>> collectedMetrics = metrics.metrics();
assertThat(collectedMetrics).isEmpty();
}
private List<PartitionInfo> partitions(Node... nodes) {
List<PartitionInfo> partitions = new ArrayList<>();
for (int i = 0; i < nodes.length; i++) {
partitions.add(new PartitionInfo(TEST_TOPIC, i, nodes[i], null, null));
}
return partitions;
}
}

View File

@@ -0,0 +1,82 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.Test;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.integration.test.util.TestUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
/**
* @author Gary Russell
* @since 1.2.2
*
*/
public class KafkaBinderUnitTests {
@Test
public void testPropertyOverrides() throws Exception {
KafkaBinderConfigurationProperties binderConfigurationProperties = new KafkaBinderConfigurationProperties();
AdminUtilsOperation adminUtilsOperation = mock(AdminUtilsOperation.class);
KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(binderConfigurationProperties,
adminUtilsOperation);
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfigurationProperties,
provisioningProvider);
KafkaConsumerProperties consumerProps = new KafkaConsumerProperties();
ExtendedConsumerProperties<KafkaConsumerProperties> ecp =
new ExtendedConsumerProperties<KafkaConsumerProperties>(consumerProps);
Method method = KafkaMessageChannelBinder.class.getDeclaredMethod("createKafkaConsumerFactory", boolean.class,
String.class, ExtendedConsumerProperties.class);
method.setAccessible(true);
// test default for anon
Object factory = method.invoke(binder, true, "foo", ecp);
Map<?, ?> configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("latest");
// test default for named
factory = method.invoke(binder, false, "foo", ecp);
configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");
// binder level setting
binderConfigurationProperties.setConfiguration(
Collections.<String, Object>singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"));
factory = method.invoke(binder, false, "foo", ecp);
configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("latest");
// consumer level setting
consumerProps.setConfiguration(Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
factory = method.invoke(binder, false, "foo", ecp);
configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");
}
}

View File

@@ -7,4 +7,4 @@ spring.kafka.bootstrapServers=10.98.09.199:9092,10.98.09.196:9092
spring.kafka.producer.compressionType=snappy
# Test consumer properties
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=testEmbeddedKafkaApplication
spring.kafka.consumer.group-id=groupIdFromBootConfig