GH-1176: KeyValueSerdeResolver improvements

Use extended properties when initializing Consumer and Producer Serdes.

Updated copyright years and authors.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1176
This commit is contained in:
Eduard Domínguez
2021-12-10 12:26:17 +01:00
committed by Soby Chacko
parent be474f643a
commit 921b47d1e4
2 changed files with 49 additions and 22 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2018-2019 the original author or authors.
* Copyright 2018-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka.streams;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
@@ -69,6 +70,7 @@ import org.springframework.util.StringUtils;
*
* @author Soby Chacko
* @author Lei Chen
* @author Eduard Domínguez
*/
public class KeyValueSerdeResolver implements ApplicationContextAware {
@@ -96,14 +98,14 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
KafkaStreamsConsumerProperties extendedConsumerProperties) {
String keySerdeString = extendedConsumerProperties.getKeySerde();
return getKeySerde(keySerdeString);
return getKeySerde(keySerdeString, extendedConsumerProperties.getConfiguration());
}
public Serde<?> getInboundKeySerde(
KafkaStreamsConsumerProperties extendedConsumerProperties, ResolvableType resolvableType) {
String keySerdeString = extendedConsumerProperties.getKeySerde();
return getKeySerde(keySerdeString, resolvableType);
return getKeySerde(keySerdeString, resolvableType, extendedConsumerProperties.getConfiguration());
}
/**
@@ -120,7 +122,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
String valueSerdeString = extendedConsumerProperties.getValueSerde();
try {
if (consumerProperties != null && consumerProperties.isUseNativeDecoding()) {
valueSerde = getValueSerde(valueSerdeString);
valueSerde = getValueSerde(valueSerdeString, extendedConsumerProperties.getConfiguration());
}
else {
valueSerde = Serdes.ByteArray();
@@ -140,7 +142,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
String valueSerdeString = extendedConsumerProperties.getValueSerde();
try {
if (consumerProperties != null && consumerProperties.isUseNativeDecoding()) {
valueSerde = getValueSerde(valueSerdeString, resolvableType);
valueSerde = getValueSerde(valueSerdeString, resolvableType, extendedConsumerProperties.getConfiguration());
}
else {
valueSerde = Serdes.ByteArray();
@@ -158,11 +160,11 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
* @return configurd {@link Serde} for the outbound key.
*/
public Serde<?> getOuboundKeySerde(KafkaStreamsProducerProperties properties) {
return getKeySerde(properties.getKeySerde());
return getKeySerde(properties.getKeySerde(), properties.getConfiguration());
}
public Serde<?> getOuboundKeySerde(KafkaStreamsProducerProperties properties, ResolvableType resolvableType) {
return getKeySerde(properties.getKeySerde(), resolvableType);
return getKeySerde(properties.getKeySerde(), resolvableType, properties.getConfiguration());
}
@@ -179,7 +181,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
try {
if (producerProperties.isUseNativeEncoding()) {
valueSerde = getValueSerde(
kafkaStreamsProducerProperties.getValueSerde());
kafkaStreamsProducerProperties.getValueSerde(), kafkaStreamsProducerProperties.getConfiguration());
}
else {
valueSerde = Serdes.ByteArray();
@@ -197,7 +199,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
try {
if (producerProperties.isUseNativeEncoding()) {
valueSerde = getValueSerde(
kafkaStreamsProducerProperties.getValueSerde(), resolvableType);
kafkaStreamsProducerProperties.getValueSerde(), resolvableType, kafkaStreamsProducerProperties.getConfiguration());
}
else {
valueSerde = Serdes.ByteArray();
@@ -215,7 +217,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
* @return {@link Serde} for the state store key.
*/
public Serde<?> getStateStoreKeySerde(String keySerdeString) {
return getKeySerde(keySerdeString);
return getKeySerde(keySerdeString, (Map<String, ?>) null);
}
/**
@@ -225,14 +227,14 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
*/
public Serde<?> getStateStoreValueSerde(String valueSerdeString) {
try {
return getValueSerde(valueSerdeString);
return getValueSerde(valueSerdeString, (Map<String, ?>) null);
}
catch (ClassNotFoundException ex) {
throw new IllegalStateException("Serde class not found: ", ex);
}
}
private Serde<?> getKeySerde(String keySerdeString) {
private Serde<?> getKeySerde(String keySerdeString, Map<String, ?> extendedConfiguration) {
Serde<?> keySerde;
try {
if (StringUtils.hasText(keySerdeString)) {
@@ -241,8 +243,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
else {
keySerde = getFallbackSerde("default.key.serde");
}
keySerde.configure(this.streamConfigGlobalProperties, true);
keySerde.configure(combineStreamConfigProperties(extendedConfiguration), false);
}
catch (ClassNotFoundException ex) {
throw new IllegalStateException("Serde class not found: ", ex);
@@ -250,7 +251,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
return keySerde;
}
private Serde<?> getKeySerde(String keySerdeString, ResolvableType resolvableType) {
private Serde<?> getKeySerde(String keySerdeString, ResolvableType resolvableType, Map<String, ?> extendedConfiguration) {
Serde<?> keySerde = null;
try {
if (StringUtils.hasText(keySerdeString)) {
@@ -267,7 +268,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
keySerde = Serdes.ByteArray();
}
}
keySerde.configure(this.streamConfigGlobalProperties, true);
keySerde.configure(combineStreamConfigProperties(extendedConfiguration), false);
}
catch (ClassNotFoundException ex) {
throw new IllegalStateException("Serde class not found: ", ex);
@@ -380,7 +381,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
}
private Serde<?> getValueSerde(String valueSerdeString)
private Serde<?> getValueSerde(String valueSerdeString, Map<String, ?> extendedConfiguration)
throws ClassNotFoundException {
Serde<?> valueSerde;
if (StringUtils.hasText(valueSerdeString)) {
@@ -389,7 +390,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
else {
valueSerde = getFallbackSerde("default.value.serde");
}
valueSerde.configure(this.streamConfigGlobalProperties, false);
valueSerde.configure(combineStreamConfigProperties(extendedConfiguration), false);
return valueSerde;
}
@@ -403,7 +404,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
}
@SuppressWarnings("unchecked")
private Serde<?> getValueSerde(String valueSerdeString, ResolvableType resolvableType)
private Serde<?> getValueSerde(String valueSerdeString, ResolvableType resolvableType, Map<String, ?> extendedConfiguration)
throws ClassNotFoundException {
Serde<?> valueSerde = null;
if (StringUtils.hasText(valueSerdeString)) {
@@ -422,7 +423,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
valueSerde = Serdes.ByteArray();
}
}
valueSerde.configure(streamConfigGlobalProperties, false);
valueSerde.configure(combineStreamConfigProperties(extendedConfiguration), false);
return valueSerde;
}
@@ -430,4 +431,15 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
context = (ConfigurableApplicationContext) applicationContext;
}
private Map<String, ?> combineStreamConfigProperties(Map<String, ?> extendedConfiguration) {
if (extendedConfiguration != null && !extendedConfiguration.isEmpty()) {
Map<String, Object> streamConfiguration = new HashMap(this.streamConfigGlobalProperties);
streamConfiguration.putAll(extendedConfiguration);
return streamConfiguration;
}
else {
return this.streamConfigGlobalProperties;
}
}
}

View File

@@ -20,6 +20,9 @@ import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
@@ -28,9 +31,11 @@ import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolver;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
@@ -40,6 +45,7 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
/**
* @author Soby Chacko
* @author Eduard Domínguez
*/
public class KafkaStreamsBinderBootstrapTest {
@@ -111,7 +117,7 @@ public class KafkaStreamsBinderBootstrapTest {
+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart",
"--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.application-id"
+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foo",
"--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.configuration.spring.json.value.type.method=com.test.MyClass",
"--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.configuration.spring.json.value.type.method=" + this.getClass().getName() + ".determineType",
"--spring.cloud.stream.kafka.streams.bindings.input3-in-0.consumer.application-id"
+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foobar",
"--spring.cloud.stream.kafka.streams.binder.brokers="
@@ -134,10 +140,19 @@ public class KafkaStreamsBinderBootstrapTest {
final StreamsBuilderFactoryBean input3SBFB = applicationContext.getBean("&stream-builder-input3", StreamsBuilderFactoryBean.class);
final Properties streamsConfiguration3 = input3SBFB.getStreamsConfiguration();
assertThat(streamsConfiguration3.containsKey("spring.json.value.type.method")).isFalse();
applicationContext.getBean(KeyValueSerdeResolver.class);
String configuredSerdeTypeResolver = (String) new DirectFieldAccessor(input2SBFB.getKafkaStreams())
.getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeResolver.arg$2");
assertThat(this.getClass().getName() + ".determineType").isEqualTo(configuredSerdeTypeResolver);
applicationContext.close();
}
public static JavaType determineType(byte[] data, Headers headers) {
return TypeFactory.defaultInstance().constructParametricType(Map.class, String.class, String.class);
}
@SpringBootApplication
static class SimpleKafkaStreamsApplication {
@@ -149,7 +164,7 @@ public class KafkaStreamsBinderBootstrapTest {
}
@Bean
public Consumer<KTable<Object, String>> input2() {
public Consumer<KTable<Object, Map<String, String>>> input2() {
return s -> {
// No-op consumer
};