diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KeyValueSerdeResolver.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KeyValueSerdeResolver.java index 3d24433d..fc925dc9 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KeyValueSerdeResolver.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KeyValueSerdeResolver.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 the original author or authors. + * Copyright 2018-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka.streams; import java.lang.reflect.Method; import java.util.Arrays; +import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -69,6 +70,7 @@ import org.springframework.util.StringUtils; * * @author Soby Chacko * @author Lei Chen + * @author Eduard Domínguez */ public class KeyValueSerdeResolver implements ApplicationContextAware { @@ -96,14 +98,14 @@ public class KeyValueSerdeResolver implements ApplicationContextAware { KafkaStreamsConsumerProperties extendedConsumerProperties) { String keySerdeString = extendedConsumerProperties.getKeySerde(); - return getKeySerde(keySerdeString); + return getKeySerde(keySerdeString, extendedConsumerProperties.getConfiguration()); } public Serde getInboundKeySerde( KafkaStreamsConsumerProperties extendedConsumerProperties, ResolvableType resolvableType) { String keySerdeString = extendedConsumerProperties.getKeySerde(); - return getKeySerde(keySerdeString, resolvableType); + return getKeySerde(keySerdeString, resolvableType, extendedConsumerProperties.getConfiguration()); } /** @@ -120,7 +122,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware { String valueSerdeString = extendedConsumerProperties.getValueSerde(); try { if (consumerProperties != null && consumerProperties.isUseNativeDecoding()) { - valueSerde = getValueSerde(valueSerdeString); + valueSerde = getValueSerde(valueSerdeString, extendedConsumerProperties.getConfiguration()); } else { valueSerde = Serdes.ByteArray(); @@ -140,7 +142,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware { String valueSerdeString = extendedConsumerProperties.getValueSerde(); try { if (consumerProperties != null && consumerProperties.isUseNativeDecoding()) { - valueSerde = getValueSerde(valueSerdeString, resolvableType); + valueSerde = getValueSerde(valueSerdeString, resolvableType, extendedConsumerProperties.getConfiguration()); } else { valueSerde = Serdes.ByteArray(); @@ -158,11 +160,11 @@ public class KeyValueSerdeResolver implements ApplicationContextAware { * @return configurd {@link Serde} for the outbound key. */ public Serde getOuboundKeySerde(KafkaStreamsProducerProperties properties) { - return getKeySerde(properties.getKeySerde()); + return getKeySerde(properties.getKeySerde(), properties.getConfiguration()); } public Serde getOuboundKeySerde(KafkaStreamsProducerProperties properties, ResolvableType resolvableType) { - return getKeySerde(properties.getKeySerde(), resolvableType); + return getKeySerde(properties.getKeySerde(), resolvableType, properties.getConfiguration()); } @@ -179,7 +181,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware { try { if (producerProperties.isUseNativeEncoding()) { valueSerde = getValueSerde( - kafkaStreamsProducerProperties.getValueSerde()); + kafkaStreamsProducerProperties.getValueSerde(), kafkaStreamsProducerProperties.getConfiguration()); } else { valueSerde = Serdes.ByteArray(); @@ -197,7 +199,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware { try { if (producerProperties.isUseNativeEncoding()) { valueSerde = getValueSerde( - kafkaStreamsProducerProperties.getValueSerde(), resolvableType); + kafkaStreamsProducerProperties.getValueSerde(), resolvableType, kafkaStreamsProducerProperties.getConfiguration()); } else { valueSerde = Serdes.ByteArray(); @@ -215,7 +217,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware { * @return {@link Serde} for the state store key. */ public Serde getStateStoreKeySerde(String keySerdeString) { - return getKeySerde(keySerdeString); + return getKeySerde(keySerdeString, (Map) null); } /** @@ -225,14 +227,14 @@ public class KeyValueSerdeResolver implements ApplicationContextAware { */ public Serde getStateStoreValueSerde(String valueSerdeString) { try { - return getValueSerde(valueSerdeString); + return getValueSerde(valueSerdeString, (Map) null); } catch (ClassNotFoundException ex) { throw new IllegalStateException("Serde class not found: ", ex); } } - private Serde getKeySerde(String keySerdeString) { + private Serde getKeySerde(String keySerdeString, Map extendedConfiguration) { Serde keySerde; try { if (StringUtils.hasText(keySerdeString)) { @@ -241,8 +243,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware { else { keySerde = getFallbackSerde("default.key.serde"); } - keySerde.configure(this.streamConfigGlobalProperties, true); - + keySerde.configure(combineStreamConfigProperties(extendedConfiguration), false); } catch (ClassNotFoundException ex) { throw new IllegalStateException("Serde class not found: ", ex); @@ -250,7 +251,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware { return keySerde; } - private Serde getKeySerde(String keySerdeString, ResolvableType resolvableType) { + private Serde getKeySerde(String keySerdeString, ResolvableType resolvableType, Map extendedConfiguration) { Serde keySerde = null; try { if (StringUtils.hasText(keySerdeString)) { @@ -267,7 +268,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware { keySerde = Serdes.ByteArray(); } } - keySerde.configure(this.streamConfigGlobalProperties, true); + keySerde.configure(combineStreamConfigProperties(extendedConfiguration), false); } catch (ClassNotFoundException ex) { throw new IllegalStateException("Serde class not found: ", ex); @@ -380,7 +381,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware { } - private Serde getValueSerde(String valueSerdeString) + private Serde getValueSerde(String valueSerdeString, Map extendedConfiguration) throws ClassNotFoundException { Serde valueSerde; if (StringUtils.hasText(valueSerdeString)) { @@ -389,7 +390,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware { else { valueSerde = getFallbackSerde("default.value.serde"); } - valueSerde.configure(this.streamConfigGlobalProperties, false); + valueSerde.configure(combineStreamConfigProperties(extendedConfiguration), false); return valueSerde; } @@ -403,7 +404,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware { } @SuppressWarnings("unchecked") - private Serde getValueSerde(String valueSerdeString, ResolvableType resolvableType) + private Serde getValueSerde(String valueSerdeString, ResolvableType resolvableType, Map extendedConfiguration) throws ClassNotFoundException { Serde valueSerde = null; if (StringUtils.hasText(valueSerdeString)) { @@ -422,7 +423,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware { valueSerde = Serdes.ByteArray(); } } - valueSerde.configure(streamConfigGlobalProperties, false); + valueSerde.configure(combineStreamConfigProperties(extendedConfiguration), false); return valueSerde; } @@ -430,4 +431,15 @@ public class KeyValueSerdeResolver implements ApplicationContextAware { public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { context = (ConfigurableApplicationContext) applicationContext; } + + private Map combineStreamConfigProperties(Map extendedConfiguration) { + if (extendedConfiguration != null && !extendedConfiguration.isEmpty()) { + Map streamConfiguration = new HashMap(this.streamConfigGlobalProperties); + streamConfiguration.putAll(extendedConfiguration); + return streamConfiguration; + } + else { + return this.streamConfigGlobalProperties; + } + } } diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java index ee572f87..3bf29b69 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java @@ -20,6 +20,9 @@ import java.util.Map; import java.util.Properties; import java.util.function.Consumer; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.type.TypeFactory; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.KStream; @@ -28,9 +31,11 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; +import org.springframework.beans.DirectFieldAccessor; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolver; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.kafka.config.StreamsBuilderFactoryBean; @@ -40,6 +45,7 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThat; /** * @author Soby Chacko + * @author Eduard Domínguez */ public class KafkaStreamsBinderBootstrapTest { @@ -111,7 +117,7 @@ public class KafkaStreamsBinderBootstrapTest { + "=testKafkaStreamsBinderWithStandardConfigurationCanStart", "--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.application-id" + "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foo", - "--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.configuration.spring.json.value.type.method=com.test.MyClass", + "--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.configuration.spring.json.value.type.method=" + this.getClass().getName() + ".determineType", "--spring.cloud.stream.kafka.streams.bindings.input3-in-0.consumer.application-id" + "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foobar", "--spring.cloud.stream.kafka.streams.binder.brokers=" @@ -134,10 +140,19 @@ public class KafkaStreamsBinderBootstrapTest { final StreamsBuilderFactoryBean input3SBFB = applicationContext.getBean("&stream-builder-input3", StreamsBuilderFactoryBean.class); final Properties streamsConfiguration3 = input3SBFB.getStreamsConfiguration(); assertThat(streamsConfiguration3.containsKey("spring.json.value.type.method")).isFalse(); + applicationContext.getBean(KeyValueSerdeResolver.class); + String configuredSerdeTypeResolver = (String) new DirectFieldAccessor(input2SBFB.getKafkaStreams()) + .getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeResolver.arg$2"); + + assertThat(this.getClass().getName() + ".determineType").isEqualTo(configuredSerdeTypeResolver); applicationContext.close(); } + public static JavaType determineType(byte[] data, Headers headers) { + return TypeFactory.defaultInstance().constructParametricType(Map.class, String.class, String.class); + } + @SpringBootApplication static class SimpleKafkaStreamsApplication { @@ -149,7 +164,7 @@ public class KafkaStreamsBinderBootstrapTest { } @Bean - public Consumer> input2() { + public Consumer>> input2() { return s -> { // No-op consumer };