From 473ca53723b0a7491539ce4fd0754fa102f349ce Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 27 May 2020 17:10:28 -0400 Subject: [PATCH] Configuring Kafka producer timeout Allow setting timeout for closing the producer. Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/891 Resolves #909 --- docs/src/main/asciidoc/overview.adoc | 5 +++++ .../properties/KafkaProducerProperties.java | 17 +++++++++++++++++ .../binder/kafka/KafkaMessageChannelBinder.java | 13 ++++++++++--- .../KafkaBinderConfigurationPropertiesTest.java | 10 ++++++++++ 4 files changed, 42 insertions(+), 3 deletions(-) diff --git a/docs/src/main/asciidoc/overview.adoc b/docs/src/main/asciidoc/overview.adoc index 0237612c..16db230a 100644 --- a/docs/src/main/asciidoc/overview.adoc +++ b/docs/src/main/asciidoc/overview.adoc @@ -409,6 +409,11 @@ To achieve exactly once consumption and production of records, the consumer and + Default: none. +closeTimeout:: +Timeout in number of seconds to wait for when closing the producer. ++ +Default: `30` + ==== Usage examples In this section, we show the use of the preceding properties for specific scenarios. diff --git a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaProducerProperties.java b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaProducerProperties.java index 9a10246b..3689e067 100644 --- a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaProducerProperties.java +++ b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaProducerProperties.java @@ -99,6 +99,12 @@ public class KafkaProducerProperties { */ private String transactionManager; + /* + * Timeout value in seconds for the duration to wait when closing the producer. + * If not set this defaults to 30 seconds. + */ + private int closeTimeout; + /** * @return buffer size * @@ -262,6 +268,17 @@ public class KafkaProducerProperties { this.transactionManager = transactionManager; } + /* + * @return timeout in seconds for closing the producer + */ + public int getCloseTimeout() { + return this.closeTimeout; + } + + public void setCloseTimeout(int closeTimeout) { + this.closeTimeout = closeTimeout; + } + /** * Enumeration for compression types. */ diff --git a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java index f55dea37..50eb5b19 100644 --- a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java +++ b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java @@ -502,17 +502,18 @@ public class KafkaMessageChannelBinder extends props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString()); } + final KafkaProducerProperties kafkaProducerProperties = producerProperties.getExtension(); if (ObjectUtils.isEmpty(props.get(ProducerConfig.BATCH_SIZE_CONFIG))) { props.put(ProducerConfig.BATCH_SIZE_CONFIG, - String.valueOf(producerProperties.getExtension().getBufferSize())); + String.valueOf(kafkaProducerProperties.getBufferSize())); } if (ObjectUtils.isEmpty(props.get(ProducerConfig.LINGER_MS_CONFIG))) { props.put(ProducerConfig.LINGER_MS_CONFIG, - String.valueOf(producerProperties.getExtension().getBatchTimeout())); + String.valueOf(kafkaProducerProperties.getBatchTimeout())); } if (ObjectUtils.isEmpty(props.get(ProducerConfig.COMPRESSION_TYPE_CONFIG))) { props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, - producerProperties.getExtension().getCompressionType().toString()); + kafkaProducerProperties.getCompressionType().toString()); } Map configs = producerProperties.getExtension().getConfiguration(); Assert.state(!configs.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), @@ -521,11 +522,17 @@ public class KafkaMessageChannelBinder extends if (!ObjectUtils.isEmpty(configs)) { props.putAll(configs); } + if (!ObjectUtils.isEmpty(kafkaProducerProperties.getConfiguration())) { + props.putAll(kafkaProducerProperties.getConfiguration()); + } DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory<>( props); if (transactionIdPrefix != null) { producerFactory.setTransactionIdPrefix(transactionIdPrefix); } + if (kafkaProducerProperties.getCloseTimeout() > 0) { + producerFactory.setPhysicalCloseTimeout(kafkaProducerProperties.getCloseTimeout()); + } return producerFactory; } diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationPropertiesTest.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationPropertiesTest.java index ca8d80d3..a6e65027 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationPropertiesTest.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationPropertiesTest.java @@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka; import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -62,6 +63,7 @@ public class KafkaBinderConfigurationPropertiesTest { KafkaProducerProperties kafkaProducerProperties = new KafkaProducerProperties(); kafkaProducerProperties.setBufferSize(12345); kafkaProducerProperties.setBatchTimeout(100); + kafkaProducerProperties.setCloseTimeout(10); kafkaProducerProperties .setCompressionType(KafkaProducerProperties.CompressionType.gzip); ExtendedProducerProperties producerProperties = new ExtendedProducerProperties<>( @@ -84,6 +86,14 @@ public class KafkaBinderConfigurationPropertiesTest { assertThat(producerConfigs.get("value.serializer")) .isEqualTo(ByteArraySerializer.class); assertThat(producerConfigs.get("compression.type")).isEqualTo("gzip"); + + Field physicalCloseTimeoutField = ReflectionUtils + .findField(DefaultKafkaProducerFactory.class, "physicalCloseTimeout", Duration.class); + ReflectionUtils.makeAccessible(physicalCloseTimeoutField); + Duration physicalCloseTimeoutConfig = (Duration) ReflectionUtils + .getField(physicalCloseTimeoutField, producerFactory); + assertThat(physicalCloseTimeoutConfig).isEqualTo(Duration.ofSeconds(10)); + List bootstrapServers = new ArrayList<>(); bootstrapServers.add("10.98.09.199:9082"); assertThat((((String) producerConfigs.get("bootstrap.servers"))