diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java index e581fce3..e2791575 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java @@ -72,7 +72,7 @@ import org.springframework.core.env.MutablePropertySources; import org.springframework.integration.support.utils.IntegrationUtils; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.StreamsBuilderFactoryBean; -import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer; +import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer; import org.springframework.kafka.core.CleanupConfig; import org.springframework.kafka.streams.RecoveringDeserializationExceptionHandler; import org.springframework.messaging.MessageHeaders; @@ -181,7 +181,7 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application protected StreamsBuilderFactoryBean buildStreamsBuilderAndRetrieveConfig(String beanNamePostPrefix, ApplicationContext applicationContext, String inboundName, KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties, - StreamsBuilderFactoryBeanCustomizer customizer, + StreamsBuilderFactoryBeanConfigurer customizer, ConfigurableEnvironment environment, BindingProperties bindingProperties) { ConfigurableListableBeanFactory beanFactory = this.applicationContext .getBeanFactory(); diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java index 9d292848..82c13230 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java @@ -69,7 +69,7 @@ import org.springframework.core.env.MapPropertySource; import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.support.utils.IntegrationUtils; import org.springframework.kafka.config.KafkaStreamsConfiguration; -import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer; +import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer; import org.springframework.kafka.core.CleanupConfig; import org.springframework.kafka.streams.KafkaStreamsMicrometerListener; import org.springframework.kafka.streams.RecoveringDeserializationExceptionHandler; @@ -305,7 +305,7 @@ public class KafkaStreamsBinderSupportAutoConfiguration { KStreamStreamListenerParameterAdapter kafkaStreamListenerParameterAdapter, Collection streamListenerResultAdapters, ObjectProvider cleanupConfig, - ObjectProvider customizerProvider, ConfigurableEnvironment environment) { + ObjectProvider customizerProvider, ConfigurableEnvironment environment) { return new KafkaStreamsStreamListenerSetupMethodOrchestrator( bindingServiceProperties, kafkaStreamsExtendedBindingProperties, keyValueSerdeResolver, kafkaStreamsBindingInformationCatalogue, @@ -412,7 +412,7 @@ public class KafkaStreamsBinderSupportAutoConfiguration { ObjectProvider cleanupConfig, StreamFunctionProperties streamFunctionProperties, @Qualifier("binderConfigurationProperties") KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties, - ObjectProvider customizerProvider, ConfigurableEnvironment environment) { + ObjectProvider customizerProvider, ConfigurableEnvironment environment) { return new KafkaStreamsFunctionProcessor(bindingServiceProperties, kafkaStreamsExtendedBindingProperties, keyValueSerdeResolver, kafkaStreamsBindingInformationCatalogue, kafkaStreamsMessageConversionDelegate, cleanupConfig.getIfUnique(), streamFunctionProperties, kafkaStreamsBinderConfigurationProperties, diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.java index afa9ac40..4adc4c34 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.java @@ -58,7 +58,7 @@ import org.springframework.cloud.stream.function.StreamFunctionProperties; import org.springframework.core.ResolvableType; import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.kafka.config.StreamsBuilderFactoryBean; -import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer; +import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer; import org.springframework.kafka.core.CleanupConfig; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; @@ -82,7 +82,7 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro private BeanFactory beanFactory; private StreamFunctionProperties streamFunctionProperties; private KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties; - StreamsBuilderFactoryBeanCustomizer customizer; + StreamsBuilderFactoryBeanConfigurer customizer; ConfigurableEnvironment environment; public KafkaStreamsFunctionProcessor(BindingServiceProperties bindingServiceProperties, @@ -93,7 +93,7 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro CleanupConfig cleanupConfig, StreamFunctionProperties streamFunctionProperties, KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties, - StreamsBuilderFactoryBeanCustomizer customizer, ConfigurableEnvironment environment) { + StreamsBuilderFactoryBeanConfigurer customizer, ConfigurableEnvironment environment) { super(bindingServiceProperties, kafkaStreamsBindingInformationCatalogue, kafkaStreamsExtendedBindingProperties, keyValueSerdeResolver, cleanupConfig); this.bindingServiceProperties = bindingServiceProperties; diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsStreamListenerSetupMethodOrchestrator.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsStreamListenerSetupMethodOrchestrator.java index 6a50b42b..53a3208d 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsStreamListenerSetupMethodOrchestrator.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsStreamListenerSetupMethodOrchestrator.java @@ -55,7 +55,7 @@ import org.springframework.core.ResolvableType; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.kafka.config.StreamsBuilderFactoryBean; -import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer; +import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer; import org.springframework.kafka.core.CleanupConfig; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.util.Assert; @@ -100,7 +100,7 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator extends AbstractKafkaStr private final Map methodStreamsBuilderFactoryBeanMap = new HashMap<>(); - StreamsBuilderFactoryBeanCustomizer customizer; + StreamsBuilderFactoryBeanConfigurer customizer; private final ConfigurableEnvironment environment; @@ -112,7 +112,7 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator extends AbstractKafkaStr StreamListenerParameterAdapter streamListenerParameterAdapter, Collection listenerResultAdapters, CleanupConfig cleanupConfig, - StreamsBuilderFactoryBeanCustomizer customizer, + StreamsBuilderFactoryBeanConfigurer customizer, ConfigurableEnvironment environment) { super(bindingServiceProperties, bindingInformationCatalogue, extendedBindingProperties, keyValueSerdeResolver, cleanupConfig); this.bindingServiceProperties = bindingServiceProperties; diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountFunctionTests.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountFunctionTests.java index ecde035b..327b6b71 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountFunctionTests.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountFunctionTests.java @@ -58,7 +58,7 @@ import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.Lifecycle; import org.springframework.context.annotation.Bean; import org.springframework.kafka.config.StreamsBuilderFactoryBean; -import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer; +import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; @@ -314,7 +314,7 @@ public class KafkaStreamsBinderWordCountFunctionTests { } @Bean - public StreamsBuilderFactoryBeanCustomizer customizer() { + public StreamsBuilderFactoryBeanConfigurer customizer() { return fb -> { try { fb.setStateListener((newState, oldState) -> {