Fix Serde inference issues in Kafka Streams binder

When there are multiple functions present with different outbound target types,
there is an issue of one function overriding the target type of a previous function
in the catalogue where the binder stores the target type information.
This causes problems for the binder initiated Serde inference. Addressing the issue.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/994
This commit is contained in:
Soby Chacko
2020-12-03 19:39:44 -05:00
parent fc184ba422
commit 5a7cc9f257
5 changed files with 35 additions and 12 deletions

View File

@@ -121,13 +121,13 @@ class KStreamBinder extends
this.kafkaTopicProvisioner.provisionProducerDestination(name, extendedProducerProperties);
Serde<?> keySerde = this.keyValueSerdeResolver
.getOuboundKeySerde(properties.getExtension(), kafkaStreamsBindingInformationCatalogue.getOutboundKStreamResolvable());
.getOuboundKeySerde(properties.getExtension(), kafkaStreamsBindingInformationCatalogue.getOutboundKStreamResolvable(outboundBindTarget));
LOG.info("Key Serde used for (outbound) " + name + ": " + keySerde.getClass().getName());
Serde<?> valueSerde;
if (properties.isUseNativeEncoding()) {
valueSerde = this.keyValueSerdeResolver.getOutboundValueSerde(properties,
properties.getExtension(), kafkaStreamsBindingInformationCatalogue.getOutboundKStreamResolvable());
properties.getExtension(), kafkaStreamsBindingInformationCatalogue.getOutboundKStreamResolvable(outboundBindTarget));
}
else {
valueSerde = Serdes.ByteArray();

View File

@@ -49,7 +49,7 @@ class KafkaStreamsBindingInformationCatalogue {
private final Set<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans = new HashSet<>();
private ResolvableType outboundKStreamResolvable;
private final Map<Object, ResolvableType> outboundKStreamResolvables = new HashMap<>();
private final Map<KStream<?, ?>, Serde<?>> keySerdeInfo = new HashMap<>();
@@ -135,12 +135,12 @@ class KafkaStreamsBindingInformationCatalogue {
return this.streamsBuilderFactoryBeans;
}
void setOutboundKStreamResolvable(ResolvableType outboundResolvable) {
this.outboundKStreamResolvable = outboundResolvable;
void addOutboundKStreamResolvable(Object key, ResolvableType outboundResolvable) {
this.outboundKStreamResolvables.put(key, outboundResolvable);
}
ResolvableType getOutboundKStreamResolvable() {
return outboundKStreamResolvable;
ResolvableType getOutboundKStreamResolvable(Object key) {
return outboundKStreamResolvables.get(key);
}
/**

View File

@@ -218,8 +218,6 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
i++;
}
if (result != null) {
kafkaStreamsBindingInformationCatalogue.setOutboundKStreamResolvable(
outboundResolvableType != null ? outboundResolvableType : resolvableType.getGeneric(1));
final Set<String> outputs = new TreeSet<>(kafkaStreamsBindableProxyFactory.getOutputs());
final Iterator<String> outboundDefinitionIterator = outputs.iterator();
@@ -248,6 +246,8 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
boundElement = (KStreamBoundElementFactory.KStreamWrapper) targetBean;
boundElement.wrap((KStream) outboundKStreams[ij]);
kafkaStreamsBindingInformationCatalogue.addOutboundKStreamResolvable(
targetBean, outboundResolvableType != null ? outboundResolvableType : resolvableType.getGeneric(1));
}
}
else {
@@ -257,6 +257,9 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
KStreamBoundElementFactory.KStreamWrapper
boundElement = (KStreamBoundElementFactory.KStreamWrapper) targetBean;
boundElement.wrap((KStream) result);
kafkaStreamsBindingInformationCatalogue.addOutboundKStreamResolvable(
targetBean, outboundResolvableType != null ? outboundResolvableType : resolvableType.getGeneric(1));
}
}
}

View File

@@ -183,7 +183,7 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator extends AbstractKafkaStr
"Result does not match with the number of declared outbounds");
}
}
kafkaStreamsBindingInformationCatalogue.setOutboundKStreamResolvable(ResolvableType.forMethodReturnType(method));
if (methodAnnotatedOutboundNames != null && methodAnnotatedOutboundNames.length > 0) {
if (result.getClass().isArray()) {
Object[] outboundKStreams = (Object[]) result;
@@ -191,12 +191,14 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator extends AbstractKafkaStr
for (Object outboundKStream : outboundKStreams) {
Object targetBean = this.applicationContext
.getBean(methodAnnotatedOutboundNames[i++]);
kafkaStreamsBindingInformationCatalogue.addOutboundKStreamResolvable(targetBean, ResolvableType.forMethodReturnType(method));
adaptStreamListenerResult(outboundKStream, targetBean);
}
}
else {
Object targetBean = this.applicationContext
.getBean(methodAnnotatedOutboundNames[0]);
kafkaStreamsBindingInformationCatalogue.addOutboundKStreamResolvable(targetBean, ResolvableType.forMethodReturnType(method));
adaptStreamListenerResult(result, targetBean);
}
}

View File

@@ -14,8 +14,9 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams.function;
package org.springframework.cloud.stream.binder.kafka.streams;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
@@ -26,6 +27,7 @@ import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.AfterClass;
@@ -38,6 +40,7 @@ import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.core.ResolvableType;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -46,6 +49,7 @@ import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import static org.assertj.core.api.Assertions.assertThat;
@@ -77,6 +81,7 @@ public class MultipleFunctionsInSameAppTests {
}
@Test
@SuppressWarnings("unchecked")
public void testMultiFunctionsInSameApp() throws InterruptedException {
SpringApplication app = new SpringApplication(MultipleFunctionsInSameApp.class);
app.setWebApplicationType(WebApplicationType.NONE);
@@ -84,7 +89,7 @@ public class MultipleFunctionsInSameAppTests {
try (ConfigurableApplicationContext context = app.run(
"--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.function.definition=process;analyze;anotherProcess",
"--spring.cloud.stream.function.definition=process;analyze;anotherProcess;yetAnotherProcess",
"--spring.cloud.stream.bindings.process-in-0.destination=purchases",
"--spring.cloud.stream.bindings.process-out-0.destination=coffee",
"--spring.cloud.stream.bindings.process-out-1.destination=electronics",
@@ -123,6 +128,14 @@ public class MultipleFunctionsInSameAppTests {
concurrency = (Integer) analyzeStreamsConfiguration.get(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
assertThat(concurrency).isEqualTo(1);
assertThat(anotherProcessStreamsConfiguration.get(StreamsConfig.NUM_STREAM_THREADS_CONFIG)).isEqualTo("3");
final KafkaStreamsBindingInformationCatalogue catalogue = context.getBean(KafkaStreamsBindingInformationCatalogue.class);
Field field = ReflectionUtils.findField(KafkaStreamsBindingInformationCatalogue.class, "outboundKStreamResolvables", Map.class);
ReflectionUtils.makeAccessible(field);
final Map<Object, ResolvableType> outboundKStreamResolvables = (Map<Object, ResolvableType>) ReflectionUtils.getField(field, catalogue);
// Since we have 2 functions with return types -- one is an array return type with 2 bindings -- assert that
// the catalogue contains outbound type information for all the 3 different bindings.
assertThat(outboundKStreamResolvables.size()).isEqualTo(3);
}
}
@@ -212,6 +225,11 @@ public class MultipleFunctionsInSameAppTests {
(s, p) -> p.equalsIgnoreCase("electronics"));
}
@Bean
public Function<KStream<String, String>, KStream<String, Long>> yetAnotherProcess() {
return input -> input.map((k, v) -> new KeyValue<>("foo", 1L));
}
@Bean
public BiConsumer<KStream<String, String>, KStream<String, String>> analyze() {
return (coffee, electronics) -> {