Adding state store beans for KTable binding

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/897
This commit is contained in:
Soby Chacko
2020-05-26 12:58:02 -04:00
parent 8be9fe6abc
commit 7adb10bcd2
4 changed files with 51 additions and 13 deletions

View File

@@ -143,7 +143,10 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
protected void handleKTableGlobalKTableInputs(Object[] arguments, int index, String input, Class<?> parameterType, Object targetBean,
StreamsBuilderFactoryBean streamsBuilderFactoryBean, StreamsBuilder streamsBuilder,
KafkaStreamsConsumerProperties extendedConsumerProperties,
Serde<?> keySerde, Serde<?> valueSerde, Topology.AutoOffsetReset autoOffsetReset) {
Serde<?> keySerde, Serde<?> valueSerde, Topology.AutoOffsetReset autoOffsetReset, boolean firstBuild) {
if (firstBuild) {
addStateStoreBeans(streamsBuilder);
}
if (parameterType.isAssignableFrom(KTable.class)) {
String materializedAs = extendedConsumerProperties.getMaterializedAs();
String bindingDestination = this.bindingServiceProperties.getBindingDestination(input);

View File

@@ -350,7 +350,7 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
}
else {
handleKTableGlobalKTableInputs(arguments, i, input, parameterType, targetBean, streamsBuilderFactoryBean,
streamsBuilder, extendedConsumerProperties, keySerde, valueSerde, autoOffsetReset);
streamsBuilder, extendedConsumerProperties, keySerde, valueSerde, autoOffsetReset, i == 0);
}
i++;
}

View File

@@ -311,7 +311,7 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator extends AbstractKafkaStr
}
else {
handleKTableGlobalKTableInputs(arguments, parameterIndex, inboundName, parameterType, targetBean, streamsBuilderFactoryBean,
streamsBuilder, extendedConsumerProperties, keySerde, valueSerde, autoOffsetReset);
streamsBuilder, extendedConsumerProperties, keySerde, valueSerde, autoOffsetReset, parameterIndex == 0);
}
}
catch (Exception ex) {

View File

@@ -21,6 +21,7 @@ import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -59,7 +60,9 @@ public class KafkaStreamsFunctionStateStoreTests {
try (ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.function.definition=process;hello",
"--spring.cloud.stream.bindings.process-in-0.destination=words",
"--spring.cloud.stream.bindings.hello-in-0.destination=words",
"--spring.cloud.stream.kafka.streams.binder.application-id=testKafkaStreamsFuncionWithMultipleStateStores",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" +
@@ -77,19 +80,28 @@ public class KafkaStreamsFunctionStateStoreTests {
try {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("words");
template.sendDefault("foobar");
template.sendDefault(1, "foobar");
Thread.sleep(2000L);
StateStoreTestApplication processorApplication = context
.getBean(StateStoreTestApplication.class);
KeyValueStore<Long, Long> state1 = processorApplication.state1;
assertThat(processorApplication.processed).isTrue();
assertThat(processorApplication.processed1).isTrue();
assertThat(state1 != null).isTrue();
assertThat(state1.name()).isEqualTo("my-store");
WindowStore<Long, Long> state2 = processorApplication.state2;
assertThat(state2 != null).isTrue();
assertThat(state2.name()).isEqualTo("other-store");
assertThat(state2.persistent()).isTrue();
KeyValueStore<Long, Long> state3 = processorApplication.state1;
assertThat(processorApplication.processed2).isTrue();
assertThat(state3 != null).isTrue();
assertThat(state3.name()).isEqualTo("my-store");
WindowStore<Long, Long> state4 = processorApplication.state2;
assertThat(state4 != null).isTrue();
assertThat(state4.name()).isEqualTo("other-store");
assertThat(state4.persistent()).isTrue();
}
finally {
pf.destroy();
@@ -102,7 +114,11 @@ public class KafkaStreamsFunctionStateStoreTests {
KeyValueStore<Long, Long> state1;
WindowStore<Long, Long> state2;
boolean processed;
KeyValueStore<Long, Long> state3;
WindowStore<Long, Long> state4;
boolean processed1;
boolean processed2;
@Bean
public java.util.function.BiConsumer<KStream<Object, String>, KStream<Object, String>> process() {
@@ -117,21 +133,40 @@ public class KafkaStreamsFunctionStateStoreTests {
@Override
public void process(Object key, String value) {
processed = true;
processed1 = true;
}
@Override
public void close() {
if (state1 != null) {
state1.close();
}
if (state2 != null) {
state2.close();
}
}
}, "my-store", "other-store");
}
@Bean
public java.util.function.Consumer<KTable<Object, String>> hello() {
return input -> {
input.toStream().process(() -> new Processor<Object, String>() {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
state3 = (KeyValueStore<Long, Long>) context.getStateStore("my-store");
state4 = (WindowStore<Long, Long>) context.getStateStore("other-store");
}
@Override
public void process(Object key, String value) {
processed2 = true;
}
@Override
public void close() {
}
}, "my-store", "other-store");
};
}
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(