Configuring Kafka producer timeout

Allow setting timeout for closing the producer.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/891
Resolves #909
This commit is contained in:
Soby Chacko
2020-05-27 17:10:28 -04:00
parent 788dc28d7e
commit 473ca53723
4 changed files with 42 additions and 3 deletions

View File

@@ -409,6 +409,11 @@ To achieve exactly once consumption and production of records, the consumer and
+ +
Default: none. Default: none.
closeTimeout::
Timeout in number of seconds to wait for when closing the producer.
+
Default: `30`
==== Usage examples ==== Usage examples
In this section, we show the use of the preceding properties for specific scenarios. In this section, we show the use of the preceding properties for specific scenarios.

View File

@@ -99,6 +99,12 @@ public class KafkaProducerProperties {
*/ */
private String transactionManager; private String transactionManager;
/*
* Timeout value in seconds for the duration to wait when closing the producer.
* If not set this defaults to 30 seconds.
*/
private int closeTimeout;
/** /**
* @return buffer size * @return buffer size
* *
@@ -262,6 +268,17 @@ public class KafkaProducerProperties {
this.transactionManager = transactionManager; this.transactionManager = transactionManager;
} }
/*
* @return timeout in seconds for closing the producer
*/
public int getCloseTimeout() {
return this.closeTimeout;
}
public void setCloseTimeout(int closeTimeout) {
this.closeTimeout = closeTimeout;
}
/** /**
* Enumeration for compression types. * Enumeration for compression types.
*/ */

View File

@@ -502,17 +502,18 @@ public class KafkaMessageChannelBinder extends
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.configurationProperties.getKafkaConnectionString()); this.configurationProperties.getKafkaConnectionString());
} }
final KafkaProducerProperties kafkaProducerProperties = producerProperties.getExtension();
if (ObjectUtils.isEmpty(props.get(ProducerConfig.BATCH_SIZE_CONFIG))) { if (ObjectUtils.isEmpty(props.get(ProducerConfig.BATCH_SIZE_CONFIG))) {
props.put(ProducerConfig.BATCH_SIZE_CONFIG, props.put(ProducerConfig.BATCH_SIZE_CONFIG,
String.valueOf(producerProperties.getExtension().getBufferSize())); String.valueOf(kafkaProducerProperties.getBufferSize()));
} }
if (ObjectUtils.isEmpty(props.get(ProducerConfig.LINGER_MS_CONFIG))) { if (ObjectUtils.isEmpty(props.get(ProducerConfig.LINGER_MS_CONFIG))) {
props.put(ProducerConfig.LINGER_MS_CONFIG, props.put(ProducerConfig.LINGER_MS_CONFIG,
String.valueOf(producerProperties.getExtension().getBatchTimeout())); String.valueOf(kafkaProducerProperties.getBatchTimeout()));
} }
if (ObjectUtils.isEmpty(props.get(ProducerConfig.COMPRESSION_TYPE_CONFIG))) { if (ObjectUtils.isEmpty(props.get(ProducerConfig.COMPRESSION_TYPE_CONFIG))) {
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
producerProperties.getExtension().getCompressionType().toString()); kafkaProducerProperties.getCompressionType().toString());
} }
Map<String, String> configs = producerProperties.getExtension().getConfiguration(); Map<String, String> configs = producerProperties.getExtension().getConfiguration();
Assert.state(!configs.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), Assert.state(!configs.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
@@ -521,11 +522,17 @@ public class KafkaMessageChannelBinder extends
if (!ObjectUtils.isEmpty(configs)) { if (!ObjectUtils.isEmpty(configs)) {
props.putAll(configs); props.putAll(configs);
} }
if (!ObjectUtils.isEmpty(kafkaProducerProperties.getConfiguration())) {
props.putAll(kafkaProducerProperties.getConfiguration());
}
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = new DefaultKafkaProducerFactory<>( DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = new DefaultKafkaProducerFactory<>(
props); props);
if (transactionIdPrefix != null) { if (transactionIdPrefix != null) {
producerFactory.setTransactionIdPrefix(transactionIdPrefix); producerFactory.setTransactionIdPrefix(transactionIdPrefix);
} }
if (kafkaProducerProperties.getCloseTimeout() > 0) {
producerFactory.setPhysicalCloseTimeout(kafkaProducerProperties.getCloseTimeout());
}
return producerFactory; return producerFactory;
} }

View File

@@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -62,6 +63,7 @@ public class KafkaBinderConfigurationPropertiesTest {
KafkaProducerProperties kafkaProducerProperties = new KafkaProducerProperties(); KafkaProducerProperties kafkaProducerProperties = new KafkaProducerProperties();
kafkaProducerProperties.setBufferSize(12345); kafkaProducerProperties.setBufferSize(12345);
kafkaProducerProperties.setBatchTimeout(100); kafkaProducerProperties.setBatchTimeout(100);
kafkaProducerProperties.setCloseTimeout(10);
kafkaProducerProperties kafkaProducerProperties
.setCompressionType(KafkaProducerProperties.CompressionType.gzip); .setCompressionType(KafkaProducerProperties.CompressionType.gzip);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>( ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(
@@ -84,6 +86,14 @@ public class KafkaBinderConfigurationPropertiesTest {
assertThat(producerConfigs.get("value.serializer")) assertThat(producerConfigs.get("value.serializer"))
.isEqualTo(ByteArraySerializer.class); .isEqualTo(ByteArraySerializer.class);
assertThat(producerConfigs.get("compression.type")).isEqualTo("gzip"); assertThat(producerConfigs.get("compression.type")).isEqualTo("gzip");
Field physicalCloseTimeoutField = ReflectionUtils
.findField(DefaultKafkaProducerFactory.class, "physicalCloseTimeout", Duration.class);
ReflectionUtils.makeAccessible(physicalCloseTimeoutField);
Duration physicalCloseTimeoutConfig = (Duration) ReflectionUtils
.getField(physicalCloseTimeoutField, producerFactory);
assertThat(physicalCloseTimeoutConfig).isEqualTo(Duration.ofSeconds(10));
List<String> bootstrapServers = new ArrayList<>(); List<String> bootstrapServers = new ArrayList<>();
bootstrapServers.add("10.98.09.199:9082"); bootstrapServers.add("10.98.09.199:9082");
assertThat((((String) producerConfigs.get("bootstrap.servers")) assertThat((((String) producerConfigs.get("bootstrap.servers"))