Kafka Streams binder producer/consumerProperties

Fix the issue of Kafka Streams binder level producerProperties
and consumerProperties (...binder.producerProperties and
...binder.consumerProperties), are not taken into consideration.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/997
This commit is contained in:
Soby Chacko
2020-12-04 17:49:57 -05:00
parent 5a59669081
commit 9290a321b5
3 changed files with 26 additions and 0 deletions

View File

@@ -1666,6 +1666,12 @@ stateStoreRetry.backoffPeriod::
+
Default: 1000 ms
consumerProperties::
Arbitrary consumer properties at the binder level.
producerProperties::
Arbitrary producer properties at the binder level.
==== Kafka Streams Producer Properties
The following properties are _only_ available for Kafka Streams producers and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.`

View File

@@ -270,6 +270,18 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
if (!ObjectUtils.isEmpty(configProperties.getConfiguration())) {
properties.putAll(configProperties.getConfiguration());
}
Map<String, Object> mergedConsumerConfig = configProperties.mergedConsumerConfiguration();
if (!ObjectUtils.isEmpty(mergedConsumerConfig)) {
properties.putAll(mergedConsumerConfig);
}
Map<String, Object> mergedProducerConfig = configProperties.mergedProducerConfiguration();
if (!ObjectUtils.isEmpty(mergedProducerConfig)) {
properties.putAll(mergedProducerConfig);
}
if (!properties.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG)) {
properties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG,
(int) configProperties.getReplicationFactor());
}
return properties.entrySet().stream().collect(
Collectors.toMap((e) -> String.valueOf(e.getKey()), Map.Entry::getValue));
}

View File

@@ -89,6 +89,7 @@ public class KafkaStreamsBinderWordCountFunctionTests {
}
@Test
@SuppressWarnings("unchecked")
public void testKstreamWordCountFunction() throws Exception {
SpringApplication app = new SpringApplication(WordCountProcessorApplication.class);
app.setWebApplicationType(WebApplicationType.NONE);
@@ -100,6 +101,8 @@ public class KafkaStreamsBinderWordCountFunctionTests {
"--spring.cloud.stream.bindings.process-out-0.destination=counts",
"--spring.cloud.stream.kafka.streams.default.consumer.application-id=testKstreamWordCountFunction",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.consumerProperties.request.timeout.ms=29000", //for testing ...binder.consumerProperties
"--spring.cloud.stream.kafka.streams.binder.producerProperties.max.block.ms=90000", //for testing ...binder.producerProperties
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" +
"=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde" +
@@ -118,6 +121,11 @@ public class KafkaStreamsBinderWordCountFunctionTests {
final String topology2 = kafkaStreamsTopologyEndpoint.kafkaStreamsTopology("testKstreamWordCountFunction");
assertThat(topology1).isNotEmpty();
assertThat(topology1).isEqualTo(topology2);
//verify that ...binder.consumerProperties and ...binder.producerProperties work.
Map<String, Object> streamConfigGlobalProperties = (Map<String, Object>) context.getBean("streamConfigGlobalProperties");
assertThat(streamConfigGlobalProperties.get("request.timeout.ms")).isEqualTo("29000");
assertThat(streamConfigGlobalProperties.get("max.block.ms")).isEqualTo("90000");
}
}