Kafka Streams binder deprecated component removals
This commit is contained in:
@@ -73,35 +73,6 @@
|
||||
<artifactId>kafka_2.13</artifactId>
|
||||
<classifier>test</classifier>
|
||||
</dependency>
|
||||
<!-- Following dependency is only provided for testing and won't be packaged with the binder apps-->
|
||||
<dependency>
|
||||
<groupId>org.apache.avro</groupId>
|
||||
<artifactId>avro</artifactId>
|
||||
<version>${avro.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.avro</groupId>
|
||||
<artifactId>avro-maven-plugin</artifactId>
|
||||
<version>${avro.version}</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>generate-test-sources</phase>
|
||||
<goals>
|
||||
<goal>schema</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.basedir}/target/generated-test-sources</outputDirectory>
|
||||
<testOutputDirectory>${project.basedir}/target/generated-test-sources</testOutputDirectory>
|
||||
<testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
|
||||
@@ -1,66 +0,0 @@
|
||||
/*
|
||||
* Copyright 2017-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams;
|
||||
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
|
||||
import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter;
|
||||
import org.springframework.core.MethodParameter;
|
||||
import org.springframework.core.ResolvableType;
|
||||
|
||||
/**
|
||||
* {@link StreamListenerParameterAdapter} for KStream.
|
||||
*
|
||||
* @author Marius Bogoevici
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
class KStreamStreamListenerParameterAdapter
|
||||
implements StreamListenerParameterAdapter<KStream<?, ?>, KStream<?, ?>> {
|
||||
|
||||
private final KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate;
|
||||
|
||||
private final KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue;
|
||||
|
||||
KStreamStreamListenerParameterAdapter(
|
||||
KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
|
||||
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue) {
|
||||
this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate;
|
||||
this.KafkaStreamsBindingInformationCatalogue = KafkaStreamsBindingInformationCatalogue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean supports(Class bindingTargetType, MethodParameter methodParameter) {
|
||||
return KafkaStreamsBinderUtils.supportsKStream(methodParameter, bindingTargetType);
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public KStream adapt(KStream<?, ?> bindingTarget, MethodParameter parameter) {
|
||||
ResolvableType resolvableType = ResolvableType.forMethodParameter(parameter);
|
||||
final Class<?> valueClass = (resolvableType.getGeneric(1).getRawClass() != null)
|
||||
? (resolvableType.getGeneric(1).getRawClass()) : Object.class;
|
||||
if (this.KafkaStreamsBindingInformationCatalogue
|
||||
.isUseNativeDecoding(bindingTarget)) {
|
||||
return bindingTarget;
|
||||
}
|
||||
else {
|
||||
return this.kafkaStreamsMessageConversionDelegate
|
||||
.deserializeOnInbound(valueClass, bindingTarget);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,58 +0,0 @@
|
||||
/*
|
||||
* Copyright 2017-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
|
||||
import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
|
||||
|
||||
/**
|
||||
* {@link StreamListenerResultAdapter} for KStream.
|
||||
*
|
||||
* @author Marius Bogoevici
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
class KStreamStreamListenerResultAdapter implements
|
||||
StreamListenerResultAdapter<KStream, KStreamBoundElementFactory.KStreamWrapper> {
|
||||
|
||||
@Override
|
||||
public boolean supports(Class<?> resultType, Class<?> boundElement) {
|
||||
return KStream.class.isAssignableFrom(resultType)
|
||||
&& KStream.class.isAssignableFrom(boundElement);
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public Closeable adapt(KStream streamListenerResult,
|
||||
KStreamBoundElementFactory.KStreamWrapper boundElement) {
|
||||
boundElement.wrap(streamListenerResult);
|
||||
return new NoOpCloseable();
|
||||
}
|
||||
|
||||
private static final class NoOpCloseable implements Closeable {
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -17,7 +17,6 @@
|
||||
package org.springframework.cloud.stream.binder.kafka.streams;
|
||||
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -50,10 +49,7 @@ import org.springframework.cloud.stream.binder.BinderConfiguration;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.function.FunctionDetectorCondition;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.serde.CompositeNonNativeSerde;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.serde.MessageConverterDelegateSerde;
|
||||
import org.springframework.cloud.stream.binding.BindingService;
|
||||
import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
|
||||
import org.springframework.cloud.stream.config.BinderProperties;
|
||||
import org.springframework.cloud.stream.config.BindingServiceConfiguration;
|
||||
import org.springframework.cloud.stream.config.BindingServiceProperties;
|
||||
@@ -296,37 +292,6 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
|
||||
}
|
||||
}
|
||||
|
||||
@Bean
|
||||
public KStreamStreamListenerResultAdapter kstreamStreamListenerResultAdapter() {
|
||||
return new KStreamStreamListenerResultAdapter();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public KStreamStreamListenerParameterAdapter kstreamStreamListenerParameterAdapter(
|
||||
KafkaStreamsMessageConversionDelegate kstreamBoundMessageConversionDelegate,
|
||||
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue) {
|
||||
return new KStreamStreamListenerParameterAdapter(
|
||||
kstreamBoundMessageConversionDelegate,
|
||||
KafkaStreamsBindingInformationCatalogue);
|
||||
}
|
||||
|
||||
	/**
	 * Orchestrator that drives StreamListener method setup for Kafka Streams
	 * bindings. Optional collaborators (cleanup config, factory-bean customizer)
	 * are resolved with {@code getIfUnique()} and may be {@code null}.
	 */
	@Bean
	public KafkaStreamsStreamListenerSetupMethodOrchestrator kafkaStreamsStreamListenerSetupMethodOrchestrator(
			BindingServiceProperties bindingServiceProperties,
			KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
			KeyValueSerdeResolver keyValueSerdeResolver,
			KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
			KStreamStreamListenerParameterAdapter kafkaStreamListenerParameterAdapter,
			Collection<StreamListenerResultAdapter> streamListenerResultAdapters,
			ObjectProvider<CleanupConfig> cleanupConfig,
			ObjectProvider<StreamsBuilderFactoryBeanConfigurer> customizerProvider, ConfigurableEnvironment environment) {
		return new KafkaStreamsStreamListenerSetupMethodOrchestrator(
				bindingServiceProperties, kafkaStreamsExtendedBindingProperties,
				keyValueSerdeResolver, kafkaStreamsBindingInformationCatalogue,
				kafkaStreamListenerParameterAdapter, streamListenerResultAdapters,
				cleanupConfig.getIfUnique(), customizerProvider.getIfUnique(), environment);
	}
