Ability to override deserializer in Kafka consumer

- When binding the consumer, the kafka consumer should not be set to use `ByteArrayDeserializer` for both key/value deserializer. Instead, they need to be used as the default values. Any extended consumer properties for key/value deserializer should override this default deserializer.

 - Add test

This resolves #55

Add tests for custom/native serialization

 - Test using built-in serialization without using kafka native serialization (ie. both the serializer/de-serializer are set to use ByteArraySe/Deserializer)
 - Test using custom serializer by explicitly setting value.deserializer for both Kafka producer/consumer properties
 - Test avro message conversion and Kafka Avro Serializer using Confluent Schema Registry
     - Given pre-released registry versions have some bugs that blocks the testing and `3.0.1` requires Kafka 0.10, this test is only in Kafka .10 binder

Update Schema based custom serializer/de-serializer tests

Fix import

Handle unbind during tests appropriately
This commit is contained in:
Ilayaperumal Gopinathan
2016-10-11 13:17:28 +05:30
committed by Marius Bogoevici
parent 943a7ae148
commit 36c90aa8c0
7 changed files with 367 additions and 19 deletions

1
.gitignore vendored
View File

@@ -18,6 +18,7 @@ _site/
*.ipr
*.iws
.idea/*
*/.idea
.factorypath
dump.rdb
.apt_generated

View File

@@ -73,7 +73,32 @@
<artifactId>spring-cloud-stream-binder-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
<version>1.1.1.BUILD-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>3.0.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry</artifactId>
<version>3.0.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
</project>

View File

@@ -21,27 +21,45 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.eclipse.jetty.server.Server;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.Spy;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.RetryOperations;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.Assert.assertTrue;
/**
* Integration tests for the {@link KafkaMessageChannelBinder}.
*
@@ -164,4 +182,64 @@ public class Kafka10BinderTests extends KafkaBinderTests {
return new DefaultKafkaConsumerFactory<>(props, keyDecoder, valueDecoder);
}
@Test
@SuppressWarnings("unchecked")
public void testCustomAvroSerialization() throws Exception {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
Map<String, Object> schemaRegistryProps = new HashMap<>();
schemaRegistryProps.put("kafkastore.connection.url", configurationProperties.getZkConnectionString());
schemaRegistryProps.put("listeners", "http://0.0.0.0:8082");
schemaRegistryProps.put("port", "8082");
schemaRegistryProps.put("kafkastore.topic", "_schemas");
SchemaRegistryConfig config = new SchemaRegistryConfig(schemaRegistryProps);
SchemaRegistryRestApplication app = new SchemaRegistryRestApplication(config);
Server server = app.createServer();
server.start();
long endTime = System.currentTimeMillis() + 5000;
while(true) {
if (server.isRunning()) {
break;
}
else if (System.currentTimeMillis() > endTime) {
fail("Kafka Schema Registry Server failed to start");
}
}
User1 firstOutboundFoo = new User1();
String userName1 = "foo-name" + UUID.randomUUID().toString();
String favColor1 = "foo-color" + UUID.randomUUID().toString();
firstOutboundFoo.setName(userName1);
firstOutboundFoo.setFavoriteColor(favColor1);
Message<?> message = MessageBuilder.withPayload(firstOutboundFoo).build();
SubscribableChannel moduleOutputChannel = new DirectChannel();
String testTopicName = "existing" + System.currentTimeMillis();
invokeCreateTopic(zkUtils, testTopicName, 6, 1, new Properties());
configurationProperties.setAutoAddPartitions(true);
Binder binder = getBinder(configurationProperties);
QueueChannel moduleInputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.getExtension().getConfiguration().put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
producerProperties.getExtension().getConfiguration().put("schema.registry.url", "http://localhost:8082");
producerProperties.setUseNativeEncoding(true);
Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel, producerProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
consumerProperties.getExtension().getConfiguration().put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
consumerProperties.getExtension().getConfiguration().put("schema.registry.url", "http://localhost:8082");
Binding<MessageChannel> consumerBinding = binder.bindConsumer(testTopicName, "test", moduleInputChannel, consumerProperties);
// Let the consumer actually bind to the producer before sending a msg
binderBindUnbindLatency();
moduleOutputChannel.send(message);
Message<?> inbound = receive(moduleInputChannel);
assertThat(inbound).isNotNull();
assertTrue(message.getPayload() instanceof User1);
User1 receivedUser = (User1) message.getPayload();
assertThat(receivedUser.getName()).isEqualTo(userName1);
assertThat(receivedUser.getFavoriteColor()).isEqualTo(favColor1);
producerBinding.unbind();
consumerBinding.unbind();
}
}

View File

@@ -0,0 +1,85 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.reflect.Nullable;
import org.apache.avro.specific.SpecificRecordBase;
import org.springframework.core.io.ClassPathResource;
/**
* @author Marius Bogoevici
* @author Ilayaperumal Gopinathan
*/
public class User1 extends SpecificRecordBase {
@Nullable
private String name;
@Nullable
private String favoriteColor;
public String getName() {
return this.name;
}
public void setName(String name) {
this.name = name;
}
public String getFavoriteColor() {
return this.favoriteColor;
}
public void setFavoriteColor(String favoriteColor) {
this.favoriteColor = favoriteColor;
}
@Override
public Schema getSchema() {
try {
return new Schema.Parser().parse(new ClassPathResource("schemas/users_v1.schema").getInputStream());
}
catch (IOException e) {
throw new IllegalStateException(e);
}
}
@Override
public Object get(int i) {
if (i == 0) {
return getName().toString();
}
if (i == 1) {
return getFavoriteColor().toString();
}
return null;
}
@Override
public void put(int i, Object o) {
if (i == 0) {
setName((String) o);
}
if (i == 1) {
setFavoriteColor((String) o);
}
}
}

