Changes required by the refactoring of AbstractMessageChannelBinder

This commit is contained in:
Marius Bogoevici
2016-08-18 12:28:11 -04:00
parent 2447a16dbe
commit 21dbbca8f8

View File

@@ -95,7 +95,7 @@ import org.springframework.util.StringUtils;
*/
public class KafkaMessageChannelBinder extends
AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>,
ExtendedProducerProperties<KafkaProducerProperties>, Collection<PartitionInfo>>
ExtendedProducerProperties<KafkaProducerProperties>, Collection<PartitionInfo>, String>
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties>,
DisposableBean {
@@ -190,33 +190,33 @@ public class KafkaMessageChannelBinder extends
}
@Override
protected MessageHandler createProducerMessageHandler(final String name,
protected MessageHandler createProducerMessageHandler(final String destination,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception {
KafkaTopicUtils.validateTopicName(name);
KafkaTopicUtils.validateTopicName(destination);
Collection<PartitionInfo> partitions = ensureTopicCreated(name, producerProperties.getPartitionCount());
Collection<PartitionInfo> partitions = ensureTopicCreated(destination, producerProperties.getPartitionCount());
if (producerProperties.getPartitionCount() < partitions.size()) {
if (this.logger.isInfoEnabled()) {
this.logger.info("The `partitionCount` of the producer for topic " + name + " is "
this.logger.info("The `partitionCount` of the producer for topic " + destination + " is "
+ producerProperties.getPartitionCount() + ", smaller than the actual partition count of "
+ partitions.size() + " of the topic. The larger number will be used instead.");
}
}
this.topicsInUse.put(name, partitions);
this.topicsInUse.put(destination, partitions);
ProducerFactory<byte[], byte[]> producerFB = getProducerFactory(producerProperties);
KafkaTemplate<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFB);
if (this.producerListener != null) {
kafkaTemplate.setProducerListener(this.producerListener);
}
return new ProducerConfigurationMessageHandler(kafkaTemplate, name, producerProperties);
return new ProducerConfigurationMessageHandler(kafkaTemplate, destination, producerProperties);
}
@Override
protected void createProducerDestinationIfNecessary(String name,
protected String createProducerDestinationIfNecessary(String name,
ExtendedProducerProperties<KafkaProducerProperties> properties) {
if (this.logger.isInfoEnabled()) {
this.logger.info("Using kafka topic for outbound: " + name);
@@ -231,6 +231,7 @@ public class KafkaMessageChannelBinder extends
}
}
this.topicsInUse.put(name, partitions);
return name;
}
private ProducerFactory<byte[], byte[]> getProducerFactory(