Fix destination as pattern issues in Kafka Streams (#1054)

When destination-is-pattern property is enabled and native deserialization
is used, then Kafka Streams binder does not use the correct Serde types.
Fixing this issue by providing the proper Serdes to the StreamsBuilder.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1051
This commit is contained in:
Soby Chacko
2021-03-25 16:20:33 -04:00
committed by Gary Russell
parent 3c151a6186
commit f84876b92a
3 changed files with 7 additions and 3 deletions

1
.gitignore vendored
View File

@@ -12,6 +12,7 @@ _site/
.project
.settings
.springBeans
.sts4-cache
.DS_Store
*.sw*
*.iml

View File

@@ -392,15 +392,19 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
}
KStream<?, ?> stream;
final Serde<?> valueSerdeToUse = StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes()) ?
new Serdes.BytesSerde() : valueSerde;
final Consumed<?, ?> consumed = getConsumed(kafkaStreamsConsumerProperties, keySerde, valueSerdeToUse, autoOffsetReset);
if (this.kafkaStreamsExtendedBindingProperties
.getExtendedConsumerProperties(inboundName).isDestinationIsPattern()) {
final Pattern pattern = Pattern.compile(this.bindingServiceProperties.getBindingDestination(inboundName));
stream = streamsBuilder.stream(pattern);
stream = streamsBuilder.stream(pattern, consumed);
}
else {
String[] bindingTargets = StringUtils.commaDelimitedListToStringArray(
this.bindingServiceProperties.getBindingDestination(inboundName));
final Consumed<?, ?> consumed = getConsumed(kafkaStreamsConsumerProperties, keySerde, valueSerde, autoOffsetReset);
stream = streamsBuilder.stream(Arrays.asList(bindingTargets),
consumed);
}

View File

@@ -76,7 +76,6 @@ public class KafkaStreamsBinderDestinationIsPatternTests {
ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.cloud.stream.bindings.process-out-0.destination=out",
"--spring.cloud.stream.bindings.process-in-0.destination=in.*",
"--spring.cloud.stream.bindings.process-in-0.consumer.use-native-decoding=false",
"--spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.destinationIsPattern=true",
"--spring.cloud.stream.kafka.streams.binder.brokers="
+ embeddedKafka.getBrokersAsString());