View File

@@ -0,0 +1,8 @@
{"namespace": "org.springframework.cloud.stream.binder.kafka",
"type": "record",
"name": "User1",
"fields": [
{"name": "name", "type": "string"},
{"name": "favoriteColor", "type": "string"}
]
}

View File

@@ -38,7 +38,6 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Utils;
import org.springframework.beans.factory.DisposableBean;
@@ -258,7 +257,6 @@ public class KafkaMessageChannelBinder extends
String.valueOf(producerProperties.getExtension().getBatchTimeout()));
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
producerProperties.getExtension().getCompressionType().toString());
if (!ObjectUtils.isEmpty(producerProperties.getExtension().getConfiguration())) {
props.putAll(producerProperties.getExtension().getConfiguration());
}
@@ -303,29 +301,20 @@ public class KafkaMessageChannelBinder extends
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
"DLQ support is not available for anonymous subscriptions");
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
Map<String, Object> props = getConsumerConfig(anonymous, consumerGroup);
Deserializer<byte[]> valueDecoder = new ByteArrayDeserializer();
Deserializer<byte[]> keyDecoder = new ByteArrayDeserializer();
if (!ObjectUtils.isEmpty(properties.getExtension().getConfiguration())) {
props.putAll(properties.getExtension().getConfiguration());
}
ConsumerFactory<byte[], byte[]> consumerFactory = new DefaultKafkaConsumerFactory<>(props, keyDecoder,
valueDecoder);
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
Collection<PartitionInfo> listenedPartitions = destination;
Assert.isTrue(!CollectionUtils.isEmpty(listenedPartitions), "A list of partitions must be provided");
final TopicPartitionInitialOffset[] topicPartitionInitialOffsets = getTopicPartitionInitialOffsets(
listenedPartitions);
final ContainerProperties containerProperties =
anonymous || properties.getExtension().isAutoRebalanceEnabled() ? new ContainerProperties(name)
: new ContainerProperties(topicPartitionInitialOffsets);
int concurrency = Math.min(properties.getConcurrency(), listenedPartitions.size());
final ConcurrentMessageListenerContainer<byte[], byte[]> messageListenerContainer =
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer =
new ConcurrentMessageListenerContainer(
consumerFactory, containerProperties) {
@@ -339,25 +328,20 @@ public class KafkaMessageChannelBinder extends
if (!properties.getExtension().isAutoCommitOffset()) {
messageListenerContainer.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
}
if (this.logger.isDebugEnabled()) {
this.logger.debug(
"Listened partitions: " + StringUtils.collectionToCommaDelimitedString(listenedPartitions));
}
if (this.logger.isDebugEnabled()) {
this.logger.debug(
"Listened partitions: " + StringUtils.collectionToCommaDelimitedString(listenedPartitions));
}
final KafkaMessageDrivenChannelAdapter<byte[], byte[]> kafkaMessageDrivenChannelAdapter =
final KafkaMessageDrivenChannelAdapter<?, ?> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(
messageListenerContainer);
kafkaMessageDrivenChannelAdapter.setBeanFactory(this.getBeanFactory());
final RetryTemplate retryTemplate = buildRetryTemplate(properties);
kafkaMessageDrivenChannelAdapter.setRetryTemplate(retryTemplate);
if (properties.getExtension().isEnableDlq()) {
final String dlqTopic = "error." + name + "." + group;
initDlqProducer();
@@ -400,6 +384,8 @@ public class KafkaMessageChannelBinder extends
private Map<String, Object> getConsumerConfig(boolean anonymous, String consumerGroup) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
props.putAll(configurationProperties.getConfiguration());
}

View File

@@ -19,6 +19,7 @@ package org.springframework.cloud.stream.binder.kafka;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@@ -27,6 +28,10 @@ import java.util.concurrent.TimeUnit;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.assertj.core.api.Condition;
import org.junit.Ignore;
import org.junit.Test;
@@ -35,6 +40,7 @@ import org.springframework.beans.DirectFieldAccessor;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.DefaultBinding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.PartitionCapableBinderTests;
@@ -46,7 +52,10 @@ import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
@@ -54,6 +63,7 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.RetryOperations;
@@ -63,9 +73,11 @@ import org.springframework.retry.support.RetryTemplate;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.Assert.assertTrue;
/**
* @author Soby Chacko
* @author Ilayaperumal Gopinathan
*/
public abstract class KafkaBinderTests extends PartitionCapableBinderTests<AbstractKafkaTestBinder, ExtendedConsumerProperties<KafkaConsumerProperties>,
ExtendedProducerProperties<KafkaProducerProperties>> {
@@ -1185,6 +1197,159 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
assertThat(partitionSize(testTopicName)).isEqualTo(6);
}
@Test
@SuppressWarnings("unchecked")
public void testConsumerDefaultDeserializer() throws Exception {
Binding<?> binding = null;
try {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
final ZkUtils zkUtils = getZkUtils(configurationProperties);
String testTopicName = "existing" + System.currentTimeMillis();
invokeCreateTopic(zkUtils, testTopicName, 5, 1, new Properties());
configurationProperties.setAutoCreateTopics(false);
Binder binder = getBinder(configurationProperties);
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
DirectFieldAccessor consumerAccessor = new DirectFieldAccessor(getKafkaConsumer(binding));
assertTrue(consumerAccessor.getPropertyValue("keyDeserializer") instanceof ByteArrayDeserializer);
assertTrue(consumerAccessor.getPropertyValue("valueDeserializer") instanceof ByteArrayDeserializer);
}
finally {
if (binding != null) {
binding.unbind();
}
}
}
@Test
@SuppressWarnings("unchecked")
public void testConsumerCustomDeserializer() throws Exception {
Binding<?> binding = null;
try {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
Map<String, String> propertiesToOverride = configurationProperties.getConfiguration();
propertiesToOverride.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
propertiesToOverride.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
configurationProperties.setConfiguration(propertiesToOverride);
final ZkUtils zkUtils = getZkUtils(configurationProperties);
String testTopicName = "existing" + System.currentTimeMillis();
invokeCreateTopic(zkUtils, testTopicName, 5, 1, new Properties());
configurationProperties.setAutoCreateTopics(false);
Binder binder = getBinder(configurationProperties);
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
binding = binder.bindConsumer(testTopicName, "test", output, consumerProperties);
DirectFieldAccessor consumerAccessor = new DirectFieldAccessor(getKafkaConsumer(binding));
assertTrue("Expected StringDeserializer as a custom key deserializer", consumerAccessor.getPropertyValue("keyDeserializer") instanceof StringDeserializer);
assertTrue("Expected LongDeserializer as a custom value deserializer", consumerAccessor.getPropertyValue("valueDeserializer") instanceof LongDeserializer);
}
finally {
if (binding != null) {
binding.unbind();
}
}
}
private KafkaConsumer getKafkaConsumer(Binding binding) {
DirectFieldAccessor bindingAccessor = new DirectFieldAccessor((DefaultBinding)binding);
KafkaMessageDrivenChannelAdapter adapter = (KafkaMessageDrivenChannelAdapter) bindingAccessor.getPropertyValue("endpoint");
DirectFieldAccessor adapterAccessor = new DirectFieldAccessor(adapter);
ConcurrentMessageListenerContainer messageListenerContainer = (ConcurrentMessageListenerContainer) adapterAccessor.getPropertyValue("messageListenerContainer");
DirectFieldAccessor containerAccessor = new DirectFieldAccessor((ConcurrentMessageListenerContainer)messageListenerContainer);
DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) containerAccessor.getPropertyValue("consumerFactory");
return (KafkaConsumer) consumerFactory.createConsumer();
}
@Test
@SuppressWarnings("unchecked")
public void testNativeSerializationWithCustomSerializerDeserializer() throws Exception {
Binding<?> producerBinding = null;
Binding<?> consumerBinding = null;
try {
Integer testPayload = new Integer(10);
Message<?> message = MessageBuilder.withPayload(testPayload).build();
SubscribableChannel moduleOutputChannel = new DirectChannel();
String testTopicName = "existing" + System.currentTimeMillis();
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
final ZkClient zkClient;
zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
invokeCreateTopic(zkUtils, testTopicName, 6, 1, new Properties());
configurationProperties.setAutoAddPartitions(true);
Binder binder = getBinder(configurationProperties);
QueueChannel moduleInputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setUseNativeEncoding(true);
producerProperties.getExtension().getConfiguration().put("value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel, producerProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
consumerProperties.getExtension().getConfiguration().put("value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
consumerBinding = binder.bindConsumer(testTopicName, "test", moduleInputChannel, consumerProperties);
// Let the consumer actually bind to the producer before sending a msg
binderBindUnbindLatency();
moduleOutputChannel.send(message);
Message<?> inbound = receive(moduleInputChannel, 500);
assertThat(inbound).isNotNull();
assertThat(inbound.getPayload()).isEqualTo(10);
assertThat(inbound.getHeaders()).doesNotContainKey("contentType");
}
finally {
if (producerBinding != null) {
producerBinding.unbind();
}
if (consumerBinding != null) {
consumerBinding.unbind();
}
}
}
@Test
@SuppressWarnings("unchecked")
public void testBuiltinSerialization() throws Exception {
Binding<?> producerBinding = null;
Binding<?> consumerBinding = null;
try {
String testPayload = new String("test");
Message<?> message = MessageBuilder.withPayload(testPayload).build();
SubscribableChannel moduleOutputChannel = new DirectChannel();
String testTopicName = "existing" + System.currentTimeMillis();
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
final ZkClient zkClient;
zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
invokeCreateTopic(zkUtils, testTopicName, 6, 1, new Properties());
configurationProperties.setAutoAddPartitions(true);
Binder binder = getBinder(configurationProperties);
QueueChannel moduleInputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel, producerProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
consumerBinding = binder.bindConsumer(testTopicName, "test", moduleInputChannel, consumerProperties);
// Let the consumer actually bind to the producer before sending a msg
binderBindUnbindLatency();
moduleOutputChannel.send(message);
Message<?> inbound = receive(moduleInputChannel, 5);
assertThat(inbound).isNotNull();
assertThat(inbound.getPayload()).isEqualTo("test");
assertThat(inbound.getHeaders()).containsEntry("contentType", "text/plain");
}
finally {
if (producerBinding != null) {
producerBinding.unbind();
}
if (consumerBinding != null) {
consumerBinding.unbind();
}
}
}
@Override
protected void binderBindUnbindLatency() throws InterruptedException {
Thread.sleep(500);