Changes for intro of AbstractMessageChannelBinder

This commit is contained in:
Marius Bogoevici
2016-06-22 20:00:46 -04:00
committed by Mark Fisher
parent 487212c151
commit 9cf07a3df4
5 changed files with 379 additions and 488 deletions

13
pom.xml
View File

@@ -29,6 +29,19 @@
<module>spring-cloud-starter-stream-kafka</module>
<module>spring-cloud-stream-binder-kafka-test-support</module>
</modules>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<profiles>
<profile>
<id>spring</id>

View File

@@ -13,19 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.test.junit.kafka;
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Rule;
import org.springframework.cloud.stream.test.junit.AbstractExternalResourceTestSupport;
import org.springframework.util.SocketUtils;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.SystemTime$;
@@ -33,6 +26,14 @@ import kafka.utils.TestUtils;
import kafka.utils.Utils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Rule;
import org.springframework.cloud.stream.test.junit.AbstractExternalResourceTestSupport;
import org.springframework.util.SocketUtils;
/**
@@ -200,4 +201,3 @@ public class KafkaTestSupport extends AbstractExternalResourceTestSupport<String
}
}

View File

@@ -22,21 +22,17 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.admin.AdminUtils;
import kafka.api.OffsetRequest;
import kafka.api.TopicMetadata;
import kafka.common.ErrorMapping;
import kafka.serializer.Decoder;
import kafka.serializer.DefaultDecoder;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
@@ -51,25 +47,16 @@ import org.apache.kafka.common.utils.Utils;
import scala.collection.Seq;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.cloud.stream.binder.AbstractBinder;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.DefaultBinding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.MessageValues;
import org.springframework.cloud.stream.binder.PartitionHandler;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.http.MediaType;
import org.springframework.integration.channel.FixedSubscriberChannel;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.context.Lifecycle;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.kafka.core.ConnectionFactory;
import org.springframework.integration.kafka.core.DefaultConnectionFactory;
import org.springframework.integration.kafka.core.KafkaMessage;
@@ -83,7 +70,7 @@ import org.springframework.integration.kafka.listener.KafkaMessageListenerContai
import org.springframework.integration.kafka.listener.KafkaNativeOffsetManager;
import org.springframework.integration.kafka.listener.MessageListener;
import org.springframework.integration.kafka.listener.OffsetManager;
import org.springframework.integration.kafka.support.KafkaHeaders;
import org.springframework.integration.kafka.support.KafkaProducerContext;
import org.springframework.integration.kafka.support.ProducerConfiguration;
import org.springframework.integration.kafka.support.ProducerFactoryBean;
import org.springframework.integration.kafka.support.ProducerListener;
@@ -92,9 +79,7 @@ import org.springframework.integration.kafka.support.ZookeeperConnect;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.MessagingException;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryOperations;
@@ -109,7 +94,6 @@ import org.springframework.util.StringUtils;
/**
* A {@link Binder} that uses Kafka as the underlying middleware.
*
* @author Eric Bottard
* @author Marius Bogoevici
* @author Ilayaperumal Gopinathan
@@ -119,13 +103,14 @@ import org.springframework.util.StringUtils;
* @author Soby Chacko
*/
public class KafkaMessageChannelBinder extends
AbstractBinder<MessageChannel, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>>
AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>,
ExtendedProducerProperties<KafkaProducerProperties>, Collection<Partition>>
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties>,
DisposableBean {
public static final ByteArraySerializer BYTE_ARRAY_SERIALIZER = new ByteArraySerializer();
private static final ByteArraySerializer BYTE_ARRAY_SERIALIZER = new ByteArraySerializer();
public static final ThreadFactory DAEMON_THREAD_FACTORY;
private static final ThreadFactory DAEMON_THREAD_FACTORY;
static {
CustomizableThreadFactory threadFactory = new CustomizableThreadFactory("kafka-binder-");
@@ -135,8 +120,6 @@ public class KafkaMessageChannelBinder extends
private final KafkaBinderConfigurationProperties configurationProperties;
private final String[] headersToMap;
private RetryOperations metadataRetryOperations;
private final Map<String, Collection<Partition>> topicsInUse = new HashMap<>();
@@ -152,26 +135,28 @@ public class KafkaMessageChannelBinder extends
private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties) {
super(false, headersToMap(configurationProperties));
this.configurationProperties = configurationProperties;
String[] configuredHeaders = configurationProperties.getHeaders();
if (ObjectUtils.isEmpty(configuredHeaders)) {
this.headersToMap = BinderHeaders.STANDARD_HEADERS;
}
private static String[] headersToMap(KafkaBinderConfigurationProperties configurationProperties) {
String[] headersToMap;
if (ObjectUtils.isEmpty(configurationProperties.getHeaders())) {
headersToMap = BinderHeaders.STANDARD_HEADERS;
}
else {
String[] combinedHeadersToMap = Arrays.copyOfRange(BinderHeaders.STANDARD_HEADERS, 0,
BinderHeaders.STANDARD_HEADERS.length + configuredHeaders.length);
System.arraycopy(configuredHeaders, 0, combinedHeadersToMap, BinderHeaders.STANDARD_HEADERS.length,
configuredHeaders.length);
this.headersToMap = combinedHeadersToMap;
BinderHeaders.STANDARD_HEADERS.length + configurationProperties.getHeaders().length);
System.arraycopy(configurationProperties.getHeaders(), 0, combinedHeadersToMap,
BinderHeaders.STANDARD_HEADERS.length,
configurationProperties.getHeaders().length);
headersToMap = combinedHeadersToMap;
}
return headersToMap;
}
String getZkAddress() {
return this.configurationProperties.getZkConnectionString();
}
public ConnectionFactory getConnectionFactory() {
return connectionFactory;
ConnectionFactory getConnectionFactory() {
return this.connectionFactory;
}
public void setProducerListener(ProducerListener producerListener) {
@@ -193,13 +178,13 @@ public class KafkaMessageChannelBinder extends
@Override
public void onInit() throws Exception {
ZookeeperConfiguration configuration = new ZookeeperConfiguration(
new ZookeeperConnect(configurationProperties.getZkConnectionString()));
configuration.setBufferSize(configurationProperties.getSocketBufferSize());
configuration.setMaxWait(configurationProperties.getMaxWait());
new ZookeeperConnect(this.configurationProperties.getZkConnectionString()));
configuration.setBufferSize(this.configurationProperties.getSocketBufferSize());
configuration.setMaxWait(this.configurationProperties.getMaxWait());
DefaultConnectionFactory defaultConnectionFactory = new DefaultConnectionFactory(configuration);
defaultConnectionFactory.afterPropertiesSet();
this.connectionFactory = defaultConnectionFactory;
if (metadataRetryOperations == null) {
if (this.metadataRetryOperations == null) {
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
@@ -211,22 +196,22 @@ public class KafkaMessageChannelBinder extends
backOffPolicy.setMultiplier(2);
backOffPolicy.setMaxInterval(1000);
retryTemplate.setBackOffPolicy(backOffPolicy);
metadataRetryOperations = retryTemplate;
this.metadataRetryOperations = retryTemplate;
}
}
@Override
public void destroy() throws Exception {
if (dlqProducer != null) {
dlqProducer.close();
dlqProducer = null;
if (this.dlqProducer != null) {
this.dlqProducer.close();
this.dlqProducer = null;
}
}
/**
* Allowed chars are ASCII alphanumerics, '.', '_' and '-'.
*/
public static void validateTopicName(String topicName) {
static void validateTopicName(String topicName) {
try {
byte[] utf8 = topicName.getBytes("UTF-8");
for (byte b : utf8) {
@@ -244,12 +229,12 @@ public class KafkaMessageChannelBinder extends
@Override
public KafkaConsumerProperties getExtendedConsumerProperties(String channelName) {
return extendedBindingProperties.getExtendedConsumerProperties(channelName);
return this.extendedBindingProperties.getExtendedConsumerProperties(channelName);
}
@Override
public KafkaProducerProperties getExtendedProducerProperties(String channelName) {
return extendedBindingProperties.getExtendedProducerProperties(channelName);
return this.extendedBindingProperties.getExtendedProducerProperties(channelName);
}
Map<String, Collection<Partition>> getTopicsInUse() {
@@ -257,183 +242,8 @@ public class KafkaMessageChannelBinder extends
}
@Override
protected Binding<MessageChannel> doBindConsumer(String name, String group, MessageChannel inputChannel,
protected Collection<Partition> createConsumerDestinationIfNecessary(String name, String group,
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
// If the caller provides a consumer group, use it; otherwise an anonymous
// consumer group
// is generated each time, such that each anonymous binding will receive all
// messages.
// Consumers reset offsets at the latest time by default, which allows them to
// receive only
// messages sent after they've been bound. That behavior can be changed with the
// "resetOffsets" and "startOffset" properties.
boolean anonymous = !StringUtils.hasText(group);
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
"DLQ support is not available for anonymous subscriptions");
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
// The reference point, if not set explicitly is the latest time for anonymous
// subscriptions and the
// earliest time for group subscriptions. This allows the latter to receive
// messages published before the group
// has been created.
long referencePoint = properties.getExtension().getStartOffset() != null
? properties.getExtension().getStartOffset().getReferencePoint()
: (anonymous ? OffsetRequest.LatestTime() : OffsetRequest.EarliestTime());
return createKafkaConsumer(name, inputChannel, properties, consumerGroup, referencePoint);
}
@Override
public Binding<MessageChannel> doBindProducer(String name, MessageChannel moduleOutputChannel,
ExtendedProducerProperties<KafkaProducerProperties> properties) {
Assert.isInstanceOf(SubscribableChannel.class, moduleOutputChannel);
if (logger.isInfoEnabled()) {
logger.info("Using kafka topic for outbound: " + name);
}
validateTopicName(name);
Collection<Partition> partitions = ensureTopicCreated(name, properties.getPartitionCount());
if (properties.getPartitionCount() < partitions.size()) {
if (logger.isInfoEnabled()) {
logger.info("The `partitionCount` of the producer for topic " + name + " is "
+ properties.getPartitionCount() + ", smaller than the actual partition count of "
+ partitions.size() + " of the topic. The larger number will be used instead.");
}
}
topicsInUse.put(name, partitions);
ProducerMetadata<byte[], byte[]> producerMetadata = new ProducerMetadata<>(name, byte[].class, byte[].class,
BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER);
producerMetadata.setSync(properties.getExtension().isSync());
producerMetadata.setCompressionType(properties.getExtension().getCompressionType());
producerMetadata.setBatchBytes(properties.getExtension().getBufferSize());
Properties additionalProps = new Properties();
additionalProps.put(ProducerConfig.ACKS_CONFIG, String.valueOf(configurationProperties.getRequiredAcks()));
additionalProps.put(ProducerConfig.LINGER_MS_CONFIG,
String.valueOf(properties.getExtension().getBatchTimeout()));
ProducerFactoryBean<byte[], byte[]> producerFB = new ProducerFactoryBean<>(producerMetadata,
configurationProperties.getKafkaConnectionString(), additionalProps);
try {
final ProducerConfiguration<byte[], byte[]> producerConfiguration = new ProducerConfiguration<>(
producerMetadata, producerFB.getObject());
producerConfiguration.setProducerListener(producerListener);
MessageHandler handler = new SendingHandler(name, properties, partitions.size(), producerConfiguration);
EventDrivenConsumer consumer = new EventDrivenConsumer((SubscribableChannel) moduleOutputChannel, handler) {
@Override
protected void doStop() {
super.doStop();
producerConfiguration.stop();
}
};
consumer.setBeanFactory(this.getBeanFactory());
consumer.setBeanName("outbound." + name);
consumer.afterPropertiesSet();
DefaultBinding<MessageChannel> producerBinding = new DefaultBinding<>(name, null, moduleOutputChannel,
consumer);
consumer.start();
return producerBinding;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* Creates a Kafka topic if needed, or try to increase its partition count to the
* desired number.
*/
private Collection<Partition> ensureTopicCreated(final String topicName, final int partitionCount) {
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
try {
final Properties topicConfig = new Properties();
TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicName, zkClient);
if (topicMetadata.errorCode() == ErrorMapping.NoError()) {
// only consider minPartitionCount for resizing if autoAddPartitions is
// true
int effectivePartitionCount = configurationProperties.isAutoAddPartitions()
? Math.max(configurationProperties.getMinPartitionCount(), partitionCount) : partitionCount;
if (topicMetadata.partitionsMetadata().size() < effectivePartitionCount) {
if (configurationProperties.isAutoAddPartitions()) {
AdminUtils.addPartitions(zkClient, topicName, effectivePartitionCount, null, false,
new Properties());
}
else {
int topicSize = topicMetadata.partitionsMetadata().size();
throw new BinderException("The number of expected partitions was: " + partitionCount + ", but "
+ topicSize + (topicSize > 1 ? " have " : " has ") + "been found instead."
+ "Consider either increasing the partition count of the topic or enabling `autoAddPartitions`");
}
}
}
else if (topicMetadata.errorCode() == ErrorMapping.UnknownTopicOrPartitionCode()) {
if (configurationProperties.isAutoCreateTopics()) {
Seq<Object> brokerList = ZkUtils.getSortedBrokerList(zkClient);
// always consider minPartitionCount for topic creation
int effectivePartitionCount = Math.max(configurationProperties.getMinPartitionCount(),
partitionCount);
final scala.collection.Map<Object, Seq<Object>> replicaAssignment = AdminUtils
.assignReplicasToBrokers(brokerList, effectivePartitionCount,
configurationProperties.getReplicationFactor(), -1, -1);
metadataRetryOperations.execute(new RetryCallback<Object, RuntimeException>() {
@Override
public Object doWithRetry(RetryContext context) throws RuntimeException {
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topicName,
replicaAssignment, topicConfig, true);
return null;
}
});
}
else {
throw new BinderException("Topic " + topicName + " does not exist");
}
}
else {
throw new BinderException("Error fetching Kafka topic metadata: ",
ErrorMapping.exceptionFor(topicMetadata.errorCode()));
}
try {
Collection<Partition> partitions = metadataRetryOperations
.execute(new RetryCallback<Collection<Partition>, Exception>() {
@Override
public Collection<Partition> doWithRetry(RetryContext context) throws Exception {
connectionFactory.refreshMetadata(Collections.singleton(topicName));
Collection<Partition> partitions = connectionFactory.getPartitions(topicName);
// do a sanity check on the partition set
if (partitions.size() < partitionCount) {
throw new IllegalStateException("The number of expected partitions was: "
+ partitionCount + ", but " + partitions.size()
+ (partitions.size() > 1 ? " have " : " has ") + "been found instead");
}
connectionFactory.getLeaders(partitions);
return partitions;
}
});
return partitions;
}
catch (Exception e) {
logger.error("Cannot initialize Binder", e);
throw new BinderException("Cannot initialize binder:", e);
}
}
finally {
zkClient.close();
}
}
private Binding<MessageChannel> createKafkaConsumer(String name, final MessageChannel moduleInputChannel,
ExtendedConsumerProperties<KafkaConsumerProperties> properties, String group, long referencePoint) {
validateTopicName(name);
if (properties.getInstanceCount() == 0) {
throw new IllegalArgumentException("Instance count cannot be zero");
@@ -441,9 +251,6 @@ public class KafkaMessageChannelBinder extends
Collection<Partition> allPartitions = ensureTopicCreated(name,
properties.getInstanceCount() * properties.getConcurrency());
Decoder<byte[]> valueDecoder = new DefaultDecoder(null);
Decoder<byte[]> keyDecoder = new DefaultDecoder(null);
Collection<Partition> listenedPartitions;
if (properties.getInstanceCount() == 1) {
@@ -458,59 +265,88 @@ public class KafkaMessageChannelBinder extends
}
}
}
topicsInUse.put(name, listenedPartitions);
ReceivingHandler rh = new ReceivingHandler(properties);
rh.setOutputChannel(moduleInputChannel);
this.topicsInUse.put(name, listenedPartitions);
return listenedPartitions;
}
final FixedSubscriberChannel bridge = new FixedSubscriberChannel(rh);
bridge.setBeanName("bridge." + name);
Assert.isTrue(!CollectionUtils.isEmpty(listenedPartitions), "A list of partitions must be provided");
@Override
@SuppressWarnings("unchecked")
protected MessageProducer createConsumerEndpoint(String name, String group, Collection<Partition> destination,
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
Assert.isTrue(!CollectionUtils.isEmpty(destination), "A list of partitions must be provided");
int concurrency = Math.min(properties.getConcurrency(), destination.size());
final ExecutorService dispatcherTaskExecutor =
Executors.newFixedThreadPool(concurrency, DAEMON_THREAD_FACTORY);
final KafkaMessageListenerContainer messageListenerContainer = new KafkaMessageListenerContainer(
connectionFactory, listenedPartitions.toArray(new Partition[listenedPartitions.size()]));
this.connectionFactory, destination.toArray(new Partition[destination.size()])) {
if (logger.isDebugEnabled()) {
logger.debug("Listened partitions: " + StringUtils.collectionToCommaDelimitedString(listenedPartitions));
@Override
public void stop(Runnable callback) {
super.stop(callback);
if (getOffsetManager() instanceof DisposableBean) {
try {
((DisposableBean) getOffsetManager()).destroy();
}
catch (Exception e) {
KafkaMessageChannelBinder.this.logger.error("Error while closing the offset manager", e);
}
}
dispatcherTaskExecutor.shutdown();
}
};
if (this.logger.isDebugEnabled()) {
this.logger.debug(
"Listened partitions: " + StringUtils.collectionToCommaDelimitedString(destination));
}
OffsetManager offsetManager = createOffsetManager(group, referencePoint);
boolean anonymous = !StringUtils.hasText(group);
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
"DLQ support is not available for anonymous subscriptions");
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
long referencePoint = properties.getExtension().getStartOffset() != null
? properties.getExtension().getStartOffset().getReferencePoint()
: (anonymous ? OffsetRequest.LatestTime() : OffsetRequest.EarliestTime());
OffsetManager offsetManager = createOffsetManager(consumerGroup, referencePoint);
if (properties.getExtension().isResetOffsets()) {
offsetManager.resetOffsets(listenedPartitions);
offsetManager.resetOffsets(destination);
}
messageListenerContainer.setOffsetManager(offsetManager);
messageListenerContainer.setQueueSize(configurationProperties.getQueueSize());
messageListenerContainer.setMaxFetch(configurationProperties.getFetchSize());
messageListenerContainer.setQueueSize(this.configurationProperties.getQueueSize());
messageListenerContainer.setMaxFetch(this.configurationProperties.getFetchSize());
boolean autoCommitOnError = properties.getExtension().getAutoCommitOnError() != null
? properties.getExtension().getAutoCommitOnError()
: properties.getExtension().isAutoCommitOffset() && properties.getExtension().isEnableDlq();
messageListenerContainer.setAutoCommitOnError(autoCommitOnError);
messageListenerContainer.setRecoveryInterval(properties.getExtension().getRecoveryInterval());
int concurrency = Math.min(properties.getConcurrency(), listenedPartitions.size());
messageListenerContainer.setConcurrency(concurrency);
final ExecutorService dispatcherTaskExecutor = Executors.newFixedThreadPool(concurrency, DAEMON_THREAD_FACTORY);
messageListenerContainer.setDispatcherTaskExecutor(dispatcherTaskExecutor);
final KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter(
messageListenerContainer);
kafkaMessageDrivenChannelAdapter.setBeanFactory(this.getBeanFactory());
kafkaMessageDrivenChannelAdapter.setKeyDecoder(keyDecoder);
kafkaMessageDrivenChannelAdapter.setPayloadDecoder(valueDecoder);
kafkaMessageDrivenChannelAdapter.setOutputChannel(bridge);
kafkaMessageDrivenChannelAdapter.setKeyDecoder(new DefaultDecoder(null));
kafkaMessageDrivenChannelAdapter.setPayloadDecoder(new DefaultDecoder(null));
kafkaMessageDrivenChannelAdapter.setAutoCommitOffset(properties.getExtension().isAutoCommitOffset());
kafkaMessageDrivenChannelAdapter.afterPropertiesSet();
// we need to wrap the adapter listener into a retrying listener so that the retry
// logic is applied before the ErrorHandler is executed
final RetryTemplate retryTemplate = buildRetryTemplateIfRetryEnabled(properties);
if (retryTemplate != null) {
if (properties.getMaxAttempts() > 1) {
// we need to wrap the adapter listener into a retrying listener so that the retry
// logic is applied before the ErrorHandler is executed
final RetryTemplate retryTemplate = buildRetryTemplate(properties);
if (properties.getExtension().isAutoCommitOffset()) {
final MessageListener originalMessageListener = (MessageListener) messageListenerContainer
.getMessageListener();
messageListenerContainer.setMessageListener(new MessageListener() {
@Override
public void onMessage(final KafkaMessage message) {
try {
retryTemplate.execute(new RetryCallback<Object, Throwable>() {
@Override
public Object doWithRetry(RetryContext context) {
originalMessageListener.onMessage(message);
@@ -531,12 +367,15 @@ public class KafkaMessageChannelBinder extends
}
else {
messageListenerContainer.setMessageListener(new AcknowledgingMessageListener() {
final AcknowledgingMessageListener originalMessageListener = (AcknowledgingMessageListener) messageListenerContainer
.getMessageListener();
final AcknowledgingMessageListener originalMessageListener =
(AcknowledgingMessageListener) messageListenerContainer
.getMessageListener();
@Override
public void onMessage(final KafkaMessage message, final Acknowledgment acknowledgment) {
retryTemplate.execute(new RetryCallback<Object, RuntimeException>() {
@Override
public Object doWithRetry(RetryContext context) {
originalMessageListener.onMessage(message, acknowledgment);
@@ -549,76 +388,188 @@ public class KafkaMessageChannelBinder extends
}
if (properties.getExtension().isEnableDlq()) {
final String dlqTopic = "error." + name + "." + group;
final String dlqTopic = "error." + name + "." + consumerGroup;
initDlqProducer();
messageListenerContainer.setErrorHandler(new ErrorHandler() {
@Override
public void handle(Exception thrownException, final KafkaMessage message) {
final byte[] key = message.getMessage().key() != null ? Utils.toArray(message.getMessage().key())
: null;
final byte[] payload = message.getMessage().payload() != null
? Utils.toArray(message.getMessage().payload()) : null;
dlqProducer.send(new ProducerRecord<>(dlqTopic, key, payload), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
StringBuffer messageLog = new StringBuffer();
messageLog.append(" a message with key='"
+ toDisplayString(ObjectUtils.nullSafeToString(key), 50) + "'");
messageLog.append(" and payload='"
+ toDisplayString(ObjectUtils.nullSafeToString(payload), 50) + "'");
messageLog.append(" received from " + message.getMetadata().getPartition());
if (exception != null) {
logger.error("Error sending to DLQ" + messageLog.toString(), exception);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Sent to DLQ " + messageLog.toString());
KafkaMessageChannelBinder.this.dlqProducer.send(new ProducerRecord<>(dlqTopic, key, payload),
new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
StringBuffer messageLog = new StringBuffer();
messageLog.append(" a message with key='"
+ toDisplayString(ObjectUtils.nullSafeToString(key), 50) + "'");
messageLog.append(" and payload='"
+ toDisplayString(ObjectUtils.nullSafeToString(payload), 50) + "'");
messageLog.append(" received from " + message.getMetadata().getPartition());
if (exception != null) {
KafkaMessageChannelBinder.this.logger.error(
"Error sending to DLQ" + messageLog.toString(), exception);
}
else {
if (KafkaMessageChannelBinder.this.logger.isDebugEnabled()) {
KafkaMessageChannelBinder.this.logger.debug(
"Sent to DLQ " + messageLog.toString());
}
}
}
}
}
});
});
}
});
}
kafkaMessageDrivenChannelAdapter.start();
return kafkaMessageDrivenChannelAdapter;
}
EventDrivenConsumer edc = new EventDrivenConsumer(bridge, rh) {
@Override
protected MessageHandler createProducerMessageHandler(final String destination,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception {
ProducerMetadata<byte[], byte[]> producerMetadata = new ProducerMetadata<>(destination, byte[].class,
byte[].class,
BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER);
producerMetadata.setSync(producerProperties.getExtension().isSync());
producerMetadata.setCompressionType(producerProperties.getExtension().getCompressionType());
producerMetadata.setBatchBytes(producerProperties.getExtension().getBufferSize());
Properties additional = new Properties();
additional.put(ProducerConfig.ACKS_CONFIG, String.valueOf(this.configurationProperties.getRequiredAcks()));
additional.put(ProducerConfig.LINGER_MS_CONFIG,
String.valueOf(producerProperties.getExtension().getBatchTimeout()));
ProducerFactoryBean<byte[], byte[]> producerFB = new ProducerFactoryBean<>(producerMetadata,
this.configurationProperties.getKafkaConnectionString(), additional);
final ProducerConfiguration<byte[], byte[]> producerConfiguration = new ProducerConfiguration<>(
producerMetadata, producerFB.getObject());
producerConfiguration.setProducerListener(this.producerListener);
KafkaProducerContext kafkaProducerContext = new KafkaProducerContext();
kafkaProducerContext.setProducerConfigurations(
Collections.<String, ProducerConfiguration<?, ?>>singletonMap(destination, producerConfiguration));
return new ProducerConfigurationMessageHandler(producerConfiguration, destination);
}
@Override
protected void doStop() {
// stop the offset manager and the channel adapter before unbinding
// this means that the upstream channel adapter has a chance to stop
kafkaMessageDrivenChannelAdapter.stop();
if (messageListenerContainer.getOffsetManager() instanceof DisposableBean) {
try {
((DisposableBean) messageListenerContainer.getOffsetManager()).destroy();
@Override
protected void createProducerDestinationIfNecessary(String name,
ExtendedProducerProperties<KafkaProducerProperties> properties) {
if (this.logger.isInfoEnabled()) {
this.logger.info("Using kafka topic for outbound: " + name);
}
validateTopicName(name);
Collection<Partition> partitions = ensureTopicCreated(name, properties.getPartitionCount());
// If the topic already exists, and it has a larger number of partitions than the one set in `partitionCount`,
// we will use the existing partition count of the topic instead of the user setting.
if (properties.getPartitionCount() < partitions.size()) {
if (this.logger.isInfoEnabled()) {
this.logger.info("The `partitionCount` setting of the producer for topic " + name + " is "
+ properties.getPartitionCount() + ", smaller than the actual partition count of "
+ partitions.size() + " of the topic. The larger number will be used instead.");
}
}
this.topicsInUse.put(name, partitions);
}
/**
* Creates a Kafka topic if needed, or try to increase its partition count to the
* desired number. If a topic with a larger number of partitions already exists,
* the partition count remains unchanged.
*/
private Collection<Partition> ensureTopicCreated(final String topicName, final int partitionCount) {
final ZkClient zkClient = new ZkClient(this.configurationProperties.getZkConnectionString(),
this.configurationProperties.getZkSessionTimeout(),
this.configurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
try {
final Properties topicConfig = new Properties();
TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicName, zkClient);
if (topicMetadata.errorCode() == ErrorMapping.NoError()) {
// only consider minPartitionCount for resizing if autoAddPartitions is
// true
int effectivePartitionCount = this.configurationProperties.isAutoAddPartitions()
? Math.max(this.configurationProperties.getMinPartitionCount(),
partitionCount) : partitionCount;
if (topicMetadata.partitionsMetadata().size() < effectivePartitionCount) {
if (this.configurationProperties.isAutoAddPartitions()) {
AdminUtils.addPartitions(zkClient, topicName, effectivePartitionCount, null, false,
new Properties());
}
catch (Exception e) {
logger.error("Error while closing the offset manager", e);
else {
int topicSize = topicMetadata.partitionsMetadata().size();
throw new BinderException("The number of expected partitions was: " + partitionCount + ", but "
+ topicSize + (topicSize > 1 ? " have " : " has ") + "been found instead."
+ "Consider either increasing the partition count of the topic or enabling " +
"`autoAddPartitions`");
}
}
super.doStop();
}
};
String groupedName = groupedName(name, group);
edc.setBeanName("inbound." + groupedName);
else if (topicMetadata.errorCode() == ErrorMapping.UnknownTopicOrPartitionCode()) {
if (this.configurationProperties.isAutoCreateTopics()) {
Seq<Object> brokerList = ZkUtils.getSortedBrokerList(zkClient);
// always consider minPartitionCount for topic creation
int effectivePartitionCount = Math.max(this.configurationProperties.getMinPartitionCount(),
partitionCount);
final scala.collection.Map<Object, Seq<Object>> replicaAssignment = AdminUtils
.assignReplicasToBrokers(brokerList, effectivePartitionCount,
this.configurationProperties.getReplicationFactor(), -1, -1);
this.metadataRetryOperations.execute(new RetryCallback<Object, RuntimeException>() {
DefaultBinding<MessageChannel> consumerBinding = new DefaultBinding<MessageChannel>(name, group,
moduleInputChannel, edc) {
@Override
protected void afterUnbind() {
dispatcherTaskExecutor.shutdown();
@Override
public Object doWithRetry(RetryContext context) throws RuntimeException {
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topicName,
replicaAssignment, topicConfig, true);
return null;
}
});
}
else {
throw new BinderException("Topic " + topicName + " does not exist");
}
}
};
edc.start();
return consumerBinding;
else {
throw new BinderException("Error fetching Kafka topic metadata: ",
ErrorMapping.exceptionFor(topicMetadata.errorCode()));
}
try {
Collection<Partition> partitions = this.metadataRetryOperations
.execute(new RetryCallback<Collection<Partition>, Exception>() {
@Override
public Collection<Partition> doWithRetry(RetryContext context) throws Exception {
KafkaMessageChannelBinder.this.connectionFactory.refreshMetadata(
Collections.singleton(topicName));
Collection<Partition> partitions =
KafkaMessageChannelBinder.this.connectionFactory.getPartitions(topicName);
// do a sanity check on the partition set
if (partitions.size() < partitionCount) {
throw new IllegalStateException("The number of expected partitions was: "
+ partitionCount + ", but " + partitions.size()
+ (partitions.size() > 1 ? " have " : " has ") + "been found instead");
}
KafkaMessageChannelBinder.this.connectionFactory.getLeaders(partitions);
return partitions;
}
});
return partitions;
}
catch (Exception e) {
this.logger.error("Cannot initialize Binder", e);
throw new BinderException("Cannot initialize binder:", e);
}
}
finally {
zkClient.close();
}
}
private synchronized void initDlqProducer() {
try {
if (dlqProducer == null) {
if (this.dlqProducer == null) {
synchronized (this) {
if (dlqProducer == null) {
if (this.dlqProducer == null) {
// we can use the producer defaults as we do not need to tune
// performance
ProducerMetadata<byte[], byte[]> producerMetadata = new ProducerMetadata<>("dlqKafkaProducer",
@@ -628,11 +579,12 @@ public class KafkaMessageChannelBinder extends
producerMetadata.setBatchBytes(16384);
Properties additionalProps = new Properties();
additionalProps.put(ProducerConfig.ACKS_CONFIG,
String.valueOf(configurationProperties.getRequiredAcks()));
String.valueOf(this.configurationProperties.getRequiredAcks()));
additionalProps.put(ProducerConfig.LINGER_MS_CONFIG, String.valueOf(0));
ProducerFactoryBean<byte[], byte[]> producerFactoryBean = new ProducerFactoryBean<>(
producerMetadata, configurationProperties.getKafkaConnectionString(), additionalProps);
dlqProducer = producerFactoryBean.getObject();
producerMetadata, this.configurationProperties.getKafkaConnectionString(),
additionalProps);
this.dlqProducer = producerFactoryBean.getObject();
}
}
}
@@ -645,17 +597,17 @@ public class KafkaMessageChannelBinder extends
private OffsetManager createOffsetManager(String group, long referencePoint) {
try {
KafkaNativeOffsetManager kafkaOffsetManager = new KafkaNativeOffsetManager(connectionFactory,
new ZookeeperConnect(configurationProperties.getZkConnectionString()),
KafkaNativeOffsetManager kafkaOffsetManager = new KafkaNativeOffsetManager(this.connectionFactory,
new ZookeeperConnect(this.configurationProperties.getZkConnectionString()),
Collections.<Partition, Long>emptyMap());
kafkaOffsetManager.setConsumerId(group);
kafkaOffsetManager.setReferenceTimestamp(referencePoint);
kafkaOffsetManager.afterPropertiesSet();
WindowingOffsetManager windowingOffsetManager = new WindowingOffsetManager(kafkaOffsetManager);
windowingOffsetManager.setTimespan(configurationProperties.getOffsetUpdateTimeWindow());
windowingOffsetManager.setCount(configurationProperties.getOffsetUpdateCount());
windowingOffsetManager.setShutdownTimeout(configurationProperties.getOffsetUpdateShutdownTimeout());
windowingOffsetManager.setTimespan(this.configurationProperties.getOffsetUpdateTimeWindow());
windowingOffsetManager.setCount(this.configurationProperties.getOffsetUpdateCount());
windowingOffsetManager.setShutdownTimeout(this.configurationProperties.getOffsetUpdateShutdownTimeout());
windowingOffsetManager.afterPropertiesSet();
return windowingOffsetManager;
@@ -672,117 +624,6 @@ public class KafkaMessageChannelBinder extends
return original.substring(0, maxCharacters) + "...";
}
@Override
public void doManualAck(LinkedList<MessageHeaders> messageHeadersList) {
Iterator<MessageHeaders> iterator = messageHeadersList.iterator();
while (iterator.hasNext()) {
MessageHeaders headers = iterator.next();
Acknowledgment acknowledgment = (Acknowledgment) headers.get(KafkaHeaders.ACKNOWLEDGMENT);
Assert.notNull(acknowledgment,
"Acknowledgement shouldn't be null when acknowledging kafka message " + "manually.");
acknowledgment.acknowledge();
}
}
private final class ReceivingHandler extends AbstractReplyProducingMessageHandler {
private final ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties;
private ReceivingHandler(ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
this.consumerProperties = consumerProperties;
}
@Override
protected Object handleRequestMessage(Message<?> requestMessage) {
if (HeaderMode.embeddedHeaders.equals(consumerProperties.getHeaderMode())) {
MessageValues messageValues = extractMessageValues(requestMessage);
return MessageBuilder.createMessage(messageValues.getPayload(), new KafkaBinderHeaders(messageValues));
}
else {
return requestMessage;
}
}
@Override
protected boolean shouldCopyRequestHeaders() {
// prevent the message from being copied again in superclass
return false;
}
@SuppressWarnings("serial")
private final class KafkaBinderHeaders extends MessageHeaders {
KafkaBinderHeaders(Map<String, Object> headers) {
super(headers, MessageHeaders.ID_VALUE_NONE, -1L);
}
}
}
private final class SendingHandler extends AbstractMessageHandler {
private final AtomicInteger roundRobinCount = new AtomicInteger();
private final String topicName;
private final ExtendedProducerProperties<KafkaProducerProperties> producerProperties;
private final int numberOfKafkaPartitions;
private final ProducerConfiguration<byte[], byte[]> producerConfiguration;
private final PartitionHandler partitionHandler;
private SendingHandler(String topicName, ExtendedProducerProperties<KafkaProducerProperties> properties,
int numberOfPartitions, ProducerConfiguration<byte[], byte[]> producerConfiguration) {
this.topicName = topicName;
producerProperties = properties;
this.numberOfKafkaPartitions = numberOfPartitions;
ConfigurableListableBeanFactory beanFactory = KafkaMessageChannelBinder.this.getBeanFactory();
this.setBeanFactory(beanFactory);
this.producerConfiguration = producerConfiguration;
this.partitionHandler = new PartitionHandler(beanFactory, evaluationContext, partitionSelector, properties);
}
@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
int targetPartition;
if (producerProperties.isPartitioned()) {
targetPartition = this.partitionHandler.determinePartition(message);
}
else {
targetPartition = roundRobin() % numberOfKafkaPartitions;
}
if (HeaderMode.embeddedHeaders.equals(producerProperties.getHeaderMode())) {
MessageValues transformed = serializePayloadIfNecessary(message);
byte[] messageToSend = embeddedHeadersMessageConverter.embedHeaders(transformed,
KafkaMessageChannelBinder.this.headersToMap);
producerConfiguration.send(topicName, targetPartition, null, messageToSend);
}
else if (HeaderMode.raw.equals(producerProperties.getHeaderMode())) {
Object contentType = message.getHeaders().get(MessageHeaders.CONTENT_TYPE);
if (contentType != null && !contentType.equals(MediaType.APPLICATION_OCTET_STREAM_VALUE)) {
logger.error("Raw mode supports only " + MediaType.APPLICATION_OCTET_STREAM_VALUE + " content type"
+ message.getPayload().getClass());
}
if (message.getPayload() instanceof byte[]) {
producerConfiguration.send(topicName, targetPartition, null, (byte[]) message.getPayload());
}
else {
logger.error("Raw mode supports only byte[] payloads but value sent was of type "
+ message.getPayload().getClass());
}
}
}
private int roundRobin() {
int result = roundRobinCount.incrementAndGet();
if (result == Integer.MAX_VALUE) {
roundRobinCount.set(0);
}
return result;
}
}
public enum StartOffset {
earliest(OffsetRequest.EarliestTime()), latest(OffsetRequest.LatestTime());
@@ -793,8 +634,48 @@ public class KafkaMessageChannelBinder extends
}
public long getReferencePoint() {
return referencePoint;
return this.referencePoint;
}
}
private final static class ProducerConfigurationMessageHandler implements MessageHandler, Lifecycle {
private ProducerConfiguration<byte[], byte[]> delegate;
private String targetTopic;
private boolean running;
private ProducerConfigurationMessageHandler(
ProducerConfiguration<byte[], byte[]> delegate, String targetTopic) {
Assert.notNull(delegate, "Delegate cannot be null");
Assert.hasText(targetTopic, "Target topic cannot be null");
this.delegate = delegate;
this.targetTopic = targetTopic;
}
@Override
public void start() {
this.running = true;
}
@Override
public void stop() {
this.delegate.stop();
this.running = false;
}
@Override
public boolean isRunning() {
return this.running;
}
@Override
public void handleMessage(Message<?> message) throws MessagingException {
this.delegate.send(this.targetTopic,
message.getHeaders().get(BinderHeaders.PARTITION_HEADER, Integer.class), null,
(byte[]) message.getPayload());
}
}
}

View File

@@ -16,9 +16,6 @@
package org.springframework.cloud.stream.binder.kafka;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
@@ -27,6 +24,8 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import kafka.admin.AdminUtils;
import kafka.api.TopicMetadata;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -59,8 +58,8 @@ import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import kafka.admin.AdminUtils;
import kafka.api.TopicMetadata;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.fail;
/**
* Integration tests for the {@link KafkaMessageChannelBinder}.
@@ -134,20 +133,21 @@ public class KafkaBinderTests extends
}
@Test
public void testDlqAndRetry() {
public void testDlqAndRetry() throws Exception {
KafkaTestBinder binder = getBinder();
DirectChannel moduleOutputChannel = new DirectChannel();
DirectChannel moduleInputChannel = new DirectChannel();
QueueChannel dlqChannel = new QueueChannel();
FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
moduleInputChannel.subscribe(handler);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionCount(10);
DirectChannel moduleOutputChannel = createBindableChannel("output",
createProducerBindingProperties(producerProperties));
QueueChannel dlqChannel = new QueueChannel();
FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setMaxAttempts(3);
consumerProperties.setBackOffInitialInterval(100);
consumerProperties.setBackOffMaxInterval(150);
consumerProperties.getExtension().setEnableDlq(true);
DirectChannel moduleInputChannel = createBindableChannel("input", createConsumerBindingProperties(consumerProperties));
moduleInputChannel.subscribe(handler);
long uniqueBindingId = System.currentTimeMillis();
Binding<MessageChannel> producerBinding = binder.bindProducer("retryTest." + uniqueBindingId + ".0",
moduleOutputChannel, producerProperties);
@@ -176,16 +176,16 @@ public class KafkaBinderTests extends
@Test
public void testDefaultAutoCommitOnErrorWithoutDlq() throws Exception {
KafkaTestBinder binder = getBinder();
DirectChannel moduleOutputChannel = new DirectChannel();
DirectChannel moduleInputChannel = new DirectChannel();
FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
moduleInputChannel.subscribe(handler);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionCount(10);
DirectChannel moduleOutputChannel = createBindableChannel("output", createProducerBindingProperties(producerProperties));
FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setMaxAttempts(1);
consumerProperties.setBackOffInitialInterval(100);
consumerProperties.setBackOffMaxInterval(150);
DirectChannel moduleInputChannel = createBindableChannel("input", createConsumerBindingProperties(consumerProperties));
moduleInputChannel.subscribe(handler);
long uniqueBindingId = System.currentTimeMillis();
Binding<MessageChannel> producerBinding = binder.bindProducer("retryTest." + uniqueBindingId + ".0",
moduleOutputChannel, producerProperties);
@@ -224,17 +224,17 @@ public class KafkaBinderTests extends
@Test
public void testDefaultAutoCommitOnErrorWithDlq() throws Exception {
KafkaTestBinder binder = getBinder();
DirectChannel moduleOutputChannel = new DirectChannel();
DirectChannel moduleInputChannel = new DirectChannel();
FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
moduleInputChannel.subscribe(handler);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionCount(10);
DirectChannel moduleOutputChannel = createBindableChannel("output", createProducerBindingProperties(producerProperties));
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setMaxAttempts(3);
consumerProperties.setBackOffInitialInterval(100);
consumerProperties.setBackOffMaxInterval(150);
consumerProperties.getExtension().setEnableDlq(true);
DirectChannel moduleInputChannel = createBindableChannel("input", createConsumerBindingProperties(consumerProperties));
moduleInputChannel.subscribe(handler);
long uniqueBindingId = System.currentTimeMillis();
Binding<MessageChannel> producerBinding = binder.bindProducer("retryTest." + uniqueBindingId + ".0",
moduleOutputChannel, producerProperties);
@@ -293,10 +293,12 @@ public class KafkaBinderTests extends
Arrays.fill(testPayload, (byte) 65);
KafkaTestBinder binder = getBinder();
for (ProducerMetadata.CompressionType codec : codecs) {
DirectChannel moduleOutputChannel = new DirectChannel();
QueueChannel moduleInputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.getExtension().setCompressionType(codec);
DirectChannel moduleOutputChannel = createBindableChannel("output",
createProducerBindingProperties(producerProperties));
QueueChannel moduleInputChannel = new QueueChannel();
Binding<MessageChannel> producerBinding = binder.bindProducer("foo.0", moduleOutputChannel,
producerProperties);
Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo.0", "test", moduleInputChannel,
@@ -323,10 +325,10 @@ public class KafkaBinderTests extends
binderConfiguration.setMinPartitionCount(10);
KafkaTestBinder binder = new KafkaTestBinder(binderConfiguration);
DirectChannel moduleOutputChannel = new DirectChannel();
QueueChannel moduleInputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionCount(10);
DirectChannel moduleOutputChannel = createBindableChannel("output", createProducerBindingProperties(producerProperties));
QueueChannel moduleInputChannel = new QueueChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
long uniqueBindingId = System.currentTimeMillis();
Binding<MessageChannel> producerBinding = binder.bindProducer("foo" + uniqueBindingId + ".0",
@@ -566,10 +568,8 @@ public class KafkaBinderTests extends
properties.getExtension().setSync(true);
Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName, output, properties);
DirectFieldAccessor accessor = new DirectFieldAccessor(extractEndpoint(producerBinding));
MessageHandler handler = (MessageHandler) accessor.getPropertyValue("handler");
DirectFieldAccessor accessor1 = new DirectFieldAccessor(handler);
ProducerConfiguration producerConfiguration = (ProducerConfiguration) accessor1
.getPropertyValue("producerConfiguration");
ProducerConfiguration producerConfiguration = (ProducerConfiguration) accessor
.getPropertyValue("delegate");
assertThat(producerConfiguration.getProducerMetadata().isSync())
.withFailMessage("Kafka Sync Producer should have been enabled.");
producerBinding.unbind();
@@ -590,8 +590,8 @@ public class KafkaBinderTests extends
backOffPolicy.setBackOffPeriod(1000);
metatadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
binder.setMetadataRetryOperations(metatadataRetrievalRetryOperations);
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
DirectChannel output = createBindableChannel("output", createConsumerBindingProperties(consumerProperties));
String testTopicName = "nonexisting" + System.currentTimeMillis();
try {
binder.doBindConsumer(testTopicName, "test", output, consumerProperties);
@@ -622,8 +622,8 @@ public class KafkaBinderTests extends
context.refresh();
binder.setApplicationContext(context);
binder.afterPropertiesSet();
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
DirectChannel output = createBindableChannel("output", createConsumerBindingProperties(consumerProperties));
Binding<MessageChannel> binding = binder.doBindConsumer(testTopicName, "test", output, consumerProperties);
binding.unbind();
}
@@ -639,18 +639,18 @@ public class KafkaBinderTests extends
context.refresh();
binder.setApplicationContext(context);
binder.afterPropertiesSet();
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
// this consumer must consume from partition 2
consumerProperties.setInstanceCount(3);
consumerProperties.setInstanceIndex(2);
DirectChannel output = createBindableChannel("output", createConsumerBindingProperties(consumerProperties));
try {
binder.doBindConsumer(testTopicName, "test", output, consumerProperties);
}
catch (Exception e) {
assertThat(e).isInstanceOf(BinderException.class);
assertThat(e)
.hasMessageContaining("The number of expected partitions was: 3, but 1 has been found instead");
assertThat(e).hasMessageContaining(
"The number of expected partitions was: 3, but 1 has been found instead");
}
}
@@ -672,16 +672,15 @@ public class KafkaBinderTests extends
context.refresh();
binder.setApplicationContext(context);
binder.afterPropertiesSet();
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
// this consumer must consume from partition 2
consumerProperties.setInstanceCount(3);
consumerProperties.setInstanceIndex(2);
DirectChannel output = createBindableChannel("output", createConsumerBindingProperties(consumerProperties));
Binding<?> binding = binder.doBindConsumer(testTopicName, "test", output, consumerProperties);
Partition[] listenedPartitions = TestUtils.getPropertyValue(binding,
"endpoint.val$messageListenerContainer.partitions", Partition[].class);
"endpoint.messageListenerContainer.partitions", Partition[].class);
assertThat(listenedPartitions).hasSize(2);
assertThat(listenedPartitions).contains(new Partition(testTopicName, 2), new Partition(testTopicName, 5));
@@ -705,8 +704,8 @@ public class KafkaBinderTests extends
backOffPolicy.setBackOffPeriod(1000);
metatadataRetrievalRetryOperations.setBackOffPolicy(backOffPolicy);
binder.setMetadataRetryOperations(metatadataRetrievalRetryOperations);
DirectChannel output = new DirectChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
DirectChannel output = createBindableChannel("output", createConsumerBindingProperties(consumerProperties));
String testTopicName = "nonexisting" + System.currentTimeMillis();
Binding<?> binding = binder.doBindConsumer(testTopicName, "test", output, consumerProperties);
binding.unbind();

View File

@@ -25,10 +25,10 @@ import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.context.Lifecycle;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
@@ -48,15 +48,14 @@ public class RawModeKafkaBinderTests extends KafkaBinderTests {
@Override
public void testPartitionedModuleJava() throws Exception {
KafkaTestBinder binder = getBinder();
ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
properties.setHeaderMode(HeaderMode.raw);
properties.setPartitionKeyExtractorClass(RawKafkaPartitionTestSupport.class);
properties.setPartitionSelectorClass(RawKafkaPartitionTestSupport.class);
properties.setPartitionCount(6);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setHeaderMode(HeaderMode.raw);
producerProperties.setPartitionKeyExtractorClass(RawKafkaPartitionTestSupport.class);
producerProperties.setPartitionSelectorClass(RawKafkaPartitionTestSupport.class);
producerProperties.setPartitionCount(6);
DirectChannel output = new DirectChannel();
output.setBeanName("test.output");
Binding<MessageChannel> outputBinding = binder.bindProducer("partJ.0", output, properties);
DirectChannel output = createBindableChannel("output", createProducerBindingProperties(producerProperties));
Binding<MessageChannel> outputBinding = binder.bindProducer("partJ.0", output, producerProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setConcurrency(2);
@@ -100,23 +99,20 @@ public class RawModeKafkaBinderTests extends KafkaBinderTests {
@Override
public void testPartitionedModuleSpEL() throws Exception {
KafkaTestBinder binder = getBinder();
ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
properties.setPartitionKeyExpression(spelExpressionParser.parseExpression("payload[0]"));
properties.setPartitionSelectorExpression(spelExpressionParser.parseExpression("hashCode()"));
properties.setPartitionCount(6);
properties.setHeaderMode(HeaderMode.raw);
DirectChannel output = new DirectChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionKeyExpression(spelExpressionParser.parseExpression("payload[0]"));
producerProperties.setPartitionSelectorExpression(spelExpressionParser.parseExpression("hashCode()"));
producerProperties.setPartitionCount(6);
producerProperties.setHeaderMode(HeaderMode.raw);
DirectChannel output = createBindableChannel("output", createProducerBindingProperties(producerProperties));
output.setBeanName("test.output");
Binding<MessageChannel> outputBinding = binder.bindProducer("part.0", output, properties);
Binding<MessageChannel> outputBinding = binder.bindProducer("part.0", output, producerProperties);
try {
AbstractEndpoint endpoint = extractEndpoint(outputBinding);
Lifecycle endpoint = extractEndpoint(outputBinding);
assertThat(getEndpointRouting(endpoint)).contains("part.0-' + headers['partition']");
}
catch (UnsupportedOperationException ignored) {
}
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setConcurrency(2);
consumerProperties.setInstanceIndex(0);
@@ -160,13 +156,15 @@ public class RawModeKafkaBinderTests extends KafkaBinderTests {
@Override
public void testSendAndReceive() throws Exception {
KafkaTestBinder binder = getBinder();
DirectChannel moduleOutputChannel = new DirectChannel();
QueueChannel moduleInputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
DirectChannel moduleOutputChannel = createBindableChannel("output",
createProducerBindingProperties(producerProperties));
producerProperties.setHeaderMode(HeaderMode.raw);
Binding<MessageChannel> producerBinding = binder.bindProducer("foo.0", moduleOutputChannel, producerProperties);
Binding<MessageChannel> producerBinding = binder.bindProducer("foo.0", moduleOutputChannel,
producerProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setHeaderMode(HeaderMode.raw);
QueueChannel moduleInputChannel = new QueueChannel();
Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo.0", "test", moduleInputChannel,
consumerProperties);
Message<?> message = MessageBuilder.withPayload("foo".getBytes()).build();