Kafka streams default single input/output binding

Address an issue in which the binder still default to binding names "input" and "output"
in case of a single function.
This commit is contained in:
Soby Chacko
2019-10-09 23:57:54 -04:00
parent ecc8715b0c
commit 9d708f836a
4 changed files with 17 additions and 47 deletions

View File

@@ -132,15 +132,7 @@ public class KafkaStreamsBindableProxyFactory extends AbstractBindableProxyFacto
}
else {
int numberOfInputs = this.type.getRawClass() != null &&
(this.type.getRawClass().isAssignableFrom(BiFunction.class) ||
this.type.getRawClass().isAssignableFrom(BiConsumer.class)) ? 2 : getNumberOfInputs();
if (this.onlySingleFunction && numberOfInputs == 1) {
outputBinding = "output";
}
else {
outputBinding = String.format("%s-%s-0", this.functionName, FunctionConstants.DEFAULT_OUTPUT_SUFFIX);
}
outputBinding = String.format("%s-%s-0", this.functionName, FunctionConstants.DEFAULT_OUTPUT_SUFFIX);
}
Assert.isTrue(outputBinding != null, "output binding is not inferred.");
KafkaStreamsBindableProxyFactory.this.outputHolders.put(outputBinding,
@@ -176,35 +168,12 @@ public class KafkaStreamsBindableProxyFactory extends AbstractBindableProxyFacto
int numberOfInputs = this.type.getRawClass() != null &&
(this.type.getRawClass().isAssignableFrom(BiFunction.class) ||
this.type.getRawClass().isAssignableFrom(BiConsumer.class)) ? 2 : getNumberOfInputs();
if (numberOfInputs == 1) {
ResolvableType outboundArgument = this.type.getGeneric(1);
while (isAnotherFunctionOrConsumerFound(outboundArgument)) {
//The function is a curried function. We should introspect the partial function chain hierarchy.
outboundArgument = outboundArgument.getGeneric(1);
}
if (this.onlySingleFunction && (outboundArgument == null || outboundArgument.getRawClass() == null)) {
inputs.add("input");
}
else if (this.onlySingleFunction && outboundArgument.getRawClass() != null
&& (!outboundArgument.isArray() &&
outboundArgument.getRawClass().isAssignableFrom(KStream.class))) {
inputs.add("input");
}
else {
inputs.add(String.format("%s-%s-0", this.functionName, FunctionConstants.DEFAULT_INPUT_SUFFIX));
}
return inputs;
}
else {
int i = 0;
while (i < numberOfInputs) {
inputs.add(String.format("%s-%s-%d", this.functionName, FunctionConstants.DEFAULT_INPUT_SUFFIX, i++));
}
return inputs;
int i = 0;
while (i < numberOfInputs) {
inputs.add(String.format("%s-%s-%d", this.functionName, FunctionConstants.DEFAULT_INPUT_SUFFIX, i++));
}
return inputs;
}
private int getNumberOfInputs() {

View File

@@ -85,8 +85,8 @@ public class KafkaStreamsBinderWordCountFunctionTests {
try (ConfigurableApplicationContext context = app.run(
"--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.destination=words",
"--spring.cloud.stream.bindings.output.destination=counts",
"--spring.cloud.stream.bindings.process-in-0.destination=words",
"--spring.cloud.stream.bindings.process-out-0.destination=counts",
"--spring.cloud.stream.kafka.streams.default.consumer.application-id=testKstreamWordCountFunction",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" +
@@ -108,8 +108,8 @@ public class KafkaStreamsBinderWordCountFunctionTests {
try (ConfigurableApplicationContext context = app.run(
"--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.destination=words-1",
"--spring.cloud.stream.bindings.output.destination=counts-1",
"--spring.cloud.stream.bindings.process-in-0.destination=words-1",
"--spring.cloud.stream.bindings.process-out-0.destination=counts-1",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" +
"=org.apache.kafka.common.serialization.Serdes$StringSerde",

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.stream.binder.kafka.streams.function;
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
@@ -58,7 +59,7 @@ public class KafkaStreamsFunctionStateStoreTests {
try (ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.destination=words",
"--spring.cloud.stream.bindings.process-in-0.destination=words",
"--spring.cloud.stream.kafka.streams.binder.application-id=testKafkaStreamsFuncionWithMultipleStateStores",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" +
@@ -142,7 +143,7 @@ public class KafkaStreamsFunctionStateStoreTests {
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("other-store",
3L, 3, 3L, false), Serdes.Long(),
Duration.ofSeconds(3), Duration.ofSeconds(3), false), Serdes.Long(),
Serdes.Long());
}
}

View File

@@ -59,8 +59,8 @@ public class SerdesProvidedAsBeansTests {
try (ConfigurableApplicationContext context = app.run(
"--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.destination=purchases",
"--spring.cloud.stream.bindings.output.destination=coffee",
"--spring.cloud.stream.bindings.process-in-0.destination=purchases",
"--spring.cloud.stream.bindings.process-out-0.destination=coffee",
"--spring.cloud.stream.kafka.streams.binder.functions.process.applicationId=process-id-0",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" +
@@ -77,14 +77,14 @@ public class SerdesProvidedAsBeansTests {
final BindingServiceProperties bindingServiceProperties = context.getBean(BindingServiceProperties.class);
final KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties = context.getBean(KafkaStreamsExtendedBindingProperties.class);
final ConsumerProperties consumerProperties = bindingServiceProperties.getBindingProperties("input").getConsumer();
final ConsumerProperties consumerProperties = bindingServiceProperties.getBindingProperties("process-in-0").getConsumer();
final KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties = kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties("input");
kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties("input");
final Serde<?> inboundValueSerde = keyValueSerdeResolver.getInboundValueSerde(consumerProperties, kafkaStreamsConsumerProperties, resolvableType.getGeneric(0));
Assert.isTrue(inboundValueSerde instanceof FooSerde, "Inbound Value Serde is not matched");
final ProducerProperties producerProperties = bindingServiceProperties.getBindingProperties("output").getProducer();
final ProducerProperties producerProperties = bindingServiceProperties.getBindingProperties("process-out-0").getProducer();
final KafkaStreamsProducerProperties kafkaStreamsProducerProperties = kafkaStreamsExtendedBindingProperties.getExtendedProducerProperties("output");
kafkaStreamsExtendedBindingProperties.getExtendedProducerProperties("output");
final Serde<?> outboundValueSerde = keyValueSerdeResolver.getOutboundValueSerde(producerProperties, kafkaStreamsProducerProperties, resolvableType.getGeneric(1));