GH-1176: KeyValueSerdeResolver improvements
Use extended properties when initializing Consumer and Producer Serdes. Updated copyright years and authors. Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1176
This commit is contained in:
committed by
Soby Chacko
parent
5cd8e06ec6
commit
63b306d34c
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2018-2019 the original author or authors.
|
||||
* Copyright 2018-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka.streams;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
@@ -69,6 +70,7 @@ import org.springframework.util.StringUtils;
|
||||
*
|
||||
* @author Soby Chacko
|
||||
* @author Lei Chen
|
||||
* @author Eduard Domínguez
|
||||
*/
|
||||
public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
|
||||
@@ -96,14 +98,14 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
KafkaStreamsConsumerProperties extendedConsumerProperties) {
|
||||
String keySerdeString = extendedConsumerProperties.getKeySerde();
|
||||
|
||||
return getKeySerde(keySerdeString);
|
||||
return getKeySerde(keySerdeString, extendedConsumerProperties.getConfiguration());
|
||||
}
|
||||
|
||||
public Serde<?> getInboundKeySerde(
|
||||
KafkaStreamsConsumerProperties extendedConsumerProperties, ResolvableType resolvableType) {
|
||||
String keySerdeString = extendedConsumerProperties.getKeySerde();
|
||||
|
||||
return getKeySerde(keySerdeString, resolvableType);
|
||||
return getKeySerde(keySerdeString, resolvableType, extendedConsumerProperties.getConfiguration());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -120,7 +122,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
String valueSerdeString = extendedConsumerProperties.getValueSerde();
|
||||
try {
|
||||
if (consumerProperties != null && consumerProperties.isUseNativeDecoding()) {
|
||||
valueSerde = getValueSerde(valueSerdeString);
|
||||
valueSerde = getValueSerde(valueSerdeString, extendedConsumerProperties.getConfiguration());
|
||||
}
|
||||
else {
|
||||
valueSerde = Serdes.ByteArray();
|
||||
@@ -140,7 +142,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
String valueSerdeString = extendedConsumerProperties.getValueSerde();
|
||||
try {
|
||||
if (consumerProperties != null && consumerProperties.isUseNativeDecoding()) {
|
||||
valueSerde = getValueSerde(valueSerdeString, resolvableType);
|
||||
valueSerde = getValueSerde(valueSerdeString, resolvableType, extendedConsumerProperties.getConfiguration());
|
||||
}
|
||||
else {
|
||||
valueSerde = Serdes.ByteArray();
|
||||
@@ -158,11 +160,11 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
* @return configurd {@link Serde} for the outbound key.
|
||||
*/
|
||||
public Serde<?> getOuboundKeySerde(KafkaStreamsProducerProperties properties) {
|
||||
return getKeySerde(properties.getKeySerde());
|
||||
return getKeySerde(properties.getKeySerde(), properties.getConfiguration());
|
||||
}
|
||||
|
||||
public Serde<?> getOuboundKeySerde(KafkaStreamsProducerProperties properties, ResolvableType resolvableType) {
|
||||
return getKeySerde(properties.getKeySerde(), resolvableType);
|
||||
return getKeySerde(properties.getKeySerde(), resolvableType, properties.getConfiguration());
|
||||
}
|
||||
|
||||
|
||||
@@ -179,7 +181,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
try {
|
||||
if (producerProperties.isUseNativeEncoding()) {
|
||||
valueSerde = getValueSerde(
|
||||
kafkaStreamsProducerProperties.getValueSerde());
|
||||
kafkaStreamsProducerProperties.getValueSerde(), kafkaStreamsProducerProperties.getConfiguration());
|
||||
}
|
||||
else {
|
||||
valueSerde = Serdes.ByteArray();
|
||||
@@ -197,7 +199,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
try {
|
||||
if (producerProperties.isUseNativeEncoding()) {
|
||||
valueSerde = getValueSerde(
|
||||
kafkaStreamsProducerProperties.getValueSerde(), resolvableType);
|
||||
kafkaStreamsProducerProperties.getValueSerde(), resolvableType, kafkaStreamsProducerProperties.getConfiguration());
|
||||
}
|
||||
else {
|
||||
valueSerde = Serdes.ByteArray();
|
||||
@@ -215,7 +217,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
* @return {@link Serde} for the state store key.
|
||||
*/
|
||||
public Serde<?> getStateStoreKeySerde(String keySerdeString) {
|
||||
return getKeySerde(keySerdeString);
|
||||
return getKeySerde(keySerdeString, (Map<String, ?>) null);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -225,14 +227,14 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
*/
|
||||
public Serde<?> getStateStoreValueSerde(String valueSerdeString) {
|
||||
try {
|
||||
return getValueSerde(valueSerdeString);
|
||||
return getValueSerde(valueSerdeString, (Map<String, ?>) null);
|
||||
}
|
||||
catch (ClassNotFoundException ex) {
|
||||
throw new IllegalStateException("Serde class not found: ", ex);
|
||||
}
|
||||
}
|
||||
|
||||
private Serde<?> getKeySerde(String keySerdeString) {
|
||||
private Serde<?> getKeySerde(String keySerdeString, Map<String, ?> extendedConfiguration) {
|
||||
Serde<?> keySerde;
|
||||
try {
|
||||
if (StringUtils.hasText(keySerdeString)) {
|
||||
@@ -241,8 +243,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
else {
|
||||
keySerde = getFallbackSerde("default.key.serde");
|
||||
}
|
||||
keySerde.configure(this.streamConfigGlobalProperties, true);
|
||||
|
||||
keySerde.configure(combineStreamConfigProperties(extendedConfiguration), false);
|
||||
}
|
||||
catch (ClassNotFoundException ex) {
|
||||
throw new IllegalStateException("Serde class not found: ", ex);
|
||||
@@ -250,7 +251,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
return keySerde;
|
||||
}
|
||||
|
||||
private Serde<?> getKeySerde(String keySerdeString, ResolvableType resolvableType) {
|
||||
private Serde<?> getKeySerde(String keySerdeString, ResolvableType resolvableType, Map<String, ?> extendedConfiguration) {
|
||||
Serde<?> keySerde = null;
|
||||
try {
|
||||
if (StringUtils.hasText(keySerdeString)) {
|
||||
@@ -267,7 +268,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
keySerde = Serdes.ByteArray();
|
||||
}
|
||||
}
|
||||
keySerde.configure(this.streamConfigGlobalProperties, true);
|
||||
keySerde.configure(combineStreamConfigProperties(extendedConfiguration), false);
|
||||
}
|
||||
catch (ClassNotFoundException ex) {
|
||||
throw new IllegalStateException("Serde class not found: ", ex);
|
||||
@@ -380,7 +381,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
}
|
||||
|
||||
|
||||
private Serde<?> getValueSerde(String valueSerdeString)
|
||||
private Serde<?> getValueSerde(String valueSerdeString, Map<String, ?> extendedConfiguration)
|
||||
throws ClassNotFoundException {
|
||||
Serde<?> valueSerde;
|
||||
if (StringUtils.hasText(valueSerdeString)) {
|
||||
@@ -389,7 +390,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
else {
|
||||
valueSerde = getFallbackSerde("default.value.serde");
|
||||
}
|
||||
valueSerde.configure(this.streamConfigGlobalProperties, false);
|
||||
valueSerde.configure(combineStreamConfigProperties(extendedConfiguration), false);
|
||||
return valueSerde;
|
||||
}
|
||||
|
||||
@@ -403,7 +404,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Serde<?> getValueSerde(String valueSerdeString, ResolvableType resolvableType)
|
||||
private Serde<?> getValueSerde(String valueSerdeString, ResolvableType resolvableType, Map<String, ?> extendedConfiguration)
|
||||
throws ClassNotFoundException {
|
||||
Serde<?> valueSerde = null;
|
||||
if (StringUtils.hasText(valueSerdeString)) {
|
||||
@@ -422,7 +423,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
valueSerde = Serdes.ByteArray();
|
||||
}
|
||||
}
|
||||
valueSerde.configure(streamConfigGlobalProperties, false);
|
||||
valueSerde.configure(combineStreamConfigProperties(extendedConfiguration), false);
|
||||
return valueSerde;
|
||||
}
|
||||
|
||||
@@ -430,4 +431,15 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
||||
context = (ConfigurableApplicationContext) applicationContext;
|
||||
}
|
||||
|
||||
private Map<String, ?> combineStreamConfigProperties(Map<String, ?> extendedConfiguration) {
|
||||
if (extendedConfiguration != null && !extendedConfiguration.isEmpty()) {
|
||||
Map<String, Object> streamConfiguration = new HashMap(this.streamConfigGlobalProperties);
|
||||
streamConfiguration.putAll(extendedConfiguration);
|
||||
return streamConfiguration;
|
||||
}
|
||||
else {
|
||||
return this.streamConfigGlobalProperties;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,6 +20,9 @@ import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import com.fasterxml.jackson.databind.JavaType;
|
||||
import com.fasterxml.jackson.databind.type.TypeFactory;
|
||||
import org.apache.kafka.common.header.Headers;
|
||||
import org.apache.kafka.common.security.JaasUtils;
|
||||
import org.apache.kafka.streams.kstream.GlobalKTable;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
@@ -28,9 +31,11 @@ import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.boot.WebApplicationType;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolver;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
|
||||
@@ -40,6 +45,7 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
* @author Eduard Domínguez
|
||||
*/
|
||||
public class KafkaStreamsBinderBootstrapTest {
|
||||
|
||||
@@ -111,7 +117,7 @@ public class KafkaStreamsBinderBootstrapTest {
|
||||
+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.application-id"
|
||||
+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foo",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.configuration.spring.json.value.type.method=com.test.MyClass",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.configuration.spring.json.value.type.method=" + this.getClass().getName() + ".determineType",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input3-in-0.consumer.application-id"
|
||||
+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foobar",
|
||||
"--spring.cloud.stream.kafka.streams.binder.brokers="
|
||||
@@ -134,10 +140,19 @@ public class KafkaStreamsBinderBootstrapTest {
|
||||
final StreamsBuilderFactoryBean input3SBFB = applicationContext.getBean("&stream-builder-input3", StreamsBuilderFactoryBean.class);
|
||||
final Properties streamsConfiguration3 = input3SBFB.getStreamsConfiguration();
|
||||
assertThat(streamsConfiguration3.containsKey("spring.json.value.type.method")).isFalse();
|
||||
applicationContext.getBean(KeyValueSerdeResolver.class);
|
||||
|
||||
String configuredSerdeTypeResolver = (String) new DirectFieldAccessor(input2SBFB.getKafkaStreams())
|
||||
.getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeResolver.arg$2");
|
||||
|
||||
assertThat(this.getClass().getName() + ".determineType").isEqualTo(configuredSerdeTypeResolver);
|
||||
applicationContext.close();
|
||||
}
|
||||
|
||||
public static JavaType determineType(byte[] data, Headers headers) {
|
||||
return TypeFactory.defaultInstance().constructParametricType(Map.class, String.class, String.class);
|
||||
}
|
||||
|
||||
@SpringBootApplication
|
||||
static class SimpleKafkaStreamsApplication {
|
||||
|
||||
@@ -149,7 +164,7 @@ public class KafkaStreamsBinderBootstrapTest {
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Consumer<KTable<Object, String>> input2() {
|
||||
public Consumer<KTable<Object, Map<String, String>>> input2() {
|
||||
return s -> {
|
||||
// No-op consumer
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user