From dd607627ed5b3a2f1403a72df559cb6b28e70951 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 25 Mar 2021 16:20:33 -0400 Subject: [PATCH] Fix destination as pattern issues in Kafka Streams (#1054) When destination-is-pattern property is enabled and native deserialization is used, then Kafka Streams binder does not use the correct Serde types. Fixing this issue by providing the proper Serdes to the StreamsBuilder. Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1051 --- .../streams/AbstractKafkaStreamsBinderProcessor.java | 10 ++++++---- .../KafkaStreamsBinderDestinationIsPatternTests.java | 1 - 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java index 8d41f50e..e581fce3 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java @@ -425,17 +425,19 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application } KStream stream; + final Serde valueSerdeToUse = StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes()) ? + new Serdes.BytesSerde() : valueSerde; + final Consumed consumed = getConsumed(kafkaStreamsConsumerProperties, keySerde, valueSerdeToUse, autoOffsetReset); + if (this.kafkaStreamsExtendedBindingProperties .getExtendedConsumerProperties(inboundName).isDestinationIsPattern()) { final Pattern pattern = Pattern.compile(this.bindingServiceProperties.getBindingDestination(inboundName)); - stream = streamsBuilder.stream(pattern); + + stream = streamsBuilder.stream(pattern, consumed); } else { String[] bindingTargets = StringUtils.commaDelimitedListToStringArray( this.bindingServiceProperties.getBindingDestination(inboundName)); - final Serde valueSerdeToUse = StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes()) ? - new Serdes.BytesSerde() : valueSerde; - final Consumed consumed = getConsumed(kafkaStreamsConsumerProperties, keySerde, valueSerdeToUse, autoOffsetReset); stream = streamsBuilder.stream(Arrays.asList(bindingTargets), consumed); } diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsBinderDestinationIsPatternTests.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsBinderDestinationIsPatternTests.java index 424b87cd..47004a29 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsBinderDestinationIsPatternTests.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsBinderDestinationIsPatternTests.java @@ -76,7 +76,6 @@ public class KafkaStreamsBinderDestinationIsPatternTests { ConfigurableApplicationContext context = app.run("--server.port=0", "--spring.cloud.stream.bindings.process-out-0.destination=out", "--spring.cloud.stream.bindings.process-in-0.destination=in.*", - "--spring.cloud.stream.bindings.process-in-0.consumer.use-native-decoding=false", "--spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.destinationIsPattern=true", "--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString());