Add support for specifying generic properties for Kafka producers/consumers

This commit is contained in:
Marius Bogoevici
2016-07-31 18:36:48 -04:00
parent 8f9ac1cd6a
commit dd505e0366
3 changed files with 44 additions and 9 deletions

View File

@@ -16,8 +16,13 @@
package org.springframework.cloud.stream.binder.kafka;
import java.util.HashMap;
import java.util.Map;
/**
* @author Marius Bogoevici
*
* <p>Thanks to Laszlo Szabo for providing the initial patch for generic property support.</p>
*/
public class KafkaConsumerProperties {
@@ -35,8 +40,10 @@ public class KafkaConsumerProperties {
private int recoveryInterval = 5000;
private Map<String, String> configuration = new HashMap<>();
public boolean isAutoCommitOffset() {
return autoCommitOffset;
return this.autoCommitOffset;
}
public void setAutoCommitOffset(boolean autoCommitOffset) {
@@ -44,7 +51,7 @@ public class KafkaConsumerProperties {
}
public boolean isResetOffsets() {
return resetOffsets;
return this.resetOffsets;
}
public void setResetOffsets(boolean resetOffsets) {
@@ -52,7 +59,7 @@ public class KafkaConsumerProperties {
}
public StartOffset getStartOffset() {
return startOffset;
return this.startOffset;
}
public void setStartOffset(StartOffset startOffset) {
@@ -60,7 +67,7 @@ public class KafkaConsumerProperties {
}
public boolean isEnableDlq() {
return enableDlq;
return this.enableDlq;
}
public void setEnableDlq(boolean enableDlq) {
@@ -68,7 +75,7 @@ public class KafkaConsumerProperties {
}
public Boolean getAutoCommitOnError() {
return autoCommitOnError;
return this.autoCommitOnError;
}
public void setAutoCommitOnError(Boolean autoCommitOnError) {
@@ -76,7 +83,7 @@ public class KafkaConsumerProperties {
}
public int getRecoveryInterval() {
return recoveryInterval;
return this.recoveryInterval;
}
public void setRecoveryInterval(int recoveryInterval) {
@@ -84,7 +91,7 @@ public class KafkaConsumerProperties {
}
public boolean isAutoRebalanceEnabled() {
return autoRebalanceEnabled;
return this.autoRebalanceEnabled;
}
public void setAutoRebalanceEnabled(boolean autoRebalanceEnabled) {
@@ -101,7 +108,15 @@ public class KafkaConsumerProperties {
}
public long getReferencePoint() {
return referencePoint;
return this.referencePoint;
}
}
public Map<String, String> getConfiguration() {
return this.configuration;
}
public void setConfiguration(Map<String, String> configuration) {
this.configuration = configuration;
}
}

View File

@@ -249,6 +249,9 @@ public class KafkaMessageChannelBinder extends
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
producerProperties.getExtension().getCompressionType().toString());
if (!ObjectUtils.isEmpty(producerProperties.getExtension().getConfiguration())) {
props.putAll(producerProperties.getExtension().getConfiguration());
}
return new DefaultKafkaProducerFactory<>(props);
}
@@ -293,10 +296,14 @@ public class KafkaMessageChannelBinder extends
Deserializer<byte[]> valueDecoder = new ByteArrayDeserializer();
Deserializer<byte[]> keyDecoder = new ByteArrayDeserializer();
if (!ObjectUtils.isEmpty(properties.getExtension().getConfiguration())) {
props.putAll(properties.getExtension().getConfiguration());
}
ConsumerFactory<byte[], byte[]> consumerFactory = new DefaultKafkaConsumerFactory<>(props, keyDecoder,
valueDecoder);
Collection<PartitionInfo> listenedPartitions = (Collection<PartitionInfo>) destination;
Collection<PartitionInfo> listenedPartitions = destination;
Assert.isTrue(!CollectionUtils.isEmpty(listenedPartitions), "A list of partitions must be provided");
final TopicPartitionInitialOffset[] topicPartitionInitialOffsets = getTopicPartitionInitialOffsets(
listenedPartitions);

View File

@@ -16,6 +16,9 @@
package org.springframework.cloud.stream.binder.kafka;
import java.util.HashMap;
import java.util.Map;
import javax.validation.constraints.NotNull;
/**
@@ -31,6 +34,8 @@ public class KafkaProducerProperties {
private int batchTimeout;
private Map<String, String> configuration = new HashMap<>();
public int getBufferSize() {
return this.bufferSize;
}
@@ -64,6 +69,14 @@ public class KafkaProducerProperties {
this.batchTimeout = batchTimeout;
}
public Map<String, String> getConfiguration() {
return this.configuration;
}
public void setConfiguration(Map<String, String> configuration) {
this.configuration = configuration;
}
public enum CompressionType {
none,
gzip,