Compare commits
14 Commits
v2.0.0.M2
...
v1.3.0.REL
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5269a6642c | ||
|
|
a6201425e0 | ||
|
|
715b69506d | ||
|
|
4fb37e2eac | ||
|
|
2a6d716ecf | ||
|
|
7344bed048 | ||
|
|
1e74d624c8 | ||
|
|
7a2b9f1c56 | ||
|
|
d72a4fdb33 | ||
|
|
ea8b3062f1 | ||
|
|
3b3896d642 | ||
|
|
3d244b156e | ||
|
|
e968194b1a | ||
|
|
add9a5d1dc |
78
pom.xml
@@ -1,40 +1,42 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.0.0.M2</version>
|
||||
<artifactId>spring-cloud-stream-binder-kafka11-parent</artifactId>
|
||||
<version>1.3.0.RELEASE</version>
|
||||
<packaging>pom</packaging>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-build</artifactId>
|
||||
<version>2.0.0.M4</version>
|
||||
<version>1.3.5.RELEASE</version>
|
||||
<relativePath />
|
||||
</parent>
|
||||
<properties>
|
||||
<java.version>1.8</java.version>
|
||||
<spring-kafka.version>2.0.0.RELEASE</spring-kafka.version>
|
||||
<java.version>1.7</java.version>
|
||||
<kafka.version>0.11.0.0</kafka.version>
|
||||
<spring-integration-kafka.version>3.0.0.M2</spring-integration-kafka.version>
|
||||
<spring-cloud-stream.version>2.0.0.M2</spring-cloud-stream.version>
|
||||
<spring-kafka.version>1.3.0.RELEASE</spring-kafka.version>
|
||||
<spring-integration-kafka.version>2.3.0.RELEASE</spring-integration-kafka.version>
|
||||
<spring-cloud-stream.version>1.3.0.RELEASE</spring-cloud-stream.version>
|
||||
<spring-cloud-build.version>1.3.5.RELEASE</spring-cloud-build.version>
|
||||
</properties>
|
||||
<modules>
|
||||
<module>spring-cloud-stream-binder-kafka</module>
|
||||
<module>spring-cloud-starter-stream-kafka</module>
|
||||
<module>spring-cloud-stream-binder-kafka-docs</module>
|
||||
<module>spring-cloud-stream-binder-kafka-core</module>
|
||||
<module>spring-cloud-stream-binder-kstream</module>
|
||||
<module>spring-cloud-stream-binder-kafka11</module>
|
||||
<module>spring-cloud-starter-stream-kafka11</module>
|
||||
<module>spring-cloud-stream-binder-kafka11-docs</module>
|
||||
<module>spring-cloud-stream-binder-kafka-0.11-test</module>
|
||||
<module>spring-cloud-stream-binder-kafka11-core</module>
|
||||
<module>spring-cloud-stream-binder-kstream11</module>
|
||||
</modules>
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka11-core</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka11</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
@@ -42,6 +44,11 @@
|
||||
<artifactId>spring-cloud-stream</artifactId>
|
||||
<version>${spring-cloud-stream.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-codec</artifactId>
|
||||
<version>${spring-cloud-stream.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
@@ -75,6 +82,12 @@
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
<version>${spring-integration-kafka.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.avro</groupId>
|
||||
<artifactId>avro-compiler</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
@@ -116,6 +129,18 @@
|
||||
<artifactId>maven-antrun-plugin</artifactId>
|
||||
<version>1.7</version>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-checkstyle-plugin</artifactId>
|
||||
<version>2.17</version>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.puppycrawl.tools</groupId>
|
||||
<artifactId>checkstyle</artifactId>
|
||||
<version>7.1</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-javadoc-plugin</artifactId>
|
||||
@@ -139,15 +164,26 @@
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-tools</artifactId>
|
||||
<version>${spring-cloud-stream.version}</version>
|
||||
<artifactId>spring-cloud-build-tools</artifactId>
|
||||
<version>${spring-cloud-build.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<configuration>
|
||||
<configLocation>checkstyle.xml</configLocation>
|
||||
<headerLocation>checkstyle-header.txt</headerLocation>
|
||||
<includeTestSourceDirectory>true</includeTestSourceDirectory>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>checkstyle-validation</id>
|
||||
<phase>validate</phase>
|
||||
<configuration>
|
||||
<configLocation>checkstyle.xml</configLocation>
|
||||
<encoding>UTF-8</encoding>
|
||||
<consoleOutput>true</consoleOutput>
|
||||
<failsOnError>true</failsOnError>
|
||||
<includeTestSourceDirectory>true</includeTestSourceDirectory>
|
||||
</configuration>
|
||||
<goals>
|
||||
<goal>check</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
@@ -3,11 +3,11 @@
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.0.0.M2</version>
|
||||
<artifactId>spring-cloud-stream-binder-kafka11-parent</artifactId>
|
||||
<version>1.3.0.RELEASE</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
<description>Spring Cloud Starter Stream Kafka</description>
|
||||
<artifactId>spring-cloud-starter-stream-kafka11</artifactId>
|
||||
<description>Spring Cloud Starter Stream Kafka for the 0.11.x.x client</description>
|
||||
<url>http://projects.spring.io/spring-cloud</url>
|
||||
<organization>
|
||||
<name>Pivotal Software, Inc.</name>
|
||||
118
spring-cloud-stream-binder-kafka-0.11-test/pom.xml
Normal file
@@ -0,0 +1,118 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka11-parent</artifactId>
|
||||
<version>1.3.0.RELEASE</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-0.11-test</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder 0.11 Tests</description>
|
||||
<url>http://projects.spring.io/spring-cloud</url>
|
||||
<organization>
|
||||
<name>Pivotal Software, Inc.</name>
|
||||
<url>http://www.spring.io</url>
|
||||
</organization>
|
||||
<properties>
|
||||
<main.basedir>${basedir}/../..</main.basedir>
|
||||
<kafka.version>0.11.0.0</kafka.version>
|
||||
<spring-kafka.version>1.3.0.RELEASE</spring-kafka.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka11-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka11</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka11</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-schema</artifactId>
|
||||
<version>${spring-cloud-stream.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.confluent</groupId>
|
||||
<artifactId>kafka-avro-serializer</artifactId>
|
||||
<version>3.2.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.confluent</groupId>
|
||||
<artifactId>kafka-schema-registry</artifactId>
|
||||
<version>3.2.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>confluent</id>
|
||||
<url>http://packages.confluent.io/maven/</url>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<version>3.0.2</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>test-jar</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
@@ -18,7 +18,7 @@ package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.KafkaAdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
|
||||
@@ -37,12 +37,12 @@ import org.springframework.kafka.support.ProducerListener;
|
||||
* @author Gary Russell
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class KafkaTestBinder extends AbstractKafkaTestBinder {
|
||||
public class Kafka10TestBinder extends AbstractKafkaTestBinder {
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
KafkaTestBinder(KafkaBinderConfigurationProperties binderConfiguration) {
|
||||
public Kafka10TestBinder(KafkaBinderConfigurationProperties binderConfiguration) {
|
||||
try {
|
||||
AdminUtilsOperation adminUtilsOperation = new KafkaAdminUtilsOperation();
|
||||
AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
|
||||
KafkaTopicProvisioner provisioningProvider =
|
||||
new KafkaTopicProvisioner(binderConfiguration, adminUtilsOperation);
|
||||
provisioningProvider.afterPropertiesSet();
|
||||
@@ -63,6 +63,7 @@ public class KafkaTestBinder extends AbstractKafkaTestBinder {
|
||||
|
||||
};
|
||||
|
||||
binder.setCodec(AbstractKafkaTestBinder.getCodec());
|
||||
ProducerListener producerListener = new LoggingProducerListener();
|
||||
binder.setProducerListener(producerListener);
|
||||
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
|
||||
@@ -0,0 +1,296 @@
|
||||
/*
|
||||
* Copyright 2014-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
|
||||
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
|
||||
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
|
||||
|
||||
import kafka.utils.ZKStringSerializer$;
|
||||
import kafka.utils.ZkUtils;
|
||||
|
||||
import org.I0Itec.zkclient.ZkClient;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.binder.BinderHeaders;
|
||||
import org.springframework.cloud.stream.binder.Binding;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.Spy;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
|
||||
import org.springframework.cloud.stream.config.BindingProperties;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.kafka.test.core.BrokerAddress;
|
||||
import org.springframework.kafka.test.rule.KafkaEmbedded;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.util.MimeType;
|
||||
import org.springframework.util.MimeTypeUtils;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Integration tests for the {@link KafkaMessageChannelBinder}.
|
||||
*
|
||||
* This test specifically tests for the 0.11.x.x version of Kafka.
|
||||
*
|
||||
* @author Eric Bottard
|
||||
* @author Marius Bogoevici
|
||||
* @author Mark Fisher
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
public class Kafka_0_11_BinderTests extends KafkaBinderTests {
|
||||
|
||||
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();
|
||||
|
||||
@ClassRule
|
||||
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
|
||||
|
||||
private Kafka10TestBinder binder;
|
||||
|
||||
private final Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
|
||||
|
||||
@Override
|
||||
protected void binderBindUnbindLatency() throws InterruptedException {
|
||||
Thread.sleep(500);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Kafka10TestBinder getBinder() {
|
||||
if (binder == null) {
|
||||
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
|
||||
binder = new Kafka10TestBinder(binderConfiguration);
|
||||
}
|
||||
return binder;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected KafkaBinderConfigurationProperties createConfigurationProperties() {
|
||||
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
|
||||
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
|
||||
List<String> bAddresses = new ArrayList<>();
|
||||
for (BrokerAddress bAddress : brokerAddresses) {
|
||||
bAddresses.add(bAddress.toString());
|
||||
}
|
||||
String[] foo = new String[bAddresses.size()];
|
||||
binderConfiguration.setBrokers(bAddresses.toArray(foo));
|
||||
binderConfiguration.setZkNodes(embeddedKafka.getZookeeperConnectionString());
|
||||
return binderConfiguration;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int partitionSize(String topic) {
|
||||
return consumerFactory().createConsumer().partitionsFor(topic).size();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ZkUtils getZkUtils(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
|
||||
final ZkClient zkClient = new ZkClient(kafkaBinderConfigurationProperties.getZkConnectionString(),
|
||||
kafkaBinderConfigurationProperties.getZkSessionTimeout(), kafkaBinderConfigurationProperties.getZkConnectionTimeout(),
|
||||
ZKStringSerializer$.MODULE$);
|
||||
|
||||
return new ZkUtils(zkClient, null, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions, int replicationFactor, Properties topicConfig) {
|
||||
adminUtilsOperation.invokeCreateTopic(zkUtils, topic, partitions, replicationFactor, new Properties());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int invokePartitionSize(String topic, ZkUtils zkUtils) {
|
||||
return adminUtilsOperation.partitionSize(topic, zkUtils);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getKafkaOffsetHeaderKey() {
|
||||
return KafkaHeaders.OFFSET;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Binder getBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
|
||||
return new Kafka10TestBinder(kafkaBinderConfigurationProperties);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
String multiplier = System.getenv("KAFKA_TIMEOUT_MULTIPLIER");
|
||||
if (multiplier != null) {
|
||||
timeoutMultiplier = Double.parseDouble(multiplier);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean usesExplicitRouting() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getClassUnderTestName() {
|
||||
return CLASS_UNDER_TEST_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Spy spyOn(final String name) {
|
||||
throw new UnsupportedOperationException("'spyOn' is not used by Kafka tests");
|
||||
}
|
||||
|
||||
|
||||
private ConsumerFactory<byte[], byte[]> consumerFactory() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
|
||||
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, "TEST-CONSUMER-GROUP");
|
||||
Deserializer<byte[]> valueDecoder = new ByteArrayDeserializer();
|
||||
Deserializer<byte[]> keyDecoder = new ByteArrayDeserializer();
|
||||
|
||||
return new DefaultKafkaConsumerFactory<>(props, keyDecoder, valueDecoder);
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
@Test
|
||||
public void testTrustedPackages() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
|
||||
BindingProperties producerBindingProperties = createProducerBindingProperties(createProducerProperties());
|
||||
DirectChannel moduleOutputChannel = createBindableChannel("output", producerBindingProperties);
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer("bar.0", moduleOutputChannel,
|
||||
producerBindingProperties.getProducer());
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.getExtension().setTrustedPackages(new String[]{"org.springframework.util"});
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer("bar.0",
|
||||
"testSendAndReceiveNoOriginalContentType", moduleInputChannel, consumerProperties);
|
||||
binderBindUnbindLatency();
|
||||
|
||||
Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload("foo")
|
||||
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN).setHeader("foo", MimeTypeUtils.TEXT_PLAIN)
|
||||
.build();
|
||||
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(moduleInputChannel);
|
||||
Assertions.assertThat(inbound).isNotNull();
|
||||
Assertions.assertThat(inbound.getPayload()).isEqualTo("foo");
|
||||
Assertions.assertThat(inbound.getHeaders().get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE)).isNull();
|
||||
Assertions.assertThat(inbound.getHeaders().get(MessageHeaders.CONTENT_TYPE))
|
||||
.isEqualTo(MimeTypeUtils.TEXT_PLAIN_VALUE);
|
||||
Assertions.assertThat(inbound.getHeaders().get("foo")).isInstanceOf(MimeType.class);
|
||||
MimeType actual = (MimeType) inbound.getHeaders().get("foo");
|
||||
Assertions.assertThat(actual).isEqualTo(MimeTypeUtils.TEXT_PLAIN);
|
||||
producerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
|
||||
class Foo{}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testCustomAvroSerialization() throws Exception {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
|
||||
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
|
||||
ZKStringSerializer$.MODULE$);
|
||||
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
|
||||
Map<String, Object> schemaRegistryProps = new HashMap<>();
|
||||
schemaRegistryProps.put("kafkastore.connection.url", configurationProperties.getZkConnectionString());
|
||||
schemaRegistryProps.put("listeners", "http://0.0.0.0:8082");
|
||||
schemaRegistryProps.put("port", "8082");
|
||||
schemaRegistryProps.put("kafkastore.topic", "_schemas");
|
||||
SchemaRegistryConfig config = new SchemaRegistryConfig(schemaRegistryProps);
|
||||
SchemaRegistryRestApplication app = new SchemaRegistryRestApplication(config);
|
||||
Server server = app.createServer();
|
||||
server.start();
|
||||
long endTime = System.currentTimeMillis() + 5000;
|
||||
while(true) {
|
||||
if (server.isRunning()) {
|
||||
break;
|
||||
}
|
||||
else if (System.currentTimeMillis() > endTime) {
|
||||
Assertions.fail("Kafka Schema Registry Server failed to start");
|
||||
}
|
||||
}
|
||||
User1 firstOutboundFoo = new User1();
|
||||
String userName1 = "foo-name" + UUID.randomUUID().toString();
|
||||
String favColor1 = "foo-color" + UUID.randomUUID().toString();
|
||||
firstOutboundFoo.setName(userName1);
|
||||
firstOutboundFoo.setFavoriteColor(favColor1);
|
||||
Message<?> message = MessageBuilder.withPayload(firstOutboundFoo).build();
|
||||
SubscribableChannel moduleOutputChannel = new DirectChannel();
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
invokeCreateTopic(zkUtils, testTopicName, 6, 1, new Properties());
|
||||
configurationProperties.setAutoAddPartitions(true);
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
producerProperties.getExtension().getConfiguration().put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
|
||||
producerProperties.getExtension().getConfiguration().put("schema.registry.url", "http://localhost:8082");
|
||||
producerProperties.setUseNativeEncoding(true);
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel, producerProperties);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
consumerProperties.getExtension().getConfiguration().put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
|
||||
consumerProperties.getExtension().getConfiguration().put("schema.registry.url", "http://localhost:8082");
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer(testTopicName, "test", moduleInputChannel, consumerProperties);
|
||||
// Let the consumer actually bind to the producer before sending a msg
|
||||
binderBindUnbindLatency();
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(moduleInputChannel);
|
||||
Assertions.assertThat(inbound).isNotNull();
|
||||
assertTrue(message.getPayload() instanceof User1);
|
||||
User1 receivedUser = (User1) message.getPayload();
|
||||
Assertions.assertThat(receivedUser.getName()).isEqualTo(userName1);
|
||||
Assertions.assertThat(receivedUser.getFavoriteColor()).isEqualTo(favColor1);
|
||||
producerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testSendAndReceiveWithExplicitConsumerGroupWithRawMode() {
|
||||
// raw mode no longer needed
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testSendAndReceiveWithRawModeAndStringPayload() {
|
||||
// raw mode no longer needed
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,85 @@
|
||||
/*
|
||||
* Copyright 2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.reflect.Nullable;
|
||||
import org.apache.avro.specific.SpecificRecordBase;
|
||||
|
||||
import org.springframework.core.io.ClassPathResource;
|
||||
|
||||
/**
|
||||
* @author Marius Bogoevici
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
public class User1 extends SpecificRecordBase {
|
||||
|
||||
@Nullable
|
||||
private String name;
|
||||
|
||||
@Nullable
|
||||
private String favoriteColor;
|
||||
|
||||
public String getName() {
|
||||
return this.name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String getFavoriteColor() {
|
||||
return this.favoriteColor;
|
||||
}
|
||||
|
||||
public void setFavoriteColor(String favoriteColor) {
|
||||
this.favoriteColor = favoriteColor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Schema getSchema() {
|
||||
try {
|
||||
return new Schema.Parser().parse(new ClassPathResource("schemas/users_v1.schema").getInputStream());
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get(int i) {
|
||||
if (i == 0) {
|
||||
return getName().toString();
|
||||
}
|
||||
if (i == 1) {
|
||||
return getFavoriteColor().toString();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(int i, Object o) {
|
||||
if (i == 0) {
|
||||
setName((String) o);
|
||||
}
|
||||
if (i == 1) {
|
||||
setFavoriteColor((String) o);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,100 +0,0 @@
|
||||
== Partitioning with the Kafka Binder
|
||||
|
||||
Apache Kafka supports topic partitioning natively.
|
||||
|
||||
Sometimes it is advantageous to send data to specific partitions, for example when you want to strictly order message processing - all messsages for a particular customer should go to the same partition.
|
||||
|
||||
The following illustrates how to configure the producer and consumer side:
|
||||
|
||||
[source, java]
|
||||
----
|
||||
@SpringBootApplication
|
||||
@EnableBinding(Source.class)
|
||||
public class KafkaPartitionProducerApplication {
|
||||
|
||||
private static final Random RANDOM = new Random(System.currentTimeMillis());
|
||||
|
||||
private static final String[] data = new String[] {
|
||||
"foo1", "bar1", "qux1",
|
||||
"foo2", "bar2", "qux2",
|
||||
"foo3", "bar3", "qux3",
|
||||
"foo4", "bar4", "qux4",
|
||||
};
|
||||
|
||||
public static void main(String[] args) {
|
||||
new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
|
||||
.web(false)
|
||||
.run(args);
|
||||
}
|
||||
|
||||
@InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
|
||||
public Message<?> generate() {
|
||||
String value = data[RANDOM.nextInt(data.length)];
|
||||
System.out.println("Sending: " + value);
|
||||
return MessageBuilder.withPayload(value)
|
||||
.setHeader("partitionKey", value)
|
||||
.build();
|
||||
}
|
||||
|
||||
}
|
||||
----
|
||||
|
||||
.application.yml
|
||||
[source, yaml]
|
||||
----
|
||||
spring:
|
||||
cloud:
|
||||
stream:
|
||||
bindings:
|
||||
output:
|
||||
destination: partitioned.topic
|
||||
producer:
|
||||
partitioned: true
|
||||
partition-key-expression: headers['partitionKey']
|
||||
partition-count: 12
|
||||
----
|
||||
|
||||
IMPORTANT: The topic must be provisioned to have enough partitions to achieve the desired concurrency for all consumer groups.
|
||||
The above configuration will support up to 12 consumer instances (or 6 if their `concurrency` is 2, etc.).
|
||||
It is generally best to "over provision" the partitions to allow for future increases in consumers and/or concurrency.
|
||||
|
||||
NOTE: The above configuration uses the default partitioning (`key.hashCode() % partitionCount`).
|
||||
This may or may not provide a suitably balanced algorithm, depending on the key values; you can override this default by using the `partitionSelectorExpression` or `partitionSelectorClass` properties.
|
||||
|
||||
Since partitions are natively handled by Kafka, no special configuration is needed on the consumer side.
|
||||
Kafka will allocate partitions across the instances.
|
||||
|
||||
[source, java]
|
||||
----
|
||||
@SpringBootApplication
|
||||
@EnableBinding(Sink.class)
|
||||
public class KafkaPartitionConsumerApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
|
||||
.web(false)
|
||||
.run(args);
|
||||
}
|
||||
|
||||
@StreamListener(Sink.INPUT)
|
||||
public void listen(@Payload String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
|
||||
System.out.println(in + " received from partition " + partition);
|
||||
}
|
||||
|
||||
}
|
||||
----
|
||||
|
||||
.application.yml
|
||||
[source, yaml]
|
||||
----
|
||||
spring:
|
||||
cloud:
|
||||
stream:
|
||||
bindings:
|
||||
input:
|
||||
destination: partitioned.topic
|
||||
group: myGroup
|
||||
----
|
||||
|
||||
You can add instances as needed; Kafka will rebalance the partition allocations.
|
||||
If the instance count (or `instance count * concurrency`) exceeds the number of partitions, some consumers will be idle.
|
||||
@@ -1,25 +1,13 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<configuration>
|
||||
<source>1.8</source>
|
||||
<target>1.8</target>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.0.0.M2</version>
|
||||
<artifactId>spring-cloud-stream-binder-kafka11-parent</artifactId>
|
||||
<version>1.3.0.RELEASE</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder Core</description>
|
||||
<artifactId>spring-cloud-stream-binder-kafka11-core</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder Core for the 0.11.x.x client</description>
|
||||
<url>http://projects.spring.io/spring-cloud</url>
|
||||
<organization>
|
||||
<name>Pivotal Software, Inc.</name>
|
||||
@@ -45,13 +33,6 @@
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
<version>${spring-integration-kafka.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.avro</groupId>
|
||||
<artifactId>avro-compiler</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
@@ -0,0 +1,145 @@
|
||||
/*
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.admin;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Properties;
|
||||
|
||||
import kafka.utils.ZkUtils;
|
||||
|
||||
import org.springframework.util.ClassUtils;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class Kafka09AdminUtilsOperation implements AdminUtilsOperation {
|
||||
|
||||
private static Class<?> ADMIN_UTIL_CLASS;
|
||||
|
||||
static {
|
||||
try {
|
||||
ADMIN_UTIL_CLASS = ClassUtils.forName("kafka.admin.AdminUtils", null);
|
||||
}
|
||||
catch (ClassNotFoundException e) {
|
||||
throw new IllegalStateException("AdminUtils class not found", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void invokeAddPartitions(ZkUtils zkUtils, String topic, int numPartitions,
|
||||
String replicaAssignmentStr, boolean checkBrokerAvailable) {
|
||||
try {
|
||||
Method[] declaredMethods = ADMIN_UTIL_CLASS.getDeclaredMethods();
|
||||
Method addPartitions = null;
|
||||
for (Method m : declaredMethods) {
|
||||
if (m.getName().equals("addPartitions")) {
|
||||
addPartitions = m;
|
||||
}
|
||||
}
|
||||
if (addPartitions != null) {
|
||||
addPartitions.invoke(null, zkUtils, topic, numPartitions,
|
||||
replicaAssignmentStr, checkBrokerAvailable);
|
||||
}
|
||||
else {
|
||||
throw new InvocationTargetException(
|
||||
new RuntimeException("method not found"));
|
||||
}
|
||||
}
|
||||
catch (InvocationTargetException e) {
|
||||
ReflectionUtils.handleInvocationTargetException(e);
|
||||
}
|
||||
catch (IllegalAccessException e) {
|
||||
ReflectionUtils.handleReflectionException(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public short errorCodeFromTopicMetadata(String topic, ZkUtils zkUtils) {
|
||||
try {
|
||||
Method fetchTopicMetadataFromZk = ReflectionUtils.findMethod(ADMIN_UTIL_CLASS, "fetchTopicMetadataFromZk", String.class, ZkUtils.class);
|
||||
Object result = fetchTopicMetadataFromZk.invoke(null, topic, zkUtils);
|
||||
Class<?> topicMetadataClass = ClassUtils.forName("kafka.api.TopicMetadata", null);
|
||||
Method errorCodeMethod = ReflectionUtils.findMethod(topicMetadataClass, "errorCode");
|
||||
return (short) errorCodeMethod.invoke(result);
|
||||
}
|
||||
catch (ClassNotFoundException e) {
|
||||
throw new IllegalStateException("AdminUtils class not found", e);
|
||||
}
|
||||
catch (InvocationTargetException e) {
|
||||
ReflectionUtils.handleInvocationTargetException(e);
|
||||
}
|
||||
catch (IllegalAccessException e) {
|
||||
ReflectionUtils.handleReflectionException(e);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public int partitionSize(String topic, ZkUtils zkUtils) {
|
||||
try {
|
||||
Method fetchTopicMetadataFromZk = ReflectionUtils.findMethod(ADMIN_UTIL_CLASS, "fetchTopicMetadataFromZk", String.class, ZkUtils.class);
|
||||
Object result = fetchTopicMetadataFromZk.invoke(null, topic, zkUtils);
|
||||
Class<?> topicMetadataClass = ClassUtils.forName("kafka.api.TopicMetadata", null);
|
||||
|
||||
Method partitionsMetadata = ReflectionUtils.findMethod(topicMetadataClass, "partitionsMetadata");
|
||||
scala.collection.Seq<kafka.api.PartitionMetadata> partitionSize =
|
||||
(scala.collection.Seq<kafka.api.PartitionMetadata>)partitionsMetadata.invoke(result);
|
||||
|
||||
return partitionSize.size();
|
||||
}
|
||||
catch (ClassNotFoundException e) {
|
||||
throw new IllegalStateException("AdminUtils class not found", e);
|
||||
}
|
||||
catch (InvocationTargetException e) {
|
||||
ReflectionUtils.handleInvocationTargetException(e);
|
||||
}
|
||||
catch (IllegalAccessException e) {
|
||||
ReflectionUtils.handleReflectionException(e);
|
||||
}
|
||||
return 0;
|
||||
|
||||
}
|
||||
|
||||
public void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions,
|
||||
int replicationFactor, Properties topicConfig) {
|
||||
try {
|
||||
Method[] declaredMethods = ADMIN_UTIL_CLASS.getDeclaredMethods();
|
||||
Method createTopic = null;
|
||||
for (Method m : declaredMethods) {
|
||||
if (m.getName().equals("createTopic")) {
|
||||
createTopic = m;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (createTopic != null) {
|
||||
createTopic.invoke(null, zkUtils, topic, partitions,
|
||||
replicationFactor, topicConfig);
|
||||
}
|
||||
else {
|
||||
throw new InvocationTargetException(
|
||||
new RuntimeException("method not found"));
|
||||
}
|
||||
}
|
||||
catch (InvocationTargetException e) {
|
||||
ReflectionUtils.handleInvocationTargetException(e);
|
||||
}
|
||||
catch (IllegalAccessException e) {
|
||||
ReflectionUtils.handleReflectionException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -25,7 +25,7 @@ import org.apache.kafka.common.requests.MetadataResponse;
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class KafkaAdminUtilsOperation implements AdminUtilsOperation {
|
||||
public class Kafka10AdminUtilsOperation implements AdminUtilsOperation {
|
||||
|
||||
public void invokeAddPartitions(ZkUtils zkUtils, String topic, int numPartitions,
|
||||
String replicaAssignmentStr, boolean checkBrokerAvailable) {
|
||||
@@ -18,7 +18,6 @@ package org.springframework.cloud.stream.binder.kafka.properties;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.security.auth.login.AppConfigurationEntry;
|
||||
|
||||
import org.springframework.util.Assert;
|
||||
@@ -40,6 +40,8 @@ import org.springframework.cloud.stream.provisioning.ConsumerDestination;
|
||||
import org.springframework.cloud.stream.provisioning.ProducerDestination;
|
||||
import org.springframework.cloud.stream.provisioning.ProvisioningException;
|
||||
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
|
||||
import org.springframework.retry.RetryCallback;
|
||||
import org.springframework.retry.RetryContext;
|
||||
import org.springframework.retry.RetryOperations;
|
||||
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
|
||||
import org.springframework.retry.policy.SimpleRetryPolicy;
|
||||
@@ -201,25 +203,29 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
final int effectivePartitionCount = Math.max(this.configurationProperties.getMinPartitionCount(),
|
||||
partitionCount);
|
||||
|
||||
this.metadataRetryOperations.execute(context -> {
|
||||
this.metadataRetryOperations.execute(new RetryCallback<Object, RuntimeException>() {
|
||||
|
||||
try {
|
||||
adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount,
|
||||
configurationProperties.getReplicationFactor(), new Properties());
|
||||
}
|
||||
catch (Exception e) {
|
||||
String exceptionClass = e.getClass().getName();
|
||||
if (exceptionClass.equals("kafka.common.TopicExistsException")
|
||||
|| exceptionClass.equals("org.apache.kafka.common.errors.TopicExistsException")) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn("Attempt to create topic: " + topicName + ". Topic already exists.");
|
||||
@Override
|
||||
public Object doWithRetry(RetryContext context) throws RuntimeException {
|
||||
|
||||
try {
|
||||
adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount,
|
||||
configurationProperties.getReplicationFactor(), new Properties());
|
||||
}
|
||||
catch (Exception e) {
|
||||
String exceptionClass = e.getClass().getName();
|
||||
if (exceptionClass.equals("kafka.common.TopicExistsException")
|
||||
|| exceptionClass.equals("org.apache.kafka.common.errors.TopicExistsException")) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn("Attempt to create topic: " + topicName + ". Topic already exists.");
|
||||
}
|
||||
}
|
||||
else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
else {
|
||||
throw e;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
else {
|
||||
@@ -237,23 +243,27 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
final Callable<Collection<PartitionInfo>> callable) {
|
||||
try {
|
||||
return this.metadataRetryOperations
|
||||
.execute(context -> {
|
||||
Collection<PartitionInfo> partitions = callable.call();
|
||||
// do a sanity check on the partition set
|
||||
int partitionSize = partitions.size();
|
||||
if (partitionSize < partitionCount) {
|
||||
if (tolerateLowerPartitionsOnBroker) {
|
||||
logger.warn("The number of expected partitions was: " + partitionCount + ", but "
|
||||
+ partitionSize + (partitionSize > 1 ? " have " : " has ") + "been found instead."
|
||||
+ "There will be " + (partitionCount - partitionSize) + " idle consumers");
|
||||
}
|
||||
else {
|
||||
throw new IllegalStateException("The number of expected partitions was: "
|
||||
+ partitionCount + ", but " + partitionSize
|
||||
+ (partitionSize > 1 ? " have " : " has ") + "been found instead");
|
||||
.execute(new RetryCallback<Collection<PartitionInfo>, Exception>() {
|
||||
|
||||
@Override
|
||||
public Collection<PartitionInfo> doWithRetry(RetryContext context) throws Exception {
|
||||
Collection<PartitionInfo> partitions = callable.call();
|
||||
// do a sanity check on the partition set
|
||||
int partitionSize = partitions.size();
|
||||
if (partitionSize < partitionCount) {
|
||||
if (tolerateLowerPartitionsOnBroker) {
|
||||
logger.warn("The number of expected partitions was: " + partitionCount + ", but "
|
||||
+ partitionSize + (partitionSize > 1 ? " have " : " has ") + "been found instead."
|
||||
+ "There will be " + (partitionCount - partitionSize) + " idle consumers");
|
||||
}
|
||||
else {
|
||||
throw new IllegalStateException("The number of expected partitions was: "
|
||||
+ partitionCount + ", but " + partitionSize
|
||||
+ (partitionSize > 1 ? " have " : " has ") + "been found instead");
|
||||
}
|
||||
}
|
||||
return partitions;
|
||||
}
|
||||
return partitions;
|
||||
});
|
||||
}
|
||||
catch (Exception e) {
|
||||
@@ -4,13 +4,13 @@
|
||||
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.0.0.M2</version>
|
||||
<artifactId>spring-cloud-stream-binder-kafka11-parent</artifactId>
|
||||
<version>1.3.0.RELEASE</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>
|
||||
<name>spring-cloud-stream-binder-kafka-docs</name>
|
||||
<description>Spring Cloud Stream Kafka Binder Docs</description>
|
||||
<artifactId>spring-cloud-stream-binder-kafka11-docs</artifactId>
|
||||
<name>spring-cloud-stream-binder-kafka11-docs</name>
|
||||
<description>Spring Cloud Stream Kafka Binder Docs for the 0.11.x.x client</description>
|
||||
<properties>
|
||||
<main.basedir>${basedir}/..</main.basedir>
|
||||
</properties>
|
||||
|
Before Width: | Height: | Size: 9.2 KiB After Width: | Height: | Size: 9.2 KiB |
@@ -1,6 +1,6 @@
|
||||
[[spring-cloud-stream-binder-kafka-reference]]
|
||||
= Spring Cloud Stream Kafka Binder Reference Guide
|
||||
Sabby Anandan, Marius Bogoevici, Eric Bottard, Mark Fisher, Ilayaperumal Gopinathan, Gunnar Hillert, Mark Pollack, Patrick Peralta, Glenn Renfro, Thomas Risberg, Dave Syer, David Turanski, Janne Valkealahti, Benjamin Klein, Henryk Konsek, Gary Russell
|
||||
Sabby Anandan, Marius Bogoevici, Eric Bottard, Mark Fisher, Ilayaperumal Gopinathan, Gunnar Hillert, Mark Pollack, Patrick Peralta, Glenn Renfro, Thomas Risberg, Dave Syer, David Turanski, Janne Valkealahti, Benjamin Klein, Henryk Konsek
|
||||
:doctype: book
|
||||
:toc:
|
||||
:toclevels: 4
|
||||
@@ -26,7 +26,7 @@ include::overview.adoc[]
|
||||
|
||||
include::dlq.adoc[]
|
||||
|
||||
include::partitions.adoc[]
|
||||
include::metrics.adoc[]
|
||||
|
||||
= Appendices
|
||||
[appendix]
|
||||
@@ -0,0 +1,10 @@
|
||||
[[kafka-metrics]]
|
||||
== Kafka metrics
|
||||
|
||||
Kafka binder module exposes the following metrics:
|
||||
|
||||
`spring.cloud.stream.binder.kafka.someGroup.someTopic.lag` - this metric indicates how many messages
|
||||
have not been yet consumed from given binder's topic (`someTopic`) by given consumer group (`someGroup`).
|
||||
For example if the value of the metric `spring.cloud.stream.binder.kafka.myGroup.myTopic.lag` is `1000`, then
|
||||
consumer group `myGroup` has `1000` messages to waiting to be consumed from topic `myTopic`. This metric is
|
||||
particularly useful to provide auto-scaling feedback to PaaS platform of your choice.
|
||||
@@ -540,12 +540,3 @@ The payload of the `ErrorMessage` for a send failure is a `KafkaSendFailureExcep
|
||||
* `record` - the raw `ProducerRecord` that was created from the `failedMessage`
|
||||
|
||||
There is no automatic handling of these exceptions (such as sending to a <<kafka-dlq-processing, Dead-Letter queue>>); you can consume these exceptions with your own Spring Integration flow.
|
||||
|
||||
[[kafka-metrics]]
|
||||
== Kafka Metrics
|
||||
|
||||
Kafka binder module exposes the following metrics:
|
||||
|
||||
`spring.cloud.stream.binder.kafka.someGroup.someTopic.lag` - this metric indicates how many messages have not been yet consumed from given binder's topic by given consumer group.
|
||||
For example if the value of the metric `spring.cloud.stream.binder.kafka.myGroup.myTopic.lag` is `1000`, then consumer group `myGroup` has `1000` messages to waiting to be consumed from topic `myTopic`.
|
||||
This metric is particularly useful to provide auto-scaling feedback to PaaS platform of your choice.
|
||||
|
Before Width: | Height: | Size: 11 KiB After Width: | Height: | Size: 11 KiB |
|
Before Width: | Height: | Size: 2.0 KiB After Width: | Height: | Size: 2.0 KiB |
|
Before Width: | Height: | Size: 2.0 KiB After Width: | Height: | Size: 2.0 KiB |
|
Before Width: | Height: | Size: 45 KiB After Width: | Height: | Size: 45 KiB |
|
Before Width: | Height: | Size: 2.2 KiB After Width: | Height: | Size: 2.2 KiB |
|
Before Width: | Height: | Size: 931 B After Width: | Height: | Size: 931 B |
|
Before Width: | Height: | Size: 2.1 KiB After Width: | Height: | Size: 2.1 KiB |
@@ -2,21 +2,21 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka11</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>spring-cloud-stream-binder-kafka</name>
|
||||
<description>Kafka binder implementation</description>
|
||||
<name>spring-cloud-stream-binder-kafka11</name>
|
||||
<description>Kafka binder implementation for the 0.11.x.x client</description>
|
||||
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.0.0.M2</version>
|
||||
<artifactId>spring-cloud-stream-binder-kafka11-parent</artifactId>
|
||||
<version>1.3.0.RELEASE</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka11-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
@@ -27,6 +27,10 @@
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-codec</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-autoconfigure</artifactId>
|
||||
@@ -48,6 +52,7 @@
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<version>${spring-kafka.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
@@ -57,6 +62,7 @@
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<version>${spring-kafka.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
@@ -72,4 +78,20 @@
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<version>3.0.2</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<goals>
|
||||
<goal>test-jar</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
@@ -1,11 +1,11 @@
|
||||
/*
|
||||
* Copyright 2016-2017 the original author or authors.
|
||||
* Copyright 2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
@@ -13,25 +13,25 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.binder.MeterBinder;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
import org.apache.kafka.common.PartitionInfo;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.springframework.boot.actuate.endpoint.PublicMetrics;
|
||||
import org.springframework.boot.actuate.metrics.Metric;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
@@ -42,9 +42,9 @@ import org.springframework.util.ObjectUtils;
|
||||
*
|
||||
* @author Henryk Konsek
|
||||
*/
|
||||
public class KafkaBinderMetrics implements MeterBinder {
|
||||
public class KafkaBinderMetrics implements PublicMetrics {
|
||||
|
||||
private final static Log LOG = LogFactory.getLog(KafkaBinderMetrics.class);
|
||||
private final static Logger LOG = LoggerFactory.getLogger(KafkaBinderMetrics.class);
|
||||
|
||||
static final String METRIC_PREFIX = "spring.cloud.stream.binder.kafka";
|
||||
|
||||
@@ -68,7 +68,8 @@ public class KafkaBinderMetrics implements MeterBinder {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void bindTo(MeterRegistry registry) {
|
||||
public Collection<Metric<?>> metrics() {
|
||||
List<Metric<?>> metrics = new LinkedList<>();
|
||||
for (Map.Entry<String, KafkaMessageChannelBinder.TopicInformation> topicInfo : this.binder.getTopicsInUse()
|
||||
.entrySet()) {
|
||||
if (!topicInfo.getValue().isConsumerTopic()) {
|
||||
@@ -95,12 +96,13 @@ public class KafkaBinderMetrics implements MeterBinder {
|
||||
lag += endOffset.getValue();
|
||||
}
|
||||
}
|
||||
registry.gauge(String.format("%s.%s.%s.lag", METRIC_PREFIX, group, topic), lag);
|
||||
metrics.add(new Metric<>(String.format("%s.%s.%s.lag", METRIC_PREFIX, group, topic), lag));
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOG.debug("Cannot generate metric for topic: " + topic, e);
|
||||
}
|
||||
}
|
||||
return metrics;
|
||||
}
|
||||
|
||||
private ConsumerFactory<?, ?> createConsumerFactory(String group) {
|
||||
@@ -121,4 +123,4 @@ public class KafkaBinderMetrics implements MeterBinder {
|
||||
return new DefaultKafkaConsumerFactory<>(props);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@@ -25,6 +25,7 @@ import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
@@ -39,6 +40,7 @@ import org.apache.kafka.common.utils.Utils;
|
||||
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.binder.BinderHeaders;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
@@ -67,15 +69,16 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer;
|
||||
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
|
||||
import org.springframework.kafka.listener.config.ContainerProperties;
|
||||
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.kafka.support.ProducerListener;
|
||||
import org.springframework.kafka.support.SendResult;
|
||||
import org.springframework.kafka.support.TopicPartitionInitialOffset;
|
||||
import org.springframework.kafka.support.converter.MessagingMessageConverter;
|
||||
import org.springframework.kafka.transaction.KafkaTransactionManager;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
@@ -84,7 +87,7 @@ import org.springframework.util.concurrent.ListenableFuture;
|
||||
import org.springframework.util.concurrent.ListenableFutureCallback;
|
||||
|
||||
/**
|
||||
* A {@link org.springframework.cloud.stream.binder.Binder} that uses Kafka as the underlying middleware.
|
||||
* A {@link Binder} that uses Kafka as the underlying middleware.
|
||||
*
|
||||
* @author Eric Bottard
|
||||
* @author Marius Bogoevici
|
||||
@@ -97,7 +100,8 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
|
||||
* @author Doug Saus
|
||||
*/
|
||||
public class KafkaMessageChannelBinder extends
|
||||
AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
|
||||
AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>,
|
||||
ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
|
||||
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {
|
||||
|
||||
private final KafkaBinderConfigurationProperties configurationProperties;
|
||||
@@ -149,24 +153,31 @@ public class KafkaMessageChannelBinder extends
|
||||
@Override
|
||||
protected MessageHandler createProducerMessageHandler(final ProducerDestination destination,
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties, MessageChannel errorChannel)
|
||||
throws Exception {
|
||||
throws Exception {
|
||||
/*
|
||||
* IMPORTANT: With a transactional binder, individual producer properties for Kafka are
|
||||
* ignored; the global binder (spring.cloud.stream.kafka.binder.transaction.producer.*)
|
||||
* properties are used instead, for all producers. A binder is transactional when
|
||||
* IMPORTANT: With a transactional binder, individual producer properties for
|
||||
* Kafka are ignored; the global binder
|
||||
* (spring.cloud.stream.kafka.binder.transaction.producer.*) properties are used
|
||||
* instead, for all producers. A binder is transactional when
|
||||
* 'spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix' has text.
|
||||
*/
|
||||
final ProducerFactory<byte[], byte[]> producerFB = this.transactionManager != null
|
||||
? this.transactionManager.getProducerFactory()
|
||||
: getProducerFactory(null, producerProperties);
|
||||
? this.transactionManager.getProducerFactory()
|
||||
: getProducerFactory(null, producerProperties);
|
||||
Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(
|
||||
producerProperties.getPartitionCount(), false,
|
||||
() -> {
|
||||
Producer<byte[], byte[]> producer = producerFB.createProducer();
|
||||
List<PartitionInfo> partitionsFor = producer.partitionsFor(destination.getName());
|
||||
producer.close();
|
||||
((DisposableBean) producerFB).destroy();
|
||||
return partitionsFor;
|
||||
producerProperties.getPartitionCount(),
|
||||
false,
|
||||
new Callable<Collection<PartitionInfo>>() {
|
||||
|
||||
@Override
|
||||
public Collection<PartitionInfo> call() throws Exception {
|
||||
Producer<byte[], byte[]> producer = producerFB.createProducer();
|
||||
List<PartitionInfo> partitionsFor = producer.partitionsFor(destination.getName());
|
||||
producer.close();
|
||||
((DisposableBean) producerFB).destroy();
|
||||
return partitionsFor;
|
||||
}
|
||||
|
||||
});
|
||||
this.topicsInUse.put(destination.getName(), new TopicInformation(null, partitions));
|
||||
if (producerProperties.getPartitionCount() < partitions.size()) {
|
||||
@@ -176,11 +187,12 @@ public class KafkaMessageChannelBinder extends
|
||||
+ partitions.size() + " of the topic. The larger number will be used instead.");
|
||||
}
|
||||
/*
|
||||
* This is dirty; it relies on the fact that we, and the partition interceptor, share a
|
||||
* hard reference to the producer properties instance. But I don't see another way to fix
|
||||
* it since the interceptor has already been added to the channel, and we don't have
|
||||
* access to the channel here; if we did, we could inject the proper partition count
|
||||
* there. TODO: Consider this when doing the 2.0 binder restructuring.
|
||||
* This is dirty; it relies on the fact that we, and the partition
|
||||
* interceptor, share a hard reference to the producer properties instance.
|
||||
* But I don't see another way to fix it since the interceptor has already
|
||||
* been added to the channel, and we don't have access to the channel here; if
|
||||
* we did, we could inject the proper partition count there.
|
||||
* TODO: Consider this when doing the 2.0 binder restructuring.
|
||||
*/
|
||||
producerProperties.setPartitionCount(partitions.size());
|
||||
}
|
||||
@@ -203,9 +215,7 @@ public class KafkaMessageChannelBinder extends
|
||||
if (!patterns.contains("!" + MessageHeaders.ID)) {
|
||||
patterns.add(0, "!" + MessageHeaders.ID);
|
||||
}
|
||||
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper(
|
||||
patterns.toArray(new String[patterns.size()]));
|
||||
handler.setHeaderMapper(headerMapper);
|
||||
handler.setHeaderMapper(new DefaultKafkaHeaderMapper(patterns.toArray(new String[patterns.size()])));
|
||||
}
|
||||
return handler;
|
||||
}
|
||||
@@ -262,11 +272,16 @@ public class KafkaMessageChannelBinder extends
|
||||
|
||||
Collection<PartitionInfo> allPartitions = provisioningProvider.getPartitionsForTopic(partitionCount,
|
||||
extendedConsumerProperties.getExtension().isAutoRebalanceEnabled(),
|
||||
() -> {
|
||||
Consumer<?, ?> consumer = consumerFactory.createConsumer();
|
||||
List<PartitionInfo> partitionsFor = consumer.partitionsFor(destination.getName());
|
||||
consumer.close();
|
||||
return partitionsFor;
|
||||
new Callable<Collection<PartitionInfo>>() {
|
||||
|
||||
@Override
|
||||
public Collection<PartitionInfo> call() throws Exception {
|
||||
Consumer<?, ?> consumer = consumerFactory.createConsumer();
|
||||
List<PartitionInfo> partitionsFor = consumer.partitionsFor(destination.getName());
|
||||
consumer.close();
|
||||
return partitionsFor;
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
Collection<PartitionInfo> listenedPartitions;
|
||||
@@ -300,8 +315,9 @@ public class KafkaMessageChannelBinder extends
|
||||
}
|
||||
int concurrency = Math.min(extendedConsumerProperties.getConcurrency(), listenedPartitions.size());
|
||||
@SuppressWarnings("rawtypes")
|
||||
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer = new ConcurrentMessageListenerContainer(
|
||||
consumerFactory, containerProperties) {
|
||||
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer =
|
||||
new ConcurrentMessageListenerContainer(
|
||||
consumerFactory, containerProperties) {
|
||||
|
||||
@Override
|
||||
public void stop(Runnable callback) {
|
||||
@@ -333,6 +349,7 @@ public class KafkaMessageChannelBinder extends
|
||||
}
|
||||
messageConverter.setHeaderMapper(headerMapper);
|
||||
kafkaMessageDrivenChannelAdapter.setMessageConverter(messageConverter);
|
||||
|
||||
kafkaMessageDrivenChannelAdapter.setBeanFactory(this.getBeanFactory());
|
||||
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, consumerGroup,
|
||||
extendedConsumerProperties);
|
||||
@@ -359,43 +376,46 @@ public class KafkaMessageChannelBinder extends
|
||||
? this.transactionManager.getProducerFactory()
|
||||
: getProducerFactory(null, new ExtendedProducerProperties<>(new KafkaProducerProperties()));
|
||||
final KafkaTemplate<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFactory);
|
||||
return message -> {
|
||||
final ConsumerRecord<?, ?> record = message.getHeaders()
|
||||
.get(KafkaHeaders.RAW_DATA, ConsumerRecord.class);
|
||||
final byte[] key = record.key() != null ? Utils.toArray(ByteBuffer.wrap((byte[]) record.key()))
|
||||
: null;
|
||||
final byte[] payload = record.value() != null
|
||||
? Utils.toArray(ByteBuffer.wrap((byte[]) record.value()))
|
||||
: null;
|
||||
String dlqName = StringUtils.hasText(extendedConsumerProperties.getExtension().getDlqName())
|
||||
? extendedConsumerProperties.getExtension().getDlqName()
|
||||
: "error." + destination.getName() + "." + group;
|
||||
ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(dlqName, record.partition(),
|
||||
key, payload, record.headers());
|
||||
ListenableFuture<SendResult<byte[], byte[]>> sentDlq = kafkaTemplate.send(producerRecord);
|
||||
sentDlq.addCallback(new ListenableFutureCallback<SendResult<byte[], byte[]>>() {
|
||||
StringBuilder sb = new StringBuilder().append(" a message with key='")
|
||||
.append(toDisplayString(ObjectUtils.nullSafeToString(key), 50)).append("'")
|
||||
.append(" and payload='")
|
||||
.append(toDisplayString(ObjectUtils.nullSafeToString(payload), 50))
|
||||
.append("'").append(" received from ")
|
||||
.append(record.partition());
|
||||
return new MessageHandler() {
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable ex) {
|
||||
KafkaMessageChannelBinder.this.logger.error(
|
||||
"Error sending to DLQ " + sb.toString(), ex);
|
||||
}
|
||||
@Override
|
||||
public void handleMessage(Message<?> message) throws MessagingException {
|
||||
final ConsumerRecord<?, ?> record = message.getHeaders()
|
||||
.get(KafkaMessageDrivenChannelAdapter.KAFKA_RAW_DATA, ConsumerRecord.class);
|
||||
final byte[] key = record.key() != null ? Utils.toArray(ByteBuffer.wrap((byte[]) record.key()))
|
||||
: null;
|
||||
final byte[] payload = record.value() != null
|
||||
? Utils.toArray(ByteBuffer.wrap((byte[]) record.value())) : null;
|
||||
String dlqName = StringUtils.hasText(extendedConsumerProperties.getExtension().getDlqName())
|
||||
? extendedConsumerProperties.getExtension().getDlqName()
|
||||
: "error." + destination.getName() + "." + group;
|
||||
ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(dlqName, record.partition(),
|
||||
key, payload, record.headers());
|
||||
ListenableFuture<SendResult<byte[], byte[]>> sentDlq = kafkaTemplate.send(producerRecord);
|
||||
sentDlq.addCallback(new ListenableFutureCallback<SendResult<byte[], byte[]>>() {
|
||||
StringBuilder sb = new StringBuilder().append(" a message with key='")
|
||||
.append(toDisplayString(ObjectUtils.nullSafeToString(key), 50)).append("'")
|
||||
.append(" and payload='")
|
||||
.append(toDisplayString(ObjectUtils.nullSafeToString(payload), 50))
|
||||
.append("'").append(" received from ")
|
||||
.append(record.partition());
|
||||
|
||||
@Override
|
||||
public void onSuccess(SendResult<byte[], byte[]> result) {
|
||||
if (KafkaMessageChannelBinder.this.logger.isDebugEnabled()) {
|
||||
KafkaMessageChannelBinder.this.logger.debug(
|
||||
"Sent to DLQ " + sb.toString());
|
||||
@Override
|
||||
public void onFailure(Throwable ex) {
|
||||
KafkaMessageChannelBinder.this.logger.error(
|
||||
"Error sending to DLQ " + sb.toString(), ex);
|
||||
}
|
||||
}
|
||||
|
||||
});
|
||||
@Override
|
||||
public void onSuccess(SendResult<byte[], byte[]> result) {
|
||||
if (KafkaMessageChannelBinder.this.logger.isDebugEnabled()) {
|
||||
KafkaMessageChannelBinder.this.logger.debug(
|
||||
"Sent to DLQ " + sb.toString());
|
||||
}
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
};
|
||||
}
|
||||
return null;
|
||||
@@ -20,13 +20,14 @@ import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import io.micrometer.core.instrument.binder.MeterBinder;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.utils.AppInfoParser;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.actuate.endpoint.PublicMetrics;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
|
||||
@@ -36,15 +37,22 @@ import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.KafkaAdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.JaasLoginModuleConfiguration;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
|
||||
import org.springframework.cloud.stream.config.codec.kryo.KryoCodecAutoConfiguration;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Condition;
|
||||
import org.springframework.context.annotation.ConditionContext;
|
||||
import org.springframework.context.annotation.Conditional;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Import;
|
||||
import org.springframework.core.type.AnnotatedTypeMetadata;
|
||||
import org.springframework.integration.codec.Codec;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
|
||||
@@ -63,12 +71,15 @@ import org.springframework.util.ObjectUtils;
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnMissingBean(Binder.class)
|
||||
@Import({ PropertyPlaceholderAutoConfiguration.class})
|
||||
@Import({ KryoCodecAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class})
|
||||
@EnableConfigurationProperties({ KafkaBinderConfigurationProperties.class, KafkaExtendedBindingProperties.class })
|
||||
public class KafkaBinderConfiguration {
|
||||
|
||||
protected static final Log logger = LogFactory.getLog(KafkaBinderConfiguration.class);
|
||||
|
||||
@Autowired
|
||||
private Codec codec;
|
||||
|
||||
@Autowired
|
||||
private KafkaBinderConfigurationProperties configurationProperties;
|
||||
|
||||
@@ -93,6 +104,7 @@ public class KafkaBinderConfiguration {
|
||||
KafkaMessageChannelBinder kafkaMessageChannelBinder() {
|
||||
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
|
||||
this.configurationProperties, provisioningProvider());
|
||||
kafkaMessageChannelBinder.setCodec(this.codec);
|
||||
kafkaMessageChannelBinder.setProducerListener(producerListener);
|
||||
kafkaMessageChannelBinder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
|
||||
return kafkaMessageChannelBinder;
|
||||
@@ -123,14 +135,24 @@ public class KafkaBinderConfiguration {
|
||||
}
|
||||
|
||||
@Bean
|
||||
public MeterBinder kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder) {
|
||||
public PublicMetrics kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder) {
|
||||
return new KafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties);
|
||||
}
|
||||
|
||||
@Bean(name = "adminUtilsOperation")
|
||||
@Conditional(Kafka09Present.class)
|
||||
@ConditionalOnClass(name = "kafka.admin.AdminUtils")
|
||||
public AdminUtilsOperation kafka09AdminUtilsOperation() {
|
||||
logger.info("AdminUtils selected: Kafka 0.9 AdminUtils");
|
||||
return new Kafka09AdminUtilsOperation();
|
||||
}
|
||||
|
||||
@Bean(name = "adminUtilsOperation")
|
||||
@Conditional(Kafka1xPresent.class)
|
||||
@ConditionalOnClass(name = "kafka.admin.AdminUtils")
|
||||
public AdminUtilsOperation kafka10AdminUtilsOperation() {
|
||||
return new KafkaAdminUtilsOperation();
|
||||
logger.info("AdminUtils selected: Kafka 0.10 AdminUtils");
|
||||
return new Kafka10AdminUtilsOperation();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@@ -138,6 +160,22 @@ public class KafkaBinderConfiguration {
|
||||
return new KafkaJaasLoginModuleInitializer();
|
||||
}
|
||||
|
||||
static class Kafka1xPresent implements Condition {
|
||||
|
||||
@Override
|
||||
public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata annotatedTypeMetadata) {
|
||||
return AppInfoParser.getVersion().startsWith("0.1");
|
||||
}
|
||||
}
|
||||
|
||||
static class Kafka09Present implements Condition {
|
||||
|
||||
@Override
|
||||
public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata annotatedTypeMetadata) {
|
||||
return AppInfoParser.getVersion().startsWith("0.9");
|
||||
}
|
||||
}
|
||||
|
||||
public static class JaasConfigurationProperties {
|
||||
|
||||
private JaasLoginModuleConfiguration kafka;
|
||||
@@ -13,7 +13,6 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import org.springframework.cloud.stream.binder.AbstractTestBinder;
|
||||
@@ -22,6 +21,9 @@ import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.integration.codec.Codec;
|
||||
import org.springframework.integration.codec.kryo.PojoCodec;
|
||||
import org.springframework.integration.tuple.TupleKryoRegistrar;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
@@ -45,4 +47,9 @@ public abstract class AbstractKafkaTestBinder extends
|
||||
return this.applicationContext;
|
||||
}
|
||||
|
||||
protected static Codec getCodec() {
|
||||
return new PojoCodec(new TupleKryoRegistrar());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -13,7 +13,6 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
@@ -13,7 +13,6 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
@@ -13,7 +13,6 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
@@ -13,9 +13,12 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.BDDMockito.given;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
@@ -35,8 +38,6 @@ import org.springframework.boot.actuate.health.Health;
|
||||
import org.springframework.boot.actuate.health.Status;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* @author Barry Commins
|
||||
* @author Gary Russell
|
||||
@@ -61,8 +62,8 @@ public class KafkaBinderHealthIndicatorTest {
|
||||
@Before
|
||||
public void setup() {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
org.mockito.BDDMockito.given(consumerFactory.createConsumer()).willReturn(consumer);
|
||||
org.mockito.BDDMockito.given(binder.getTopicsInUse()).willReturn(topicsInUse);
|
||||
given(consumerFactory.createConsumer()).willReturn(consumer);
|
||||
given(binder.getTopicsInUse()).willReturn(topicsInUse);
|
||||
this.indicator = new KafkaBinderHealthIndicator(binder, consumerFactory);
|
||||
this.indicator.setTimeout(10);
|
||||
}
|
||||
@@ -71,17 +72,17 @@ public class KafkaBinderHealthIndicatorTest {
|
||||
public void kafkaBinderIsUp() {
|
||||
final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
|
||||
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group", partitions));
|
||||
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
|
||||
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
|
||||
Health health = indicator.health();
|
||||
assertThat(health.getStatus()).isEqualTo(Status.UP);
|
||||
org.mockito.Mockito.verify(this.consumer).close();
|
||||
verify(this.consumer).close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void kafkaBinderIsDown() {
|
||||
final List<PartitionInfo> partitions = partitions(new Node(-1, null, 0));
|
||||
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group", partitions));
|
||||
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
|
||||
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
|
||||
Health health = indicator.health();
|
||||
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
|
||||
}
|
||||
@@ -90,7 +91,7 @@ public class KafkaBinderHealthIndicatorTest {
|
||||
public void kafkaBinderDoesNotAnswer() {
|
||||
final List<PartitionInfo> partitions = partitions(new Node(-1, null, 0));
|
||||
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group", partitions));
|
||||
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willAnswer(new Answer<Object>() {
|
||||
given(consumer.partitionsFor(TEST_TOPIC)).willAnswer(new Answer<Object>() {
|
||||
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2016-2017 the original author or authors.
|
||||
* Copyright 2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -13,16 +13,14 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
import org.apache.kafka.common.Node;
|
||||
@@ -33,11 +31,17 @@ import org.junit.Test;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
import org.springframework.boot.actuate.metrics.Metric;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.TopicInformation;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.BDDMockito.given;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyCollectionOf;
|
||||
import static org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics.METRIC_PREFIX;
|
||||
|
||||
/**
|
||||
* @author Henryk Konsek
|
||||
@@ -57,8 +61,6 @@ public class KafkaBinderMetricsTest {
|
||||
@Mock
|
||||
private KafkaMessageChannelBinder binder;
|
||||
|
||||
private MeterRegistry meterRegistry = new SimpleMeterRegistry();
|
||||
|
||||
private Map<String, TopicInformation> topicsInUse = new HashMap<>();
|
||||
|
||||
@Mock
|
||||
@@ -67,23 +69,24 @@ public class KafkaBinderMetricsTest {
|
||||
@Before
|
||||
public void setup() {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
org.mockito.BDDMockito.given(consumerFactory.createConsumer()).willReturn(consumer);
|
||||
org.mockito.BDDMockito.given(binder.getTopicsInUse()).willReturn(topicsInUse);
|
||||
given(consumerFactory.createConsumer()).willReturn(consumer);
|
||||
given(binder.getTopicsInUse()).willReturn(topicsInUse);
|
||||
metrics = new KafkaBinderMetrics(binder, kafkaBinderConfigurationProperties, consumerFactory);
|
||||
org.mockito.BDDMockito.given(consumer.endOffsets(org.mockito.Matchers.anyCollectionOf(TopicPartition.class)))
|
||||
.willReturn(java.util.Collections.singletonMap(new TopicPartition(TEST_TOPIC, 0), 1000L));
|
||||
given(consumer.endOffsets(anyCollectionOf(TopicPartition.class)))
|
||||
.willReturn(singletonMap(new TopicPartition(TEST_TOPIC, 0), 1000L));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldIndicateLag() {
|
||||
org.mockito.BDDMockito.given(consumer.committed(org.mockito.Matchers.any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
|
||||
given(consumer.committed(any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
|
||||
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
|
||||
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
|
||||
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
|
||||
metrics.bindTo(meterRegistry);
|
||||
assertThat(meterRegistry.getMeters()).hasSize(1);
|
||||
MeterRegistry.Search group = meterRegistry.find(String.format("%s.%s.%s.lag", KafkaBinderMetrics.METRIC_PREFIX, "group", TEST_TOPIC));
|
||||
assertThat(group.gauge().get().value()).isEqualTo(500.0);
|
||||
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
|
||||
Collection<Metric<?>> collectedMetrics = metrics.metrics();
|
||||
assertThat(collectedMetrics).hasSize(1);
|
||||
assertThat(collectedMetrics.iterator().next().getName())
|
||||
.isEqualTo(String.format("%s.%s.%s.lag", METRIC_PREFIX, "group", TEST_TOPIC));
|
||||
assertThat(collectedMetrics.iterator().next().getValue()).isEqualTo(500L);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -91,34 +94,36 @@ public class KafkaBinderMetricsTest {
|
||||
Map<TopicPartition, Long> endOffsets = new HashMap<>();
|
||||
endOffsets.put(new TopicPartition(TEST_TOPIC, 0), 1000L);
|
||||
endOffsets.put(new TopicPartition(TEST_TOPIC, 1), 1000L);
|
||||
org.mockito.BDDMockito.given(consumer.endOffsets(org.mockito.Matchers.anyCollectionOf(TopicPartition.class))).willReturn(endOffsets);
|
||||
org.mockito.BDDMockito.given(consumer.committed(org.mockito.Matchers.any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
|
||||
given(consumer.endOffsets(anyCollectionOf(TopicPartition.class))).willReturn(endOffsets);
|
||||
given(consumer.committed(any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
|
||||
List<PartitionInfo> partitions = partitions(new Node(0, null, 0), new Node(0, null, 0));
|
||||
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
|
||||
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
|
||||
metrics.bindTo(meterRegistry);
|
||||
assertThat(meterRegistry.getMeters()).hasSize(1);
|
||||
MeterRegistry.Search group = meterRegistry.find(String.format("%s.%s.%s.lag", KafkaBinderMetrics.METRIC_PREFIX, "group", TEST_TOPIC));
|
||||
assertThat(group.gauge().get().value()).isEqualTo(1000.0);
|
||||
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
|
||||
Collection<Metric<?>> collectedMetrics = metrics.metrics();
|
||||
assertThat(collectedMetrics).hasSize(1);
|
||||
assertThat(collectedMetrics.iterator().next().getName())
|
||||
.isEqualTo(String.format("%s.%s.%s.lag", METRIC_PREFIX, "group", TEST_TOPIC));
|
||||
assertThat(collectedMetrics.iterator().next().getValue()).isEqualTo(1000L);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldIndicateFullLagForNotCommittedGroups() {
|
||||
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
|
||||
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
|
||||
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
|
||||
metrics.bindTo(meterRegistry);
|
||||
assertThat(meterRegistry.getMeters()).hasSize(1);
|
||||
MeterRegistry.Search group = meterRegistry.find(String.format("%s.%s.%s.lag", KafkaBinderMetrics.METRIC_PREFIX, "group", TEST_TOPIC));
|
||||
assertThat(group.gauge().get().value()).isEqualTo(1000.0);
|
||||
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
|
||||
Collection<Metric<?>> collectedMetrics = metrics.metrics();
|
||||
assertThat(collectedMetrics).hasSize(1);
|
||||
assertThat(collectedMetrics.iterator().next().getName())
|
||||
.isEqualTo(String.format("%s.%s.%s.lag", METRIC_PREFIX, "group", TEST_TOPIC));
|
||||
assertThat(collectedMetrics.iterator().next().getValue()).isEqualTo(1000L);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldNotCalculateLagForProducerTopics() {
|
||||
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
|
||||
topicsInUse.put(TEST_TOPIC, new TopicInformation(null, partitions));
|
||||
metrics.bindTo(meterRegistry);
|
||||
assertThat(meterRegistry.getMeters()).isEmpty();
|
||||
Collection<Metric<?>> collectedMetrics = metrics.metrics();
|
||||
assertThat(collectedMetrics).isEmpty();
|
||||
}
|
||||
|
||||
private List<PartitionInfo> partitions(Node... nodes) {
|
||||
@@ -16,10 +16,7 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -30,41 +27,27 @@ import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import kafka.utils.ZKStringSerializer$;
|
||||
import kafka.utils.ZkUtils;
|
||||
|
||||
import org.I0Itec.zkclient.ZkClient;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.serialization.ByteArraySerializer;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
import org.apache.kafka.common.serialization.LongDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.assertj.core.api.Condition;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.binder.BinderHeaders;
|
||||
import org.springframework.cloud.stream.binder.Binding;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.HeaderMode;
|
||||
import org.springframework.cloud.stream.binder.PartitionCapableBinderTests;
|
||||
import org.springframework.cloud.stream.binder.PartitionTestSupport;
|
||||
import org.springframework.cloud.stream.binder.Spy;
|
||||
import org.springframework.cloud.stream.binder.TestUtils;
|
||||
import org.springframework.cloud.stream.binder.kafka.admin.KafkaAdminUtilsOperation;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
|
||||
@@ -80,7 +63,6 @@ import org.springframework.integration.context.IntegrationContextUtils;
|
||||
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
|
||||
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
|
||||
import org.springframework.integration.kafka.support.KafkaSendFailureException;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
@@ -89,8 +71,6 @@ import org.springframework.kafka.support.Acknowledgment;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.kafka.support.SendResult;
|
||||
import org.springframework.kafka.support.TopicPartitionInitialOffset;
|
||||
import org.springframework.kafka.test.core.BrokerAddress;
|
||||
import org.springframework.kafka.test.rule.KafkaEmbedded;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
@@ -103,8 +83,6 @@ import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.retry.backoff.FixedBackOffPolicy;
|
||||
import org.springframework.retry.policy.SimpleRetryPolicy;
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
import org.springframework.util.MimeType;
|
||||
import org.springframework.util.MimeTypeUtils;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
import org.springframework.util.concurrent.SettableListenableFuture;
|
||||
|
||||
@@ -113,6 +91,8 @@ import static org.assertj.core.api.Assertions.fail;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import kafka.utils.ZKStringSerializer$;
|
||||
import kafka.utils.ZkUtils;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
@@ -120,21 +100,12 @@ import static org.mockito.Mockito.mock;
|
||||
* @author Henryk Konsek
|
||||
* @author Gary Russell
|
||||
*/
|
||||
public class KafkaBinderTests extends
|
||||
public abstract class KafkaBinderTests extends
|
||||
PartitionCapableBinderTests<AbstractKafkaTestBinder, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>> {
|
||||
|
||||
@Rule
|
||||
public ExpectedException expectedProvisioningException = ExpectedException.none();
|
||||
|
||||
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();
|
||||
|
||||
@ClassRule
|
||||
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
|
||||
|
||||
private KafkaTestBinder binder;
|
||||
|
||||
private final KafkaAdminUtilsOperation adminUtilsOperation = new KafkaAdminUtilsOperation();
|
||||
|
||||
@Override
|
||||
protected ExtendedConsumerProperties<KafkaConsumerProperties> createConsumerProperties() {
|
||||
final ExtendedConsumerProperties<KafkaConsumerProperties> kafkaConsumerProperties = new ExtendedConsumerProperties<>(
|
||||
@@ -153,196 +124,21 @@ public class KafkaBinderTests extends
|
||||
return producerProperties;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void binderBindUnbindLatency() throws InterruptedException {
|
||||
Thread.sleep(500);
|
||||
}
|
||||
public abstract String getKafkaOffsetHeaderKey();
|
||||
|
||||
@Override
|
||||
protected KafkaTestBinder getBinder() {
|
||||
if (binder == null) {
|
||||
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
|
||||
binder = new KafkaTestBinder(binderConfiguration);
|
||||
}
|
||||
return binder;
|
||||
}
|
||||
protected abstract Binder getBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties);
|
||||
|
||||
private KafkaBinderConfigurationProperties createConfigurationProperties() {
|
||||
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
|
||||
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
|
||||
List<String> bAddresses = new ArrayList<>();
|
||||
for (BrokerAddress bAddress : brokerAddresses) {
|
||||
bAddresses.add(bAddress.toString());
|
||||
}
|
||||
String[] foo = new String[bAddresses.size()];
|
||||
binderConfiguration.setBrokers(bAddresses.toArray(foo));
|
||||
binderConfiguration.setZkNodes(embeddedKafka.getZookeeperConnectionString());
|
||||
return binderConfiguration;
|
||||
}
|
||||
protected abstract KafkaBinderConfigurationProperties createConfigurationProperties();
|
||||
|
||||
private int partitionSize(String topic) {
|
||||
return consumerFactory().createConsumer().partitionsFor(topic).size();
|
||||
}
|
||||
protected abstract int partitionSize(String topic);
|
||||
|
||||
private ZkUtils getZkUtils(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
|
||||
final ZkClient zkClient = new ZkClient(kafkaBinderConfigurationProperties.getZkConnectionString(),
|
||||
kafkaBinderConfigurationProperties.getZkSessionTimeout(), kafkaBinderConfigurationProperties.getZkConnectionTimeout(),
|
||||
ZKStringSerializer$.MODULE$);
|
||||
protected abstract ZkUtils getZkUtils(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties);
|
||||
|
||||
return new ZkUtils(zkClient, null, false);
|
||||
}
|
||||
protected abstract void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions,
|
||||
int replicationFactor, Properties topicConfig);
|
||||
|
||||
private void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions, int replicationFactor, Properties topicConfig) {
|
||||
adminUtilsOperation.invokeCreateTopic(zkUtils, topic, partitions, replicationFactor, new Properties());
|
||||
}
|
||||
|
||||
private int invokePartitionSize(String topic, ZkUtils zkUtils) {
|
||||
return adminUtilsOperation.partitionSize(topic, zkUtils);
|
||||
}
|
||||
|
||||
private String getKafkaOffsetHeaderKey() {
|
||||
return KafkaHeaders.OFFSET;
|
||||
}
|
||||
|
||||
private Binder getBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
|
||||
return new KafkaTestBinder(kafkaBinderConfigurationProperties);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
String multiplier = System.getenv("KAFKA_TIMEOUT_MULTIPLIER");
|
||||
if (multiplier != null) {
|
||||
timeoutMultiplier = Double.parseDouble(multiplier);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean usesExplicitRouting() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getClassUnderTestName() {
|
||||
return CLASS_UNDER_TEST_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Spy spyOn(final String name) {
|
||||
throw new UnsupportedOperationException("'spyOn' is not used by Kafka tests");
|
||||
}
|
||||
|
||||
private ConsumerFactory<byte[], byte[]> consumerFactory() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
|
||||
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, "TEST-CONSUMER-GROUP");
|
||||
Deserializer<byte[]> valueDecoder = new ByteArrayDeserializer();
|
||||
Deserializer<byte[]> keyDecoder = new ByteArrayDeserializer();
|
||||
|
||||
return new DefaultKafkaConsumerFactory<>(props, keyDecoder, valueDecoder);
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
@Test
|
||||
public void testTrustedPackages() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
|
||||
BindingProperties producerBindingProperties = createProducerBindingProperties(createProducerProperties());
|
||||
DirectChannel moduleOutputChannel = createBindableChannel("output", producerBindingProperties);
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer("bar.0", moduleOutputChannel,
|
||||
producerBindingProperties.getProducer());
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.getExtension().setTrustedPackages(new String[]{"org.springframework.util"});
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer("bar.0",
|
||||
"testSendAndReceiveNoOriginalContentType", moduleInputChannel, consumerProperties);
|
||||
binderBindUnbindLatency();
|
||||
|
||||
Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload("foo")
|
||||
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN_VALUE)
|
||||
.setHeader("foo", MimeTypeUtils.TEXT_PLAIN)
|
||||
.build();
|
||||
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(moduleInputChannel);
|
||||
Assertions.assertThat(inbound).isNotNull();
|
||||
Assertions.assertThat(inbound.getPayload()).isEqualTo("foo".getBytes());
|
||||
Assertions.assertThat(inbound.getHeaders().get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE)).isNull();
|
||||
Assertions.assertThat(inbound.getHeaders().get(MessageHeaders.CONTENT_TYPE))
|
||||
.isEqualTo(MimeTypeUtils.TEXT_PLAIN);
|
||||
Assertions.assertThat(inbound.getHeaders().get("foo")).isInstanceOf(MimeType.class);
|
||||
MimeType actual = (MimeType) inbound.getHeaders().get("foo");
|
||||
Assertions.assertThat(actual).isEqualTo(MimeTypeUtils.TEXT_PLAIN);
|
||||
producerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testSendAndReceiveNoOriginalContentType() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
|
||||
BindingProperties producerBindingProperties = createProducerBindingProperties(
|
||||
createProducerProperties());
|
||||
DirectChannel moduleOutputChannel = createBindableChannel("output",
|
||||
producerBindingProperties);
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer("bar.0",
|
||||
moduleOutputChannel, producerBindingProperties.getProducer());
|
||||
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.getExtension().setTrustedPackages(new String[] {"org.springframework.util"});
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer("bar.0",
|
||||
"testSendAndReceiveNoOriginalContentType", moduleInputChannel,
|
||||
consumerProperties);
|
||||
binderBindUnbindLatency();
|
||||
|
||||
//TODO: Will have to fix the MimeType to convert to byte array once this issue has been resolved:
|
||||
//https://github.com/spring-projects/spring-kafka/issues/424
|
||||
Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload("foo".getBytes())
|
||||
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN_VALUE.getBytes()).build();
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(moduleInputChannel);
|
||||
assertThat(inbound).isNotNull();
|
||||
assertThat(inbound.getPayload()).isEqualTo("foo".getBytes());
|
||||
assertThat(inbound.getHeaders().get(MessageHeaders.CONTENT_TYPE))
|
||||
.isEqualTo(MimeTypeUtils.TEXT_PLAIN_VALUE.getBytes());
|
||||
producerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testSendAndReceive() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
BindingProperties outputBindingProperties = createProducerBindingProperties(
|
||||
createProducerProperties());
|
||||
DirectChannel moduleOutputChannel = createBindableChannel("output",
|
||||
outputBindingProperties);
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer("foo.0",
|
||||
moduleOutputChannel, outputBindingProperties.getProducer());
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo.0",
|
||||
"testSendAndReceive", moduleInputChannel, createConsumerProperties());
|
||||
// Bypass conversion we are only testing sendReceive
|
||||
Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload("foo".getBytes())
|
||||
.setHeader(MessageHeaders.CONTENT_TYPE,
|
||||
MimeTypeUtils.APPLICATION_OCTET_STREAM_VALUE.getBytes())
|
||||
.build();
|
||||
// Let the consumer actually bind to the producer before sending a msg
|
||||
binderBindUnbindLatency();
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(moduleInputChannel);
|
||||
assertThat(inbound).isNotNull();
|
||||
assertThat(inbound.getPayload()).isEqualTo("foo".getBytes());
|
||||
assertThat(inbound.getHeaders().get(MessageHeaders.CONTENT_TYPE))
|
||||
.isEqualTo(MimeTypeUtils.APPLICATION_OCTET_STREAM_VALUE.getBytes());
|
||||
producerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
protected abstract int invokePartitionSize(String topic,
|
||||
ZkUtils zkUtils);
|
||||
|
||||
@Test
|
||||
public void testDlqAndRetry() throws Exception {
|
||||
@@ -389,23 +185,35 @@ public class KafkaBinderTests extends
|
||||
final AtomicReference<Message<?>> boundErrorChannelMessage = new AtomicReference<>();
|
||||
final AtomicReference<Message<?>> globalErrorChannelMessage = new AtomicReference<>();
|
||||
final AtomicBoolean hasRecovererInCallStack = new AtomicBoolean(!withRetry);
|
||||
boundErrorChannel.subscribe(message -> {
|
||||
boundErrorChannelMessage.set(message);
|
||||
String stackTrace = Arrays.toString(new RuntimeException().getStackTrace());
|
||||
hasRecovererInCallStack.set(stackTrace.contains("ErrorMessageSendingRecoverer"));
|
||||
boundErrorChannel.subscribe(new MessageHandler() {
|
||||
|
||||
@Override
|
||||
public void handleMessage(Message<?> message) throws MessagingException {
|
||||
boundErrorChannelMessage.set(message);
|
||||
String stackTrace = Arrays.toString(new RuntimeException().getStackTrace());
|
||||
hasRecovererInCallStack.set(stackTrace.contains("ErrorMessageSendingRecoverer"));
|
||||
}
|
||||
|
||||
});
|
||||
globalErrorChannel.subscribe(new MessageHandler() {
|
||||
|
||||
@Override
|
||||
public void handleMessage(Message<?> message) throws MessagingException {
|
||||
globalErrorChannelMessage.set(message);
|
||||
}
|
||||
|
||||
});
|
||||
globalErrorChannel.subscribe(globalErrorChannelMessage::set);
|
||||
|
||||
Binding<MessageChannel> dlqConsumerBinding = binder.bindConsumer(
|
||||
"error.dlqTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel, dlqConsumerProperties);
|
||||
binderBindUnbindLatency();
|
||||
String testMessagePayload = "test." + UUID.randomUUID().toString();
|
||||
Message<byte[]> testMessage = MessageBuilder.withPayload(testMessagePayload.getBytes()).build();
|
||||
Message<String> testMessage = MessageBuilder.withPayload(testMessagePayload).build();
|
||||
moduleOutputChannel.send(testMessage);
|
||||
|
||||
Message<?> receivedMessage = receive(dlqChannel, 3);
|
||||
assertThat(receivedMessage).isNotNull();
|
||||
assertThat(receivedMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());
|
||||
assertThat(receivedMessage.getPayload()).isEqualTo(testMessagePayload);
|
||||
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
|
||||
binderBindUnbindLatency();
|
||||
|
||||
@@ -441,7 +249,7 @@ public class KafkaBinderTests extends
|
||||
"testGroup", moduleInputChannel, consumerProperties);
|
||||
|
||||
String testMessagePayload = "test." + UUID.randomUUID().toString();
|
||||
Message<byte[]> testMessage = MessageBuilder.withPayload(testMessagePayload.getBytes()).build();
|
||||
Message<String> testMessage = MessageBuilder.withPayload(testMessagePayload).build();
|
||||
moduleOutputChannel.send(testMessage);
|
||||
|
||||
assertThat(handler.getLatch().await((int) (timeoutMultiplier * 1000), TimeUnit.MILLISECONDS));
|
||||
@@ -449,7 +257,7 @@ public class KafkaBinderTests extends
|
||||
assertThat(handler.getReceivedMessages().entrySet()).hasSize(1);
|
||||
Message<?> receivedMessage = handler.getReceivedMessages().entrySet().iterator().next().getValue();
|
||||
assertThat(receivedMessage).isNotNull();
|
||||
assertThat(receivedMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());
|
||||
assertThat(receivedMessage.getPayload()).isEqualTo(testMessagePayload);
|
||||
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
|
||||
consumerBinding.unbind();
|
||||
|
||||
@@ -459,13 +267,13 @@ public class KafkaBinderTests extends
|
||||
successfulInputChannel, consumerProperties);
|
||||
binderBindUnbindLatency();
|
||||
String testMessage2Payload = "test." + UUID.randomUUID().toString();
|
||||
Message<byte[]> testMessage2 = MessageBuilder.withPayload(testMessage2Payload.getBytes()).build();
|
||||
Message<String> testMessage2 = MessageBuilder.withPayload(testMessage2Payload).build();
|
||||
moduleOutputChannel.send(testMessage2);
|
||||
|
||||
Message<?> firstReceived = receive(successfulInputChannel);
|
||||
assertThat(firstReceived.getPayload()).isEqualTo(testMessagePayload.getBytes());
|
||||
assertThat(firstReceived.getPayload()).isEqualTo(testMessagePayload);
|
||||
Message<?> secondReceived = receive(successfulInputChannel);
|
||||
assertThat(secondReceived.getPayload()).isEqualTo(testMessage2Payload.getBytes());
|
||||
assertThat(secondReceived.getPayload()).isEqualTo(testMessage2Payload);
|
||||
consumerBinding.unbind();
|
||||
producerBinding.unbind();
|
||||
}
|
||||
@@ -498,18 +306,18 @@ public class KafkaBinderTests extends
|
||||
"error.retryTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel, dlqConsumerProperties);
|
||||
|
||||
String testMessagePayload = "test." + UUID.randomUUID().toString();
|
||||
Message<byte[]> testMessage = MessageBuilder.withPayload(testMessagePayload.getBytes()).build();
|
||||
Message<String> testMessage = MessageBuilder.withPayload(testMessagePayload).build();
|
||||
moduleOutputChannel.send(testMessage);
|
||||
|
||||
Message<?> dlqMessage = receive(dlqChannel, 3);
|
||||
assertThat(dlqMessage).isNotNull();
|
||||
assertThat(dlqMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());
|
||||
assertThat(dlqMessage.getPayload()).isEqualTo(testMessagePayload);
|
||||
|
||||
// first attempt fails
|
||||
assertThat(handler.getReceivedMessages().entrySet()).hasSize(1);
|
||||
Message<?> handledMessage = handler.getReceivedMessages().entrySet().iterator().next().getValue();
|
||||
assertThat(handledMessage).isNotNull();
|
||||
assertThat(handledMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());
|
||||
assertThat(handledMessage.getPayload()).isEqualTo(testMessagePayload);
|
||||
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
|
||||
binderBindUnbindLatency();
|
||||
dlqConsumerBinding.unbind();
|
||||
@@ -520,11 +328,11 @@ public class KafkaBinderTests extends
|
||||
consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0", "testGroup",
|
||||
successfulInputChannel, consumerProperties);
|
||||
String testMessage2Payload = "test." + UUID.randomUUID().toString();
|
||||
Message<byte[]> testMessage2 = MessageBuilder.withPayload(testMessage2Payload.getBytes()).build();
|
||||
Message<String> testMessage2 = MessageBuilder.withPayload(testMessage2Payload).build();
|
||||
moduleOutputChannel.send(testMessage2);
|
||||
|
||||
Message<?> receivedMessage = receive(successfulInputChannel);
|
||||
assertThat(receivedMessage.getPayload()).isEqualTo(testMessage2Payload.getBytes());
|
||||
assertThat(receivedMessage.getPayload()).isEqualTo(testMessage2Payload);
|
||||
|
||||
binderBindUnbindLatency();
|
||||
consumerBinding.unbind();
|
||||
@@ -561,18 +369,18 @@ public class KafkaBinderTests extends
|
||||
dlqConsumerProperties);
|
||||
|
||||
String testMessagePayload = "test." + UUID.randomUUID().toString();
|
||||
Message<byte[]> testMessage = MessageBuilder.withPayload(testMessagePayload.getBytes()).build();
|
||||
Message<String> testMessage = MessageBuilder.withPayload(testMessagePayload).build();
|
||||
moduleOutputChannel.send(testMessage);
|
||||
|
||||
Message<?> dlqMessage = receive(dlqChannel, 3);
|
||||
assertThat(dlqMessage).isNotNull();
|
||||
assertThat(dlqMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());
|
||||
assertThat(dlqMessage.getPayload()).isEqualTo(testMessagePayload);
|
||||
|
||||
// first attempt fails
|
||||
assertThat(handler.getReceivedMessages().entrySet()).hasSize(1);
|
||||
Message<?> handledMessage = handler.getReceivedMessages().entrySet().iterator().next().getValue();
|
||||
assertThat(handledMessage).isNotNull();
|
||||
assertThat(handledMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());
|
||||
assertThat(handledMessage.getPayload()).isEqualTo(testMessagePayload);
|
||||
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
|
||||
binderBindUnbindLatency();
|
||||
dlqConsumerBinding.unbind();
|
||||
@@ -583,11 +391,11 @@ public class KafkaBinderTests extends
|
||||
consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0", "testGroup",
|
||||
successfulInputChannel, consumerProperties);
|
||||
String testMessage2Payload = "test." + UUID.randomUUID().toString();
|
||||
Message<byte[]> testMessage2 = MessageBuilder.withPayload(testMessage2Payload.getBytes()).build();
|
||||
Message<String> testMessage2 = MessageBuilder.withPayload(testMessage2Payload).build();
|
||||
moduleOutputChannel.send(testMessage2);
|
||||
|
||||
Message<?> receivedMessage = receive(successfulInputChannel);
|
||||
assertThat(receivedMessage.getPayload()).isEqualTo(testMessage2Payload.getBytes());
|
||||
assertThat(receivedMessage.getPayload()).isEqualTo(testMessage2Payload);
|
||||
|
||||
binderBindUnbindLatency();
|
||||
consumerBinding.unbind();
|
||||
@@ -792,6 +600,8 @@ public class KafkaBinderTests extends
|
||||
Binder binder = getBinder(createConfigurationProperties());
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
// binder.setApplicationContext(context);
|
||||
// binder.afterPropertiesSet();
|
||||
DirectChannel output = new DirectChannel();
|
||||
QueueChannel input1 = new QueueChannel();
|
||||
|
||||
@@ -1141,28 +951,20 @@ public class KafkaBinderTests extends
|
||||
}
|
||||
};
|
||||
|
||||
ObjectMapper om = new ObjectMapper();
|
||||
|
||||
if (usesExplicitRouting()) {
|
||||
assertThat(om.readValue((byte[])receive0.getPayload(), Integer.class)).isEqualTo(0);
|
||||
assertThat(om.readValue((byte[])receive1.getPayload(), Integer.class)).isEqualTo(1);
|
||||
assertThat(om.readValue((byte[])receive2.getPayload(), Integer.class)).isEqualTo(2);
|
||||
assertThat(receive0.getPayload()).isEqualTo(0);
|
||||
assertThat(receive1.getPayload()).isEqualTo(1);
|
||||
assertThat(receive2.getPayload()).isEqualTo(2);
|
||||
assertThat(receive2).has(correlationHeadersForPayload2);
|
||||
}
|
||||
else {
|
||||
List<Message<?>> receivedMessages = Arrays.asList(receive0, receive1, receive2);
|
||||
assertThat(receivedMessages).extracting("payload").containsExactlyInAnyOrder(new byte[]{48}, new byte[]{49}, new byte[]{50});
|
||||
assertThat(receivedMessages).extracting("payload").containsExactlyInAnyOrder(0, 1, 2);
|
||||
Condition<Message<?>> payloadIs2 = new Condition<Message<?>>() {
|
||||
|
||||
@Override
|
||||
public boolean matches(Message<?> value) {
|
||||
try {
|
||||
return om.readValue((byte[]) value.getPayload(), Integer.class).equals(2);
|
||||
}
|
||||
catch (IOException e) {
|
||||
//
|
||||
}
|
||||
return false;
|
||||
return value.getPayload().equals(2);
|
||||
}
|
||||
};
|
||||
assertThat(receivedMessages).filteredOn(payloadIs2).areExactly(1, correlationHeadersForPayload2);
|
||||
@@ -1238,12 +1040,11 @@ public class KafkaBinderTests extends
|
||||
assertThat(receive2).isNotNull();
|
||||
Message<?> receive3 = receive(input3);
|
||||
assertThat(receive3).isNotNull();
|
||||
ObjectMapper om = new ObjectMapper();
|
||||
|
||||
assertThat(om.readValue((byte[])receive0.getPayload(), Integer.class)).isEqualTo(0);
|
||||
assertThat(om.readValue((byte[])receive1.getPayload(), Integer.class)).isEqualTo(1);
|
||||
assertThat(om.readValue((byte[])receive2.getPayload(), Integer.class)).isEqualTo(2);
|
||||
assertThat(om.readValue((byte[])receive3.getPayload(), Integer.class)).isEqualTo(3);
|
||||
assertThat(receive0.getPayload()).isEqualTo(0);
|
||||
assertThat(receive1.getPayload()).isEqualTo(1);
|
||||
assertThat(receive2.getPayload()).isEqualTo(2);
|
||||
assertThat(receive3.getPayload()).isEqualTo(3);
|
||||
|
||||
input0Binding.unbind();
|
||||
input1Binding.unbind();
|
||||
@@ -1613,7 +1414,7 @@ public class KafkaBinderTests extends
|
||||
Binding<?> producerBinding = null;
|
||||
Binding<?> consumerBinding = null;
|
||||
try {
|
||||
Integer testPayload = 10;
|
||||
Integer testPayload = new Integer(10);
|
||||
Message<?> message = MessageBuilder.withPayload(testPayload).build();
|
||||
SubscribableChannel moduleOutputChannel = new DirectChannel();
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
@@ -1655,70 +1456,14 @@ public class KafkaBinderTests extends
|
||||
}
|
||||
}
|
||||
|
||||
//TODO: We need to evaluate this test as built in serialization has different meaning in 2.0
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testNativeSerializationWithCustomSerializerDeserializerBytesPayload() throws Exception {
|
||||
Binding<?> producerBinding = null;
|
||||
Binding<?> consumerBinding = null;
|
||||
try {
|
||||
byte[] testPayload = new byte[1];
|
||||
Message<?> message = MessageBuilder.withPayload(testPayload)
|
||||
.setHeader(MessageHeaders.CONTENT_TYPE, "something/funky")
|
||||
.build();
|
||||
SubscribableChannel moduleOutputChannel = new DirectChannel();
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
final ZkClient zkClient;
|
||||
zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
|
||||
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
|
||||
ZKStringSerializer$.MODULE$);
|
||||
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
|
||||
invokeCreateTopic(zkUtils, testTopicName, 1, 1, new Properties());
|
||||
configurationProperties.setAutoAddPartitions(true);
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
producerProperties.setUseNativeEncoding(true);
|
||||
producerProperties.getExtension()
|
||||
.getConfiguration()
|
||||
.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
|
||||
producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel, producerProperties);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
consumerProperties.getExtension()
|
||||
.getConfiguration()
|
||||
.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
|
||||
consumerBinding = binder.bindConsumer(testTopicName, "test", moduleInputChannel, consumerProperties);
|
||||
// Let the consumer actually bind to the producer before sending a msg
|
||||
binderBindUnbindLatency();
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(moduleInputChannel, 500);
|
||||
assertThat(inbound).isNotNull();
|
||||
assertThat(inbound.getPayload()).isEqualTo(new byte[1]);
|
||||
assertThat(inbound.getHeaders()).containsKey("contentType");
|
||||
assertThat(inbound.getHeaders().get(MessageHeaders.CONTENT_TYPE).toString()).isEqualTo("something/funky");
|
||||
}
|
||||
finally {
|
||||
if (producerBinding != null) {
|
||||
producerBinding.unbind();
|
||||
}
|
||||
if (consumerBinding != null) {
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testBuiltinSerialization() throws Exception {
|
||||
Binding<?> producerBinding = null;
|
||||
Binding<?> consumerBinding = null;
|
||||
try {
|
||||
String testPayload = "test";
|
||||
Message<?> message = MessageBuilder.withPayload(testPayload.getBytes())
|
||||
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN_VALUE.getBytes())
|
||||
.build();
|
||||
String testPayload = new String("test");
|
||||
Message<?> message = MessageBuilder.withPayload(testPayload).build();
|
||||
SubscribableChannel moduleOutputChannel = new DirectChannel();
|
||||
String testTopicName = "existing" + System.currentTimeMillis();
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
@@ -1741,8 +1486,8 @@ public class KafkaBinderTests extends
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(moduleInputChannel, 5);
|
||||
assertThat(inbound).isNotNull();
|
||||
assertThat(inbound.getPayload()).isEqualTo("test".getBytes());
|
||||
assertThat(inbound.getHeaders()).containsEntry("contentType", "text/plain".getBytes());
|
||||
assertThat(inbound.getPayload()).isEqualTo("test");
|
||||
assertThat(inbound.getHeaders()).containsEntry("contentType", "text/plain");
|
||||
}
|
||||
finally {
|
||||
if (producerBinding != null) {
|
||||
@@ -1894,6 +1639,113 @@ public class KafkaBinderTests extends
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testSendAndReceiveWithRawModeAndStringPayload() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
DirectChannel moduleOutputChannel = new DirectChannel();
|
||||
QueueChannel moduleInputChannel = new QueueChannel();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
producerProperties.setHeaderMode(HeaderMode.raw);
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer("raw.string.0", moduleOutputChannel,
|
||||
producerProperties);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.setHeaderMode(HeaderMode.raw);
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer("raw.string.0", "test", moduleInputChannel,
|
||||
consumerProperties);
|
||||
Message<?> message = org.springframework.integration.support.MessageBuilder
|
||||
.withPayload("testSendAndReceiveWithRawModeAndStringPayload").build();
|
||||
// Let the consumer actually bind to the producer before sending a msg
|
||||
binderBindUnbindLatency();
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(moduleInputChannel);
|
||||
assertThat(inbound).isNotNull();
|
||||
assertThat(new String((byte[]) inbound.getPayload()))
|
||||
.isEqualTo("testSendAndReceiveWithRawModeAndStringPayload");
|
||||
producerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testSendAndReceiveWithExplicitConsumerGroupWithRawMode() throws Exception {
|
||||
Binder binder = getBinder();
|
||||
DirectChannel moduleOutputChannel = new DirectChannel();
|
||||
// Test pub/sub by emulating how StreamPlugin handles taps
|
||||
QueueChannel module1InputChannel = new QueueChannel();
|
||||
QueueChannel module2InputChannel = new QueueChannel();
|
||||
QueueChannel module3InputChannel = new QueueChannel();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
producerProperties.setHeaderMode(HeaderMode.raw);
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer("baz.raw.0", moduleOutputChannel,
|
||||
producerProperties);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.setHeaderMode(HeaderMode.raw);
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
Binding<MessageChannel> input1Binding = binder.bindConsumer("baz.raw.0", "test", module1InputChannel,
|
||||
consumerProperties);
|
||||
// A new module is using the tap as an input channel
|
||||
String fooTapName = "baz.raw.0";
|
||||
Binding<MessageChannel> input2Binding = binder.bindConsumer(fooTapName, "tap1", module2InputChannel,
|
||||
consumerProperties);
|
||||
// Another new module is using tap as an input channel
|
||||
String barTapName = "baz.raw.0";
|
||||
Binding<MessageChannel> input3Binding = binder.bindConsumer(barTapName, "tap2", module3InputChannel,
|
||||
consumerProperties);
|
||||
|
||||
Message<?> message = org.springframework.integration.support.MessageBuilder
|
||||
.withPayload("testSendAndReceiveWithExplicitConsumerGroupWithRawMode".getBytes()).build();
|
||||
boolean success = false;
|
||||
boolean retried = false;
|
||||
while (!success) {
|
||||
moduleOutputChannel.send(message);
|
||||
Message<?> inbound = receive(module1InputChannel);
|
||||
assertThat(inbound).isNotNull();
|
||||
assertThat(new String((byte[]) inbound.getPayload()))
|
||||
.isEqualTo("testSendAndReceiveWithExplicitConsumerGroupWithRawMode");
|
||||
|
||||
Message<?> tapped1 = receive(module2InputChannel);
|
||||
Message<?> tapped2 = receive(module3InputChannel);
|
||||
if (tapped1 == null || tapped2 == null) {
|
||||
// listener may not have started
|
||||
assertThat(retried).isFalse().withFailMessage("Failed to receive tap after retry");
|
||||
retried = true;
|
||||
continue;
|
||||
}
|
||||
success = true;
|
||||
assertThat(new String((byte[]) tapped1.getPayload()))
|
||||
.isEqualTo("testSendAndReceiveWithExplicitConsumerGroupWithRawMode");
|
||||
assertThat(new String((byte[]) tapped2.getPayload()))
|
||||
.isEqualTo("testSendAndReceiveWithExplicitConsumerGroupWithRawMode");
|
||||
}
|
||||
// delete one tap stream is deleted
|
||||
input3Binding.unbind();
|
||||
Message<?> message2 = org.springframework.integration.support.MessageBuilder.withPayload("bar".getBytes())
|
||||
.build();
|
||||
moduleOutputChannel.send(message2);
|
||||
|
||||
// other tap still receives messages
|
||||
Message<?> tapped = receive(module2InputChannel);
|
||||
assertThat(tapped).isNotNull();
|
||||
|
||||
// removed tap does not
|
||||
assertThat(receive(module3InputChannel)).isNull();
|
||||
|
||||
// re-subscribed tap does receive the message
|
||||
input3Binding = binder.bindConsumer(barTapName, "tap2", module3InputChannel, createConsumerProperties());
|
||||
assertThat(receive(module3InputChannel)).isNotNull();
|
||||
|
||||
// clean up
|
||||
input1Binding.unbind();
|
||||
input2Binding.unbind();
|
||||
input3Binding.unbind();
|
||||
producerBinding.unbind();
|
||||
assertThat(extractEndpoint(input1Binding).isRunning()).isFalse();
|
||||
assertThat(extractEndpoint(input2Binding).isRunning()).isFalse();
|
||||
assertThat(extractEndpoint(input3Binding).isRunning()).isFalse();
|
||||
assertThat(extractEndpoint(producerBinding).isRunning()).isFalse();
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
@Test
|
||||
public void testProducerErrorChannel() throws Exception {
|
||||
@@ -1904,18 +1756,30 @@ public class KafkaBinderTests extends
|
||||
producerProps.setHeaderMode(HeaderMode.raw);
|
||||
producerProps.setErrorChannelEnabled(true);
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer("ec.0", moduleOutputChannel, producerProps);
|
||||
final Message<?> message = MessageBuilder.withPayload("bad").setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
|
||||
final Message<?> message = MessageBuilder.withPayload("bad").setHeader(MessageHeaders.CONTENT_TYPE, "foo/bar")
|
||||
.build();
|
||||
SubscribableChannel ec = binder.getApplicationContext().getBean("ec.0.errors", SubscribableChannel.class);
|
||||
final AtomicReference<Message<?>> errorMessage = new AtomicReference<>();
|
||||
final CountDownLatch latch = new CountDownLatch(2);
|
||||
ec.subscribe(message1 -> {
|
||||
errorMessage.set(message1);
|
||||
latch.countDown();
|
||||
ec.subscribe(new MessageHandler() {
|
||||
|
||||
@Override
|
||||
public void handleMessage(Message<?> message) throws MessagingException {
|
||||
errorMessage.set(message);
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
});
|
||||
SubscribableChannel globalEc = binder.getApplicationContext()
|
||||
.getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, SubscribableChannel.class);
|
||||
globalEc.subscribe(message12 -> latch.countDown());
|
||||
globalEc.subscribe(new MessageHandler() {
|
||||
|
||||
@Override
|
||||
public void handleMessage(Message<?> message) throws MessagingException {
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
});
|
||||
KafkaProducerMessageHandler endpoint = TestUtils.getPropertyValue(producerBinding, "lifecycle",
|
||||
KafkaProducerMessageHandler.class);
|
||||
final RuntimeException fooException = new RuntimeException("foo");
|
||||
@@ -1932,7 +1796,7 @@ public class KafkaBinderTests extends
|
||||
}
|
||||
|
||||
@Override // SIK 2.3+
|
||||
public ListenableFuture send(ProducerRecord record) {
|
||||
public ListenableFuture<SendResult> send(ProducerRecord record) {
|
||||
sent.set(record.value());
|
||||
SettableListenableFuture<SendResult> future = new SettableListenableFuture<>();
|
||||
future.setException(fooException);
|
||||
@@ -1947,12 +1811,16 @@ public class KafkaBinderTests extends
|
||||
assertThat(errorMessage.get().getPayload()).isInstanceOf(KafkaSendFailureException.class);
|
||||
KafkaSendFailureException exception = (KafkaSendFailureException) errorMessage.get().getPayload();
|
||||
assertThat(exception.getCause()).isSameAs(fooException);
|
||||
ObjectMapper om = new ObjectMapper();
|
||||
assertThat(om.readValue((byte[] )exception.getFailedMessage().getPayload(), String.class)).isEqualTo(message.getPayload());
|
||||
assertThat(new String(((byte[] )exception.getFailedMessage().getPayload()))).isEqualTo(message.getPayload());
|
||||
assertThat(exception.getRecord().value()).isSameAs(sent.get());
|
||||
producerBinding.unbind();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void binderBindUnbindLatency() throws InterruptedException {
|
||||
Thread.sleep(500);
|
||||
}
|
||||
|
||||
private final class FailingInvocationCountingMessageHandler implements MessageHandler {
|
||||
|
||||
private int invocationCount;
|
||||
@@ -13,7 +13,6 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.bootstrap;
|
||||
|
||||
import org.junit.ClassRule;
|
||||
@@ -0,0 +1,8 @@
|
||||
{"namespace": "org.springframework.cloud.stream.binder.kafka",
|
||||
"type": "record",
|
||||
"name": "User1",
|
||||
"fields": [
|
||||
{"name": "name", "type": "string"},
|
||||
{"name": "favoriteColor", "type": "string"}
|
||||
]
|
||||
}
|
||||