From d1c62bbc262214c37a5ce03d2032e08a76101dbc Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Fri, 4 Dec 2020 17:49:57 -0500 Subject: [PATCH] Kafka Streams binder producer/consumerProperties Fix the issue of Kafka Streams binder level producerProperties and consumerProperties (...binder.producerProperties and ...binder.consumerProperties), are not taken into consideration. Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/997 --- docs/src/main/asciidoc/kafka-streams.adoc | 6 ++++++ .../KafkaStreamsBinderSupportAutoConfiguration.java | 8 ++++++++ .../KafkaStreamsBinderWordCountFunctionTests.java | 8 ++++++++ 3 files changed, 22 insertions(+) diff --git a/docs/src/main/asciidoc/kafka-streams.adoc b/docs/src/main/asciidoc/kafka-streams.adoc index dd24dc1e..6a4312d8 100644 --- a/docs/src/main/asciidoc/kafka-streams.adoc +++ b/docs/src/main/asciidoc/kafka-streams.adoc @@ -1666,6 +1666,12 @@ stateStoreRetry.backoffPeriod:: + Default: 1000 ms +consumerProperties:: + Arbitrary consumer properties at the binder level. + +producerProperties:: + Arbitrary producer properties at the binder level. + ==== Kafka Streams Producer Properties The following properties are _only_ available for Kafka Streams producers and must be prefixed with `spring.cloud.stream.kafka.streams.bindings..producer.` diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java index 7c2787bc..9551bd96 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java @@ -270,6 +270,14 @@ public class KafkaStreamsBinderSupportAutoConfiguration { if (!ObjectUtils.isEmpty(configProperties.getConfiguration())) { properties.putAll(configProperties.getConfiguration()); } + Map mergedConsumerConfig = configProperties.mergedConsumerConfiguration(); + if (!ObjectUtils.isEmpty(mergedConsumerConfig)) { + properties.putAll(mergedConsumerConfig); + } + Map mergedProducerConfig = configProperties.mergedProducerConfiguration(); + if (!ObjectUtils.isEmpty(mergedProducerConfig)) { + properties.putAll(mergedProducerConfig); + } if (!properties.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG)) { properties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, (int) configProperties.getReplicationFactor()); diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountFunctionTests.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountFunctionTests.java index e086bd28..b5204b96 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountFunctionTests.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountFunctionTests.java @@ -89,6 +89,7 @@ public class KafkaStreamsBinderWordCountFunctionTests { } @Test + @SuppressWarnings("unchecked") public void testKstreamWordCountFunction() throws Exception { SpringApplication app = new SpringApplication(WordCountProcessorApplication.class); app.setWebApplicationType(WebApplicationType.NONE); @@ -100,6 +101,8 @@ public class KafkaStreamsBinderWordCountFunctionTests { "--spring.cloud.stream.bindings.process-out-0.destination=counts", "--spring.cloud.stream.kafka.streams.default.consumer.application-id=testKstreamWordCountFunction", "--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000", + "--spring.cloud.stream.kafka.streams.binder.consumerProperties.request.timeout.ms=29000", //for testing ...binder.consumerProperties + "--spring.cloud.stream.kafka.streams.binder.producerProperties.max.block.ms=90000", //for testing ...binder.producerProperties "--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" + "=org.apache.kafka.common.serialization.Serdes$StringSerde", "--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde" + @@ -118,6 +121,11 @@ public class KafkaStreamsBinderWordCountFunctionTests { final String topology2 = kafkaStreamsTopologyEndpoint.kafkaStreamsTopology("testKstreamWordCountFunction"); assertThat(topology1).isNotEmpty(); assertThat(topology1).isEqualTo(topology2); + + //verify that ...binder.consumerProperties and ...binder.producerProperties work. + Map streamConfigGlobalProperties = (Map) context.getBean("streamConfigGlobalProperties"); + assertThat(streamConfigGlobalProperties.get("request.timeout.ms")).isEqualTo("29000"); + assertThat(streamConfigGlobalProperties.get("max.block.ms")).isEqualTo("90000"); } }