|
||||
|
||||
@Bean
|
||||
public KafkaStreamsMessageConversionDelegate messageConversionDelegate(
|
||||
@Qualifier(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
|
||||
@@ -338,20 +303,6 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
|
||||
KafkaStreamsBindingInformationCatalogue, binderConfigurationProperties);
|
||||
}
|
||||
|
||||
	/**
	 * Serde that delegates (de)serialization to the framework's composite
	 * message converter (the argument-resolver converter bean).
	 */
	@Bean
	public MessageConverterDelegateSerde messageConverterDelegateSerde(
			@Qualifier(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
			CompositeMessageConverter compositeMessageConverterFactory) {
		return new MessageConverterDelegateSerde(compositeMessageConverterFactory);
	}
|
||||
|
||||
	/**
	 * Serde used for non-native encoding/decoding paths, backed by the same
	 * composite message converter as the delegate Serde above it in the context.
	 */
	@Bean
	public CompositeNonNativeSerde compositeNonNativeSerde(
			@Qualifier(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
			CompositeMessageConverter compositeMessageConverterFactory) {
		return new CompositeNonNativeSerde(compositeMessageConverterFactory);
	}
|
||||
|
||||
@Bean
|
||||
public KStreamBoundElementFactory kStreamBoundElementFactory(
|
||||
BindingServiceProperties bindingServiceProperties,
|
||||
|
||||
@@ -1,521 +0,0 @@
|
||||
/*
|
||||
* Copyright 2018-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.kafka.common.serialization.Serde;
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.streams.StreamsBuilder;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.Topology;
|
||||
import org.apache.kafka.streams.kstream.GlobalKTable;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
import org.apache.kafka.streams.kstream.KTable;
|
||||
import org.apache.kafka.streams.state.StoreBuilder;
|
||||
import org.apache.kafka.streams.state.Stores;
|
||||
|
||||
import org.springframework.beans.factory.BeanInitializationException;
|
||||
import org.springframework.cloud.stream.annotation.Input;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsStateStore;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsStateStoreProperties;
|
||||
import org.springframework.cloud.stream.binding.StreamListenerErrorMessages;
|
||||
import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter;
|
||||
import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
|
||||
import org.springframework.cloud.stream.binding.StreamListenerSetupMethodOrchestrator;
|
||||
import org.springframework.cloud.stream.config.BindingProperties;
|
||||
import org.springframework.cloud.stream.config.BindingServiceProperties;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.core.MethodParameter;
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.core.annotation.AnnotationUtils;
|
||||
import org.springframework.core.env.ConfigurableEnvironment;
|
||||
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
|
||||
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
|
||||
import org.springframework.kafka.core.CleanupConfig;
|
||||
import org.springframework.messaging.handler.annotation.SendTo;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Kafka Streams specific implementation for {@link StreamListenerSetupMethodOrchestrator}
|
||||
* that overrides the default mechanisms for invoking StreamListener adapters.
|
||||
* <p>
|
||||
* The orchestration primarily focus on the following areas:
|
||||
* <p>
|
||||
* 1. Allow multiple KStream output bindings (KStream branching) by allowing more than one
|
||||
* output values on {@link SendTo} 2. Allow multiple inbound bindings for multiple KStream
|
||||
* and or KTable/GlobalKTable types. 3. Each StreamListener method that it orchestrates
|
||||
* gets its own {@link StreamsBuilderFactoryBean} and {@link StreamsConfig}
|
||||
*
|
||||
* @author Soby Chacko
|
||||
* @author Lei Chen
|
||||
* @author Gary Russell
|
||||
*/
|
||||
class KafkaStreamsStreamListenerSetupMethodOrchestrator extends AbstractKafkaStreamsBinderProcessor
|
||||
implements StreamListenerSetupMethodOrchestrator {
|
||||
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(KafkaStreamsStreamListenerSetupMethodOrchestrator.class);
|
||||
|
||||
private final StreamListenerParameterAdapter streamListenerParameterAdapter;
|
||||
|
||||
private final Collection<StreamListenerResultAdapter> streamListenerResultAdapters;
|
||||
|
||||
private final BindingServiceProperties bindingServiceProperties;
|
||||
|
||||
private final KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties;
|
||||
|
||||
private final KeyValueSerdeResolver keyValueSerdeResolver;
|
||||
|
||||
private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
|
||||
|
||||
private final Map<Method, List<String>> registeredStoresPerMethod = new HashMap<>();
|
||||
|
||||
private final Map<Method, StreamsBuilderFactoryBean> methodStreamsBuilderFactoryBeanMap = new HashMap<>();
|
||||
|
||||
StreamsBuilderFactoryBeanConfigurer customizer;
|
||||
|
||||
private final ConfigurableEnvironment environment;
|
||||
|
||||
	/**
	 * Creates the orchestrator. All collaborators are stored as-is;
	 * {@code customizer} and {@code cleanupConfig} may be {@code null}
	 * (they come from {@code ObjectProvider.getIfUnique()} in the config class).
	 */
	KafkaStreamsStreamListenerSetupMethodOrchestrator(
			BindingServiceProperties bindingServiceProperties,
			KafkaStreamsExtendedBindingProperties extendedBindingProperties,
			KeyValueSerdeResolver keyValueSerdeResolver,
			KafkaStreamsBindingInformationCatalogue bindingInformationCatalogue,
			StreamListenerParameterAdapter streamListenerParameterAdapter,
			Collection<StreamListenerResultAdapter> listenerResultAdapters,
			CleanupConfig cleanupConfig,
			StreamsBuilderFactoryBeanConfigurer customizer,
			ConfigurableEnvironment environment) {
		// Superclass keeps its own references for the shared binder-processor logic.
		super(bindingServiceProperties, bindingInformationCatalogue, extendedBindingProperties, keyValueSerdeResolver, cleanupConfig);
		this.bindingServiceProperties = bindingServiceProperties;
		this.kafkaStreamsExtendedBindingProperties = extendedBindingProperties;
		this.keyValueSerdeResolver = keyValueSerdeResolver;
		this.kafkaStreamsBindingInformationCatalogue = bindingInformationCatalogue;
		this.streamListenerParameterAdapter = streamListenerParameterAdapter;
		this.streamListenerResultAdapters = listenerResultAdapters;
		this.customizer = customizer;
		this.environment = environment;
	}
|
||||
|
||||
@Override
|
||||
public boolean supports(Method method) {
|
||||
return methodParameterSupports(method) && (methodReturnTypeSuppports(method)
|
||||
|| Void.TYPE.equals(method.getReturnType()));
|
||||
}
|
||||
|
||||
private boolean methodReturnTypeSuppports(Method method) {
|
||||
Class<?> returnType = method.getReturnType();
|
||||
if (returnType.equals(KStream.class) || (returnType.isArray()
|
||||
&& returnType.getComponentType().equals(KStream.class))) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean methodParameterSupports(Method method) {
|
||||
boolean supports = false;
|
||||
for (int i = 0; i < method.getParameterCount(); i++) {
|
||||
MethodParameter methodParameter = MethodParameter.forExecutable(method, i);
|
||||
Class<?> parameterType = methodParameter.getParameterType();
|
||||
if (parameterType.equals(KStream.class) || parameterType.equals(KTable.class)
|
||||
|| parameterType.equals(GlobalKTable.class)) {
|
||||
supports = true;
|
||||
}
|
||||
}
|
||||
return supports;
|
||||
}
|
||||
|
||||
	/**
	 * Invokes the StreamListener method with adapted inbound arguments and, for
	 * non-void methods, wires the returned KStream(s) into the declared outbound
	 * binding targets. An array return corresponds to KStream branching: element i
	 * is bound to the i-th name on {@code @SendTo}.
	 *
	 * @throws BeanInitializationException wrapping any failure during invocation
	 * or binding setup
	 */
	@Override
	@SuppressWarnings({"rawtypes", "unchecked"})
	public void orchestrateStreamListenerSetupMethod(StreamListener streamListener,
			Method method, Object bean) {
		String[] methodAnnotatedOutboundNames = getOutboundBindingTargetNames(method);
		validateStreamListenerMethod(streamListener, method,
				methodAnnotatedOutboundNames);
		String methodAnnotatedInboundName = streamListener.value();
		// Convert each bound proxy/target into the concrete argument the method expects.
		Object[] adaptedInboundArguments = adaptAndRetrieveInboundArguments(method,
				methodAnnotatedInboundName, this.applicationContext,
				this.streamListenerParameterAdapter);
		try {
			ReflectionUtils.makeAccessible(method);
			if (Void.TYPE.equals(method.getReturnType())) {
				method.invoke(bean, adaptedInboundArguments);
			}
			else {
				Object result = method.invoke(bean, adaptedInboundArguments);

				// The number of returned streams must match the declared outbounds.
				if (methodAnnotatedOutboundNames != null && methodAnnotatedOutboundNames.length > 0) {
					if (result.getClass().isArray()) {
						Assert.isTrue(
								methodAnnotatedOutboundNames.length == ((Object[]) result).length,
								"Result does not match with the number of declared outbounds");
					}
					else {
						Assert.isTrue(methodAnnotatedOutboundNames.length == 1,
								"Result does not match with the number of declared outbounds");
					}
				}

				if (methodAnnotatedOutboundNames != null && methodAnnotatedOutboundNames.length > 0) {
					// Fall back to @Input on the first parameter if @StreamListener had no value.
					methodAnnotatedInboundName = populateInboundIfMissing(method, methodAnnotatedInboundName);
					// Each outbound shares the StreamsBuilderFactoryBean of its inbound.
					final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsBindingInformationCatalogue
							.getStreamsBuilderFactoryBeanPerBinding().get(methodAnnotatedInboundName);

					if (result.getClass().isArray()) {
						// Branching case: bind each returned stream to the matching outbound.
						Object[] outboundKStreams = (Object[]) result;
						int i = 0;
						for (Object outboundKStream : outboundKStreams) {
							final String methodAnnotatedOutboundName = methodAnnotatedOutboundNames[i++];

							this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(
									methodAnnotatedOutboundName, streamsBuilderFactoryBean);

							Object targetBean = this.applicationContext
									.getBean(methodAnnotatedOutboundName);
							kafkaStreamsBindingInformationCatalogue.addOutboundKStreamResolvable(targetBean, ResolvableType.forMethodReturnType(method));
							adaptStreamListenerResult(outboundKStream, targetBean);
						}
					}
					else {
						// Single outbound case.
						this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(
								methodAnnotatedOutboundNames[0], streamsBuilderFactoryBean);

						Object targetBean = this.applicationContext
								.getBean(methodAnnotatedOutboundNames[0]);
						kafkaStreamsBindingInformationCatalogue.addOutboundKStreamResolvable(targetBean, ResolvableType.forMethodReturnType(method));
						adaptStreamListenerResult(result, targetBean);
					}
				}
			}
		}
		catch (Exception ex) {
			throw new BeanInitializationException(
					"Cannot setup StreamListener for " + method, ex);
		}
	}
|
||||
|
||||
private String populateInboundIfMissing(Method method, String methodAnnotatedInboundName) {
|
||||
if (!StringUtils.hasText(methodAnnotatedInboundName)) {
|
||||
Object[] arguments = new Object[method.getParameterTypes().length];
|
||||
if (arguments.length > 0) {
|
||||
MethodParameter methodParameter = MethodParameter.forExecutable(method, 0);
|
||||
if (methodParameter.hasParameterAnnotation(Input.class)) {
|
||||
Input methodAnnotation = methodParameter
|
||||
.getParameterAnnotation(Input.class);
|
||||
methodAnnotatedInboundName = methodAnnotation.value();
|
||||
}
|
||||
}
|
||||
}
|
||||
return methodAnnotatedInboundName;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void adaptStreamListenerResult(Object outboundKStream, Object targetBean) {
|
||||
for (StreamListenerResultAdapter streamListenerResultAdapter : this.streamListenerResultAdapters) {
|
||||
if (streamListenerResultAdapter.supports(
|
||||
outboundKStream.getClass(), targetBean.getClass())) {
|
||||
streamListenerResultAdapter.adapt(outboundKStream,
|
||||
targetBean);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
	/**
	 * Builds the argument array for the StreamListener method. For each parameter,
	 * resolves the target binding name (from {@code @Input} or, for a single
	 * parameter, from the {@code @StreamListener} value), lazily creates one
	 * StreamsBuilderFactoryBean per method, resolves key/value Serdes, and either
	 * wires a KStream (wrapping the binding-time proxy) or delegates
	 * KTable/GlobalKTable handling to the superclass helper.
	 *
	 * @throws IllegalStateException when a parameter has no resolvable binding or
	 * when stream construction fails
	 */
	@Override
	@SuppressWarnings({"unchecked"})
	public Object[] adaptAndRetrieveInboundArguments(Method method, String inboundName,
			ApplicationContext applicationContext,
			StreamListenerParameterAdapter... adapters) {
		Object[] arguments = new Object[method.getParameterTypes().length];
		for (int parameterIndex = 0; parameterIndex < arguments.length; parameterIndex++) {
			MethodParameter methodParameter = MethodParameter.forExecutable(method,
					parameterIndex);
			Class<?> parameterType = methodParameter.getParameterType();
			Object targetReferenceValue = null;
			if (methodParameter.hasParameterAnnotation(Input.class)) {
				// @Input on the parameter names its binding explicitly.
				targetReferenceValue = AnnotationUtils
						.getValue(methodParameter.getParameterAnnotation(Input.class));
				Input methodAnnotation = methodParameter
						.getParameterAnnotation(Input.class);
				inboundName = methodAnnotation.value();
			}
			else if (arguments.length == 1 && StringUtils.hasText(inboundName)) {
				// Single-parameter methods may inherit the @StreamListener value.
				targetReferenceValue = inboundName;
			}
			if (targetReferenceValue != null) {
				Assert.isInstanceOf(String.class, targetReferenceValue,
						"Annotation value must be a String");
				Object targetBean = applicationContext
						.getBean((String) targetReferenceValue);
				BindingProperties bindingProperties = this.bindingServiceProperties
						.getBindingProperties(inboundName);
				// Retrieve the StreamsConfig created for this method if available.
				// Otherwise, create the StreamsBuilderFactory and get the underlying
				// config.
				if (!this.methodStreamsBuilderFactoryBeanMap.containsKey(method)) {
					StreamsBuilderFactoryBean streamsBuilderFactoryBean = buildStreamsBuilderAndRetrieveConfig(method.getDeclaringClass().getSimpleName() + "-" + method.getName(),
							applicationContext,
							inboundName, null, customizer, this.environment, bindingProperties);
					this.methodStreamsBuilderFactoryBeanMap.put(method, streamsBuilderFactoryBean);
				}
				try {
					StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.methodStreamsBuilderFactoryBeanMap
							.get(method);
					StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject();
					final String applicationId = streamsBuilderFactoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
					KafkaStreamsConsumerProperties extendedConsumerProperties = this.kafkaStreamsExtendedBindingProperties
							.getExtendedConsumerProperties(inboundName);
					extendedConsumerProperties.setApplicationId(applicationId);
					// get state store spec
					KafkaStreamsStateStoreProperties spec = buildStateStoreSpec(method);

					Serde<?> keySerde = this.keyValueSerdeResolver
							.getInboundKeySerde(extendedConsumerProperties, ResolvableType.forMethodParameter(methodParameter));
					LOG.info("Key Serde used for " + targetReferenceValue + ": " + keySerde.getClass().getName());

					// Without native decoding, values arrive as raw bytes and are
					// converted later by the framework.
					Serde<?> valueSerde = bindingServiceProperties.getConsumerProperties(inboundName).isUseNativeDecoding() ?
							getValueSerde(inboundName, extendedConsumerProperties, ResolvableType.forMethodParameter(methodParameter)) : Serdes.ByteArray();
					LOG.info("Value Serde used for " + targetReferenceValue + ": " + valueSerde.getClass().getName());

					Topology.AutoOffsetReset autoOffsetReset = getAutoOffsetReset(inboundName, extendedConsumerProperties);

					if (parameterType.isAssignableFrom(KStream.class)) {
						KStream<?, ?> stream = getkStream(inboundName, spec,
								bindingProperties, extendedConsumerProperties, streamsBuilder, keySerde, valueSerde,
								autoOffsetReset, parameterIndex == 0);
						KStreamBoundElementFactory.KStreamWrapper kStreamWrapper = (KStreamBoundElementFactory.KStreamWrapper) targetBean;
						// wrap the proxy created during the initial target type binding
						// with real object (KStream)
						kStreamWrapper.wrap((KStream<Object, Object>) stream);
						this.kafkaStreamsBindingInformationCatalogue.addKeySerde(stream, keySerde);
						// Re-register the wrapper's binding properties under the real stream.
						BindingProperties bindingProperties1 = this.kafkaStreamsBindingInformationCatalogue.getBindingProperties().get(kStreamWrapper);
						this.kafkaStreamsBindingInformationCatalogue.registerBindingProperties(stream, bindingProperties1);

						this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(inboundName, streamsBuilderFactoryBean);
						this.kafkaStreamsBindingInformationCatalogue.addConsumerPropertiesPerSbfb(streamsBuilderFactoryBean,
								bindingServiceProperties.getConsumerProperties(inboundName));

						// First matching adapter converts the stream into the argument.
						for (StreamListenerParameterAdapter streamListenerParameterAdapter : adapters) {
							if (streamListenerParameterAdapter.supports(stream.getClass(),
									methodParameter)) {
								arguments[parameterIndex] = streamListenerParameterAdapter
										.adapt(stream, methodParameter);
								break;
							}
						}
						if (arguments[parameterIndex] == null
								&& parameterType.isAssignableFrom(stream.getClass())) {
							arguments[parameterIndex] = stream;
						}
						Assert.notNull(arguments[parameterIndex],
								"Cannot convert argument " + parameterIndex + " of "
										+ method + "from " + stream.getClass() + " to "
										+ parameterType);
					}
					else {
						handleKTableGlobalKTableInputs(arguments, parameterIndex, inboundName, parameterType, targetBean, streamsBuilderFactoryBean,
								streamsBuilder, extendedConsumerProperties, keySerde, valueSerde, autoOffsetReset, parameterIndex == 0);
					}
				}
				catch (Exception ex) {
					throw new IllegalStateException(ex);
				}
			}
			else {
				throw new IllegalStateException(
						StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS);
			}
		}
		return arguments;
	}
|
||||
|
||||
	/**
	 * Builds a Kafka Streams {@link StoreBuilder} from the
	 * {@code @KafkaStreamsStateStore} spec: key-value, window or session store,
	 * with optional caching and logging toggles. Rethrows any failure after
	 * logging it.
	 *
	 * @throws UnsupportedOperationException for an unrecognized store type
	 */
	private StoreBuilder buildStateStore(KafkaStreamsStateStoreProperties spec) {
		try {

			Serde<?> keySerde = this.keyValueSerdeResolver
					.getStateStoreKeySerde(spec.getKeySerdeString());
			Serde<?> valueSerde = this.keyValueSerdeResolver
					.getStateStoreValueSerde(spec.getValueSerdeString());
			StoreBuilder builder;
			switch (spec.getType()) {
				case KEYVALUE:
					builder = Stores.keyValueStoreBuilder(
							Stores.persistentKeyValueStore(spec.getName()), keySerde,
							valueSerde);
					break;
				case WINDOW:
					// NOTE(review): window size is hard-coded to 3 ms here — looks
					// suspicious but is preserved as-is; confirm against history.
					builder = Stores
							.windowStoreBuilder(
									Stores.persistentWindowStore(spec.getName(),
											Duration.ofMillis(spec.getRetention()), Duration.ofMillis(3), false),
									keySerde, valueSerde);
					break;
				case SESSION:
					builder = Stores.sessionStoreBuilder(Stores.persistentSessionStore(
							spec.getName(), Duration.ofMillis(spec.getRetention())), keySerde, valueSerde);
					break;
				default:
					throw new UnsupportedOperationException(
							"state store type (" + spec.getType() + ") is not supported!");
			}
			if (spec.isCacheEnabled()) {
				builder = builder.withCachingEnabled();
			}
			if (spec.isLoggingDisabled()) {
				builder = builder.withLoggingDisabled();
			}
			return builder;
		}
		catch (Exception ex) {
			LOG.error("failed to build state store exception : " + ex);
			throw ex;
		}
	}
|
||||
|
||||
	/**
	 * Registers the method's state store (when a spec is present) on the
	 * StreamsBuilder, then delegates actual KStream creation to the superclass
	 * {@code getKStream}.
	 */
	private KStream<?, ?> getkStream(String inboundName,
			KafkaStreamsStateStoreProperties storeSpec,
			BindingProperties bindingProperties,
			KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, StreamsBuilder streamsBuilder,
			Serde<?> keySerde, Serde<?> valueSerde,
			Topology.AutoOffsetReset autoOffsetReset, boolean firstBuild) {
		if (storeSpec != null) {
			StoreBuilder storeBuilder = buildStateStore(storeSpec);
			streamsBuilder.addStateStore(storeBuilder);
			if (LOG.isInfoEnabled()) {
				LOG.info("state store " + storeBuilder.name() + " added to topology");
			}
		}
		return getKStream(inboundName, bindingProperties, kafkaStreamsConsumerProperties, streamsBuilder,
				keySerde, valueSerde, autoOffsetReset, firstBuild);
	}
|
||||
|
||||
private void validateStreamListenerMethod(StreamListener streamListener,
|
||||
Method method, String[] methodAnnotatedOutboundNames) {
|
||||
String methodAnnotatedInboundName = streamListener.value();
|
||||
if (methodAnnotatedOutboundNames != null) {
|
||||
for (String s : methodAnnotatedOutboundNames) {
|
||||
if (StringUtils.hasText(s)) {
|
||||
Assert.isTrue(isDeclarativeOutput(method, s),
|
||||
"Method must be declarative");
|
||||
}
|
||||
}
|
||||
}
|
||||
if (StringUtils.hasText(methodAnnotatedInboundName)) {
|
||||
int methodArgumentsLength = method.getParameterTypes().length;
|
||||
|
||||
for (int parameterIndex = 0; parameterIndex < methodArgumentsLength; parameterIndex++) {
|
||||
MethodParameter methodParameter = MethodParameter.forExecutable(method,
|
||||
parameterIndex);
|
||||
Assert.isTrue(
|
||||
isDeclarativeInput(methodAnnotatedInboundName, methodParameter),
|
||||
"Method must be declarative");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private boolean isDeclarativeOutput(Method m, String targetBeanName) {
|
||||
boolean declarative;
|
||||
Class<?> returnType = m.getReturnType();
|
||||
if (returnType.isArray()) {
|
||||
Class<?> targetBeanClass = this.applicationContext.getType(targetBeanName);
|
||||
declarative = this.streamListenerResultAdapters.stream()
|
||||
.anyMatch((slpa) -> slpa.supports(returnType.getComponentType(),
|
||||
targetBeanClass));
|
||||
return declarative;
|
||||
}
|
||||
Class<?> targetBeanClass = this.applicationContext.getType(targetBeanName);
|
||||
declarative = this.streamListenerResultAdapters.stream()
|
||||
.anyMatch((slpa) -> slpa.supports(returnType, targetBeanClass));
|
||||
return declarative;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private boolean isDeclarativeInput(String targetBeanName,
|
||||
MethodParameter methodParameter) {
|
||||
if (!methodParameter.getParameterType().isAssignableFrom(Object.class)
|
||||
&& this.applicationContext.containsBean(targetBeanName)) {
|
||||
Class<?> targetBeanClass = this.applicationContext.getType(targetBeanName);
|
||||
if (targetBeanClass != null) {
|
||||
boolean supports = KafkaStreamsBinderUtils.supportsKStream(methodParameter, targetBeanClass);
|
||||
if (!supports) {
|
||||
supports = KTable.class.isAssignableFrom(targetBeanClass)
|
||||
&& KTable.class.isAssignableFrom(methodParameter.getParameterType());
|
||||
if (!supports) {
|
||||
supports = GlobalKTable.class.isAssignableFrom(targetBeanClass)
|
||||
&& GlobalKTable.class.isAssignableFrom(methodParameter.getParameterType());
|
||||
}
|
||||
}
|
||||
return supports;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static String[] getOutboundBindingTargetNames(Method method) {
|
||||
SendTo sendTo = AnnotationUtils.findAnnotation(method, SendTo.class);
|
||||
if (sendTo != null) {
|
||||
Assert.isTrue(!ObjectUtils.isEmpty(sendTo.value()),
|
||||
StreamListenerErrorMessages.ATLEAST_ONE_OUTPUT);
|
||||
Assert.isTrue(sendTo.value().length >= 1,
|
||||
"At least one outbound destination need to be provided.");
|
||||
return sendTo.value();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked"})
|
||||
private KafkaStreamsStateStoreProperties buildStateStoreSpec(Method method) {
|
||||
if (!this.registeredStoresPerMethod.containsKey(method)) {
|
||||
KafkaStreamsStateStore spec = AnnotationUtils.findAnnotation(method,
|
||||
KafkaStreamsStateStore.class);
|
||||
if (spec != null) {
|
||||
Assert.isTrue(!ObjectUtils.isEmpty(spec.name()), "name cannot be empty");
|
||||
Assert.isTrue(spec.name().length() >= 1, "name cannot be empty.");
|
||||
this.registeredStoresPerMethod.put(method, new ArrayList<>());
|
||||
this.registeredStoresPerMethod.get(method).add(spec.name());
|
||||
KafkaStreamsStateStoreProperties props = new KafkaStreamsStateStoreProperties();
|
||||
props.setName(spec.name());
|
||||
props.setType(spec.type());
|
||||
props.setLength(spec.lengthMs());
|
||||
props.setKeySerdeString(spec.keySerde());
|
||||
props.setRetention(spec.retentionMs());
|
||||
props.setValueSerdeString(spec.valueSerde());
|
||||
props.setCacheEnabled(spec.cache());
|
||||
props.setLoggingDisabled(!spec.logging());
|
||||
return props;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,90 +0,0 @@
|
||||
/*
|
||||
* Copyright 2017-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams.annotations;
|
||||
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
|
||||
import org.springframework.cloud.stream.annotation.Input;
|
||||
import org.springframework.cloud.stream.annotation.Output;
|
||||
|
||||
/**
|
||||
* Bindable interface for {@link KStream} input and output.
|
||||
*
|
||||
* This interface can be used as a bindable interface with
|
||||
* {@link org.springframework.cloud.stream.annotation.EnableBinding} when both input and
|
||||
* output types are single KStream. In other scenarios where multiple types are required,
|
||||
* other similar bindable interfaces can be created and used. For example, there are cases
|
||||
* in which multiple KStreams are required on the outbound in the case of KStream
|
||||
* branching or multiple input types are required either in the form of multiple KStreams
|
||||
* and a combination of KStreams and KTables. In those cases, new bindable interfaces
|
||||
* compatible with the requirements must be created. Here are some examples.
|
||||
*
|
||||
* <pre class="code">
|
||||
* interface KStreamBranchProcessor {
|
||||
* @Input("input")
|
||||
* KStream<?, ?> input();
|
||||
*
|
||||
* @Output("output-1")
|
||||
* KStream<?, ?> output1();
|
||||
*
|
||||
* @Output("output-2")
|
||||
* KStream<?, ?> output2();
|
||||
*
|
||||
* @Output("output-3")
|
||||
* KStream<?, ?> output3();
|
||||
*
|
||||
* ......
|
||||
*
|
||||
* }
|
||||
*</pre>
|
||||
*
|
||||
* <pre class="code">
|
||||
* interface KStreamKtableProcessor {
|
||||
* @Input("input-1")
|
||||
* KStream<?, ?> input1();
|
||||
*
|
||||
* @Input("input-2")
|
||||
* KTable<?, ?> input2();
|
||||
*
|
||||
* @Output("output")
|
||||
* KStream<?, ?> output();
|
||||
*
|
||||
* ......
|
||||
*
|
||||
* }
|
||||
*</pre>
|
||||
*
|
||||
* @author Marius Bogoevici
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public interface KafkaStreamsProcessor {

	/**
	 * Inbound {@link KStream} of the processor, bound to the target named "input".
	 * @return {@link Input} binding for {@link KStream} type.
	 */
	@Input("input")
	KStream<?, ?> input();

	/**
	 * Outbound {@link KStream} of the processor, bound to the target named "output".
	 * @return {@link Output} binding for {@link KStream} type.
	 */
	@Output("output")
	KStream<?, ?> output();

}
|
||||
@@ -1,115 +0,0 @@
|
||||
/*
|
||||
* Copyright 2018-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams.annotations;
|
||||
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsStateStoreProperties;
|
||||
|
||||
/**
|
||||
* Interface for Kafka Stream state store.
|
||||
*
|
||||
* This interface can be used to inject a state store specification into KStream building
|
||||
* process so that the desired store can be built by StreamBuilder and added to topology
|
||||
* for later use by processors. This is particularly useful when need to combine stream
|
||||
* DSL with low level processor APIs. In those cases, if a writable state store is desired
|
||||
* in processors, it needs to be created using this annotation. Here is the example.
|
||||
*
|
||||
* <pre class="code">
|
||||
* @StreamListener("input")
|
||||
* @KafkaStreamsStateStore(name="mystate", type= KafkaStreamsStateStoreProperties.StoreType.WINDOW,
|
||||
* size=300000)
|
||||
* public void process(KStream<Object, Product> input) {
|
||||
* ......
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* With that, you should be able to read/write this state store in your
|
||||
* processor/transformer code.
|
||||
*
|
||||
* <pre class="code">
|
||||
* new Processor<Object, Product>() {
|
||||
* WindowStore<Object, String> state;
|
||||
* @Override
|
||||
* public void init(ProcessorContext processorContext) {
|
||||
* state = (WindowStore)processorContext.getStateStore("mystate");
|
||||
* ......
|
||||
* }
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* @author Lei Chen
|
||||
*/
|
||||
|
||||
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface KafkaStreamsStateStore {

	/**
	 * Provides the name of the state store, used to look it up from the
	 * ProcessorContext at runtime.
	 * @return name of state store.
	 */
	String name() default "";

	/**
	 * State store type: key-value, window or session.
	 * @return {@link KafkaStreamsStateStoreProperties.StoreType} of state store.
	 */
	KafkaStreamsStateStoreProperties.StoreType type() default KafkaStreamsStateStoreProperties.StoreType.KEYVALUE;

	/**
	 * Fully qualified class name of the Serde used for keys.
	 * @return key serde of state store.
	 */
	String keySerde() default "org.apache.kafka.common.serialization.Serdes$StringSerde";

	/**
	 * Fully qualified class name of the Serde used for values.
	 * @return value serde of state store.
	 */
	String valueSerde() default "org.apache.kafka.common.serialization.Serdes$StringSerde";

	/**
	 * Window length in milliseconds; only meaningful for windowed stores.
	 * @return length in milliseconds of window (for windowed store).
	 */
	long lengthMs() default 0;

	/**
	 * Retention period in milliseconds; only meaningful for windowed stores.
	 * @return the maximum period of time in milliseconds to keep each window in this
	 * store (for windowed store).
	 */
	long retentionMs() default 0;

	/**
	 * Whether caching is enabled or not.
	 * @return whether caching should be enabled on the created store.
	 */
	boolean cache() default false;

	/**
	 * Whether change-logging is enabled or not.
	 * @return whether logging should be enabled on the created store.
	 */
	boolean logging() default true;

}
|
||||
@@ -1,161 +0,0 @@
|
||||
/*
|
||||
* Copyright 2018-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams.properties;
|
||||
|
||||
/**
|
||||
* Properties for Kafka Streams state store.
|
||||
*
|
||||
* @author Lei Chen
|
||||
*/
|
||||
public class KafkaStreamsStateStoreProperties {

	/** Name of this state store. */
	private String name;

	/** Kind of store to create (key-value, window or session). */
	private StoreType type;

	/** Window size of this state store in ms; only applicable for window stores. */
	private long length;

	/** Retention period of this state store in ms. */
	private long retention;

	/** Key serde class configured for this state store. */
	private String keySerdeString;

	/** Value serde class configured for this state store. */
	private String valueSerdeString;

	/** Whether caching is enabled on this state store. */
	private boolean cacheEnabled;

	/** Whether change-logging is disabled on this state store. */
	private boolean loggingDisabled;

	public String getName() {
		return this.name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public StoreType getType() {
		return this.type;
	}

	public void setType(StoreType type) {
		this.type = type;
	}

	public long getLength() {
		return this.length;
	}

	public void setLength(long length) {
		this.length = length;
	}

	public long getRetention() {
		return this.retention;
	}

	public void setRetention(long retention) {
		this.retention = retention;
	}

	public String getKeySerdeString() {
		return this.keySerdeString;
	}

	public void setKeySerdeString(String keySerdeString) {
		this.keySerdeString = keySerdeString;
	}

	public String getValueSerdeString() {
		return this.valueSerdeString;
	}

	public void setValueSerdeString(String valueSerdeString) {
		this.valueSerdeString = valueSerdeString;
	}

	public boolean isCacheEnabled() {
		return this.cacheEnabled;
	}

	public void setCacheEnabled(boolean cacheEnabled) {
		this.cacheEnabled = cacheEnabled;
	}

	public boolean isLoggingDisabled() {
		return this.loggingDisabled;
	}

	public void setLoggingDisabled(boolean loggingDisabled) {
		this.loggingDisabled = loggingDisabled;
	}

	/**
	 * Supported state store kinds; {@link #toString()} yields the lowercase label.
	 */
	public enum StoreType {

		/** Key value store. */
		KEYVALUE("keyvalue"),

		/** Window store. */
		WINDOW("window"),

		/** Session store. */
		SESSION("session");

		private final String label;

		StoreType(final String label) {
			this.label = label;
		}

		@Override
		public String toString() {
			return this.label;
		}

	}

}
|
||||
@@ -1,37 +0,0 @@
|
||||
/*
|
||||
* Copyright 2018-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams.serde;
|
||||
|
||||
import org.springframework.messaging.converter.CompositeMessageConverter;
|
||||
|
||||
/**
|
||||
* This class provides the same functionality as {@link MessageConverterDelegateSerde} and is deprecated.
|
||||
* It is kept for backward compatibility reasons and will be removed in version 3.1
|
||||
*
|
||||
* @author Soby Chacko
|
||||
* @since 2.1
|
||||
*
|
||||
* @deprecated in favor of {@link MessageConverterDelegateSerde}
|
||||
*/
|
||||
@Deprecated
|
||||
public class CompositeNonNativeSerde extends MessageConverterDelegateSerde {
|
||||
|
||||
public CompositeNonNativeSerde(CompositeMessageConverter compositeMessageConverter) {
|
||||
super(compositeMessageConverter);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,228 +0,0 @@
|
||||
/*
|
||||
* Copyright 2019-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams.serde;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
import org.apache.kafka.common.serialization.Serde;
|
||||
import org.apache.kafka.common.serialization.Serializer;
|
||||
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.converter.CompositeMessageConverter;
|
||||
import org.springframework.messaging.converter.MessageConverter;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.MimeType;
|
||||
import org.springframework.util.MimeTypeUtils;
|
||||
|
||||
/**
|
||||
* A {@link Serde} implementation that wraps the list of {@link MessageConverter}s from
|
||||
* {@link CompositeMessageConverter}.
|
||||
*
|
||||
* The primary motivation for this class is to provide an avro based {@link Serde} that is
|
||||
* compatible with the schema registry that Spring Cloud Stream provides. When using the
|
||||
* schema registry support from Spring Cloud Stream in a Kafka Streams binder based
|
||||
* application, the applications can deserialize the incoming Kafka Streams records using
|
||||
* the built in Avro {@link MessageConverter}. However, this same message conversion
|
||||
* approach will not work downstream in other operations in the topology for Kafka Streams
|
||||
* as some of them needs a {@link Serde} instance that can talk to the Spring Cloud Stream
|
||||
* provided Schema Registry. This implementation will solve that problem.
|
||||
*
|
||||
* Only Avro and JSON based converters are exposed as binder provided {@link Serde}
|
||||
* implementations currently.
|
||||
*
|
||||
* Users of this class must call the
|
||||
* {@link MessageConverterDelegateSerde#configure(Map, boolean)} method to configure the
|
||||
* {@link Serde} object. At the very least the configuration map must include a key called
|
||||
* "valueClass" to indicate the type of the target object for deserialization. If any
|
||||
* other content type other than JSON is needed (only Avro is available now other than
|
||||
* JSON), that needs to be included in the configuration map with the key "contentType".
|
||||
* For example,
|
||||
*
|
||||
* <pre class="code">
|
||||
* Map<String, Object> config = new HashMap<>();
|
||||
* config.put("valueClass", Foo.class);
|
||||
* config.put("contentType", "application/avro");
|
||||
* </pre>
|
||||
*
|
||||
* Then use the above map when calling the configure method.
|
||||
*
|
||||
* This class is only intended to be used when writing a Spring Cloud Stream Kafka Streams
|
||||
* application that uses Spring Cloud Stream schema registry for schema evolution.
|
||||
*
|
||||
* An instance of this class is provided as a bean by the binder configuration and
|
||||
* typically the applications can autowire that bean. This is the expected usage pattern
|
||||
* of this class.
|
||||
*
|
||||
* @param <T> type of the object to marshall
|
||||
* @author Soby Chacko
|
||||
* @since 3.0
|
||||
* @deprecated in favor of other schema registry providers instead of Spring Cloud Schema Registry. See its motivation above.
|
||||
*/
|
||||
@Deprecated
public class MessageConverterDelegateSerde<T> implements Serde<T> {

	// Configuration key naming the target class for deserialization.
	private static final String VALUE_CLASS_HEADER = "valueClass";

	private static final String AVRO_FORMAT = "avro";

	// Matches any "application/*+avro" content type.
	private static final MimeType DEFAULT_AVRO_MIME_TYPE = new MimeType("application",
			"*+" + AVRO_FORMAT);

	private final MessageConverterDelegateDeserializer<T> messageConverterDelegateDeserializer;

	private final MessageConverterDelegateSerializer<T> messageConverterDelegateSerializer;

	/**
	 * Create a serde whose serializer and deserializer both delegate to the given
	 * composite message converter.
	 * @param compositeMessageConverter the converter backing both directions
	 */
	public MessageConverterDelegateSerde(
			CompositeMessageConverter compositeMessageConverter) {
		this.messageConverterDelegateDeserializer = new MessageConverterDelegateDeserializer<>(
				compositeMessageConverter);
		this.messageConverterDelegateSerializer = new MessageConverterDelegateSerializer<>(
				compositeMessageConverter);
	}

	/**
	 * Configures both delegates. The map must contain "valueClass" (a Class) and
	 * may contain a content type; see the class-level documentation.
	 */
	@Override
	public void configure(Map<String, ?> configs, boolean isKey) {
		this.messageConverterDelegateDeserializer.configure(configs, isKey);
		this.messageConverterDelegateSerializer.configure(configs, isKey);
	}

	@Override
	public void close() {
		// No-op
	}

	@Override
	public Serializer<T> serializer() {
		return this.messageConverterDelegateSerializer;
	}

	@Override
	public Deserializer<T> deserializer() {
		return this.messageConverterDelegateDeserializer;
	}

	/**
	 * Resolves the mime type from the configuration map, defaulting to
	 * "application/json" (UTF-8) when no content type is configured or when the
	 * configured type is not avro-based.
	 */
	private static MimeType resolveMimeType(Map<String, ?> configs) {
		if (configs.containsKey(MessageHeaders.CONTENT_TYPE)) {
			// NOTE(review): assumes the configured content type is a String — a
			// non-String value would throw ClassCastException here.
			String contentType = (String) configs.get(MessageHeaders.CONTENT_TYPE);
			if (DEFAULT_AVRO_MIME_TYPE.equals(MimeTypeUtils.parseMimeType(contentType))) {
				return DEFAULT_AVRO_MIME_TYPE;
			}
			else if (contentType.contains("avro")) {
				// Any other avro-flavored type is normalized to application/avro.
				return MimeTypeUtils.parseMimeType("application/avro");
			}
			else {
				return new MimeType("application", "json", StandardCharsets.UTF_8);
			}
		}
		else {
			return new MimeType("application", "json", StandardCharsets.UTF_8);
		}
	}

	/**
	 * Custom {@link Deserializer} that uses the {@link org.springframework.cloud.stream.converter.CompositeMessageConverterFactory}.
	 *
	 * @param <U> parameterized target type for deserialization
	 */
	private static class MessageConverterDelegateDeserializer<U> implements Deserializer<U> {

		private final MessageConverter messageConverter;

		// Content type resolved during configure(); applied to every inbound record.
		private MimeType mimeType;

		// Target type resolved during configure(); records are converted to it.
		private Class<?> valueClass;

		MessageConverterDelegateDeserializer(
				CompositeMessageConverter compositeMessageConverter) {
			this.messageConverter = compositeMessageConverter;
		}

		@Override
		public void configure(Map<String, ?> configs, boolean isKey) {
			Assert.isTrue(configs.containsKey(VALUE_CLASS_HEADER),
					"Deserializers must provide a configuration for valueClass.");
			final Object valueClass = configs.get(VALUE_CLASS_HEADER);
			Assert.isTrue(valueClass instanceof Class,
					"Deserializers must provide a valid value for valueClass.");
			this.valueClass = (Class<?>) valueClass;
			this.mimeType = resolveMimeType(configs);
		}

		// Wraps the raw bytes in a Message carrying the configured content type,
		// then lets the message converter produce the typed value.
		@SuppressWarnings("unchecked")
		@Override
		public U deserialize(String topic, byte[] data) {
			Message<?> message = MessageBuilder.withPayload(data)
					.setHeader(MessageHeaders.CONTENT_TYPE, this.mimeType.toString())
					.build();
			U messageConverted = (U) this.messageConverter.fromMessage(message,
					this.valueClass);
			// A null conversion result means no converter handled the content type.
			Assert.notNull(messageConverted, "Deserialization failed.");
			return messageConverted;
		}

		@Override
		public void close() {
			// No-op
		}

	}

	/**
	 * Custom {@link Serializer} that uses the {@link org.springframework.cloud.stream.converter.CompositeMessageConverterFactory}.
	 *
	 * @param <V> parameterized type for serialization
	 */
	private static class MessageConverterDelegateSerializer<V> implements Serializer<V> {

		private final MessageConverter messageConverter;

		// Content type resolved during configure(); applied to every outbound record.
		private MimeType mimeType;

		MessageConverterDelegateSerializer(
				CompositeMessageConverter compositeMessageConverter) {
			this.messageConverter = compositeMessageConverter;
		}

		@Override
		public void configure(Map<String, ?> configs, boolean isKey) {
			this.mimeType = resolveMimeType(configs);
		}

		// Copies the headers so the configured content type can be injected
		// (MessageHeaders from a built Message are immutable), then converts.
		// NOTE(review): assumes the converter's resulting payload is byte[] —
		// a converter producing anything else would fail the cast below.
		@Override
		public byte[] serialize(String topic, V data) {
			Message<?> message = MessageBuilder.withPayload(data).build();
			Map<String, Object> headers = new HashMap<>(message.getHeaders());
			headers.put(MessageHeaders.CONTENT_TYPE, this.mimeType.toString());
			MessageHeaders messageHeaders = new MessageHeaders(headers);
			final Object payload = this.messageConverter
					.toMessage(message.getPayload(), messageHeaders).getPayload();
			return (byte[]) payload;
		}

		@Override
		public void close() {
			// No-op
		}

	}

}
|
||||
@@ -1,11 +0,0 @@
|
||||
{
|
||||
"namespace" : "com.example",
|
||||
"type" : "record",
|
||||
"name" : "Sensor",
|
||||
"fields" : [
|
||||
{"name":"id","type":"string"},
|
||||
{"name":"temperature", "type":"float", "default":0.0},
|
||||
{"name":"acceleration", "type":"float","default":0.0},
|
||||
{"name":"velocity","type":"float","default":0.0}
|
||||
]
|
||||
}
|
||||
Reference in New Issue
Block a user