Fix destination as pattern issues in Kafka Streams (#1054)

When destination-is-pattern property is enabled and native deserialization
is used, then Kafka Streams binder does not use the correct Serde types.
Fixing this issue by providing the proper Serdes to the StreamsBuilder.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1051
This commit is contained in:
Soby Chacko
2021-03-25 16:20:33 -04:00
committed by GitHub
parent 0ea4315af8
commit dd607627ed
2 changed files with 6 additions and 5 deletions

View File

@@ -425,17 +425,19 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
}
KStream<?, ?> stream;
final Serde<?> valueSerdeToUse = StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes()) ?
new Serdes.BytesSerde() : valueSerde;
final Consumed<?, ?> consumed = getConsumed(kafkaStreamsConsumerProperties, keySerde, valueSerdeToUse, autoOffsetReset);
if (this.kafkaStreamsExtendedBindingProperties
.getExtendedConsumerProperties(inboundName).isDestinationIsPattern()) {
final Pattern pattern = Pattern.compile(this.bindingServiceProperties.getBindingDestination(inboundName));
stream = streamsBuilder.stream(pattern);
stream = streamsBuilder.stream(pattern, consumed);
}
else {
String[] bindingTargets = StringUtils.commaDelimitedListToStringArray(
this.bindingServiceProperties.getBindingDestination(inboundName));
final Serde<?> valueSerdeToUse = StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes()) ?
new Serdes.BytesSerde() : valueSerde;
final Consumed<?, ?> consumed = getConsumed(kafkaStreamsConsumerProperties, keySerde, valueSerdeToUse, autoOffsetReset);
stream = streamsBuilder.stream(Arrays.asList(bindingTargets),
consumed);
}

View File

@@ -76,7 +76,6 @@ public class KafkaStreamsBinderDestinationIsPatternTests {
ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.cloud.stream.bindings.process-out-0.destination=out",
"--spring.cloud.stream.bindings.process-in-0.destination=in.*",
"--spring.cloud.stream.bindings.process-in-0.consumer.use-native-decoding=false",
"--spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.destinationIsPattern=true",
"--spring.cloud.stream.kafka.streams.binder.brokers="
+ embeddedKafka.getBrokersAsString());