GH-1096: Named components in Kafka Streams

Support KIP-307 in the Kafka Streams binder where the input (source)
and output (sink) bindings are customized with user-provided names.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1096

Resolves #1098
Soby Chacko
2021-07-01 19:35:20 -04:00
committed by Oleg Zhurakousky
parent 2423731820
commit 73b30a0989
6 changed files with 86 additions and 1 deletion


@@ -1698,6 +1698,11 @@ streamPartitionerBeanName:
+
Default: See the discussion above on outbound partition support.
producedAs::
Custom name for the sink component to which the processor is producing.
+
Default: `none` (generated by Kafka Streams)
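For example (the binding name `process-out-0` and the component name are illustrative):

[source,properties]
----
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.producedAs=custom-producer
----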
==== Kafka Streams Consumer Properties
The following properties are available for Kafka Streams consumers and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.`
@@ -1757,6 +1762,22 @@ timestampExtractorBeanName::
+
Default: See the discussion above on timestamp extractors.
eventTypes::
Comma-separated list of supported event types for this binding (see the example following this list).
+
Default: `none`
eventTypeHeaderKey::
Event type header key on each incoming record through this binding.
+
Default: `event_type`
consumedAs::
Custom name for the source component from which the processor is consuming.
+
Default: `none` (generated by Kafka Streams)
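For example, the consumer properties above might be combined as follows (the binding name and the event type values are illustrative):

[source,properties]
----
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=event_type
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.consumedAs=custom-consumer
----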
==== Special note on concurrency
In Kafka Streams, you can control the number of threads a processor can create using the `num.stream.threads` property.
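For example, a sketch of raising the thread count through the binder-level configuration (the value `3` is illustrative):

[source,properties]
----
spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads=3
----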


@@ -512,6 +512,9 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
if (timestampExtractor != null) {
consumed.withTimestampExtractor(timestampExtractor);
}
if (StringUtils.hasText(kafkaStreamsConsumerProperties.getConsumedAs())) {
consumed.withName(kafkaStreamsConsumerProperties.getConsumedAs());
}
return consumed;
}
}


@@ -146,6 +146,9 @@ class KStreamBinder extends
KStream<Object, Object> outboundBindTarget, Serde<Object> keySerde,
Serde<Object> valueSerde, KafkaStreamsProducerProperties properties) {
final Produced<Object, Object> produced = Produced.with(keySerde, valueSerde);
if (StringUtils.hasText(properties.getProducedAs())) {
produced.withName(properties.getProducedAs());
}
StreamPartitioner streamPartitioner = null;
if (!StringUtils.isEmpty(properties.getStreamPartitionerBeanName())) {
streamPartitioner = getApplicationContext().getBean(properties.getStreamPartitionerBeanName(),
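Both hooks delegate to the Kafka Streams naming API introduced by KIP-307. A minimal standalone sketch of the effect (topic names, serdes, and component names are illustrative, mirroring the test below):

[source,java]
----
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class NamedTopologySketch {

	public static void main(String[] args) {
		StreamsBuilder builder = new StreamsBuilder();
		// consumedAs translates to Consumed#withName, which names the source node.
		builder.stream("words",
				Consumed.with(Serdes.String(), Serdes.String()).withName("custom-consumer"))
				// producedAs translates to Produced#withName, which names the sink node.
				.to("counts",
						Produced.with(Serdes.String(), Serdes.String()).withName("custom-producer"));
		Topology topology = builder.build();
		// The description should list "Source: custom-consumer" and
		// "Sink: custom-producer" instead of generated names.
		System.out.println(topology.describe());
	}
}
----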


@@ -54,6 +54,23 @@ public class KafkaStreamsConsumerProperties extends KafkaConsumerProperties {
*/
private String timestampExtractorBeanName;
/**
* Comma-separated list of supported event types for this binding.
*/
private String eventTypes;
/**
* Record-level header key for the event type.
* If the default value is overridden, that key is expected on each incoming record header when
* event-type-based routing is enabled on this binding (by setting eventTypes).
*/
private String eventTypeHeaderKey = "event_type";
/**
* Custom name for the source component from which the processor is consuming.
*/
private String consumedAs;
public String getApplicationId() {
return this.applicationId;
}
@@ -101,4 +118,28 @@ public class KafkaStreamsConsumerProperties extends KafkaConsumerProperties {
public void setDeserializationExceptionHandler(DeserializationExceptionHandler deserializationExceptionHandler) {
this.deserializationExceptionHandler = deserializationExceptionHandler;
}
public String getEventTypes() {
return eventTypes;
}
public void setEventTypes(String eventTypes) {
this.eventTypes = eventTypes;
}
public String getEventTypeHeaderKey() {
return this.eventTypeHeaderKey;
}
public void setEventTypeHeaderKey(String eventTypeHeaderKey) {
this.eventTypeHeaderKey = eventTypeHeaderKey;
}
public String getConsumedAs() {
return consumedAs;
}
public void setConsumedAs(String consumedAs) {
this.consumedAs = consumedAs;
}
}


@@ -41,6 +41,11 @@ public class KafkaStreamsProducerProperties extends KafkaProducerProperties {
*/
private String streamPartitionerBeanName;
/**
* Custom name for the sink component to which the processor is producing.
*/
private String producedAs;
public String getKeySerde() {
return this.keySerde;
}
@@ -64,4 +69,12 @@ public class KafkaStreamsProducerProperties extends KafkaProducerProperties {
public void setStreamPartitionerBeanName(String streamPartitionerBeanName) {
this.streamPartitionerBeanName = streamPartitionerBeanName;
}
public String getProducedAs() {
return producedAs;
}
public void setProducedAs(String producedAs) {
this.producedAs = producedAs;
}
}


@@ -99,7 +99,7 @@ public class KafkaStreamsBinderWordCountFunctionTests {
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.process-in-0.destination=words",
"--spring.cloud.stream.bindings.process-out-0.destination=counts",
"--spring.cloud.stream.kafka.streams.default.consumer.application-id=testKstreamWordCountFunction",
"--spring.cloud.stream.kafka.streams.binder.application-id=testKstreamWordCountFunction",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.consumerProperties.request.timeout.ms=29000", //for testing ...binder.consumerProperties
"--spring.cloud.stream.kafka.streams.binder.producerProperties.max.block.ms=90000", //for testing ...binder.producerProperties
@@ -107,6 +107,8 @@ public class KafkaStreamsBinderWordCountFunctionTests {
"=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde" +
"=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.consumedAs=custom-consumer",
"--spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.producedAs=custom-producer",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
receiveAndValidate("words", "counts");
final MeterRegistry meterRegistry = context.getBean(MeterRegistry.class);
@@ -121,6 +123,8 @@ public class KafkaStreamsBinderWordCountFunctionTests {
final String topology2 = kafkaStreamsTopologyEndpoint.kafkaStreamsTopology("testKstreamWordCountFunction");
assertThat(topology1).isNotEmpty();
assertThat(topology1).isEqualTo(topology2);
assertThat(topology1.contains("Source: custom-consumer")).isTrue();
assertThat(topology1.contains("Sink: custom-producer")).isTrue();
//verify that ...binder.consumerProperties and ...binder.producerProperties work.
Map<String, Object> streamConfigGlobalProperties = (Map<String, Object>) context.getBean("streamConfigGlobalProperties");