KafkaStreamsBinderBootstrapTest updates
Migrate tests from StreamListener to the functional model.
This commit is contained in:
@@ -16,6 +16,8 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams.bootstrap;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import javax.security.auth.login.AppConfigurationEntry;
|
||||
|
||||
import org.apache.kafka.common.security.JaasUtils;
|
||||
@@ -29,10 +31,8 @@ import org.junit.Test;
|
||||
import org.springframework.boot.WebApplicationType;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.annotation.Input;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
@@ -54,26 +54,27 @@ public class KafkaStreamsBinderBootstrapTest {
|
||||
public void testKStreamBinderWithCustomEnvironmentCanStart() {
|
||||
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(
|
||||
SimpleKafkaStreamsApplication.class).web(WebApplicationType.NONE).run(
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input-1.consumer.application-id"
|
||||
"--spring.cloud.function.definition=input1;input2;input3",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input1-in-0.consumer.application-id"
|
||||
+ "=testKStreamBinderWithCustomEnvironmentCanStart",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input-2.consumer.application-id"
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.application-id"
|
||||
+ "=testKStreamBinderWithCustomEnvironmentCanStart-foo",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input-3.consumer.application-id"
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input3-in-0.consumer.application-id"
|
||||
+ "=testKStreamBinderWithCustomEnvironmentCanStart-foobar",
|
||||
"--spring.cloud.stream.bindings.input-1.destination=foo",
|
||||
"--spring.cloud.stream.bindings.input-1.binder=kstreamBinder",
|
||||
"--spring.cloud.stream.bindings.input1-in-0.destination=foo",
|
||||
"--spring.cloud.stream.bindings.input1-in-0.binder=kstreamBinder",
|
||||
"--spring.cloud.stream.binders.kstreamBinder.type=kstream",
|
||||
"--spring.cloud.stream.binders.kstreamBinder.environment"
|
||||
+ ".spring.cloud.stream.kafka.streams.binder.brokers"
|
||||
+ "=" + embeddedKafka.getEmbeddedKafka().getBrokersAsString(),
|
||||
"--spring.cloud.stream.bindings.input-2.destination=bar",
|
||||
"--spring.cloud.stream.bindings.input-2.binder=ktableBinder",
|
||||
"--spring.cloud.stream.bindings.input2-in-0.destination=bar",
|
||||
"--spring.cloud.stream.bindings.input2-in-0.binder=ktableBinder",
|
||||
"--spring.cloud.stream.binders.ktableBinder.type=ktable",
|
||||
"--spring.cloud.stream.binders.ktableBinder.environment"
|
||||
+ ".spring.cloud.stream.kafka.streams.binder.brokers"
|
||||
+ "=" + embeddedKafka.getEmbeddedKafka().getBrokersAsString(),
|
||||
"--spring.cloud.stream.bindings.input-3.destination=foobar",
|
||||
"--spring.cloud.stream.bindings.input-3.binder=globalktableBinder",
|
||||
"--spring.cloud.stream.bindings.input3-in-0.destination=foobar",
|
||||
"--spring.cloud.stream.bindings.input3-in-0.binder=globalktableBinder",
|
||||
"--spring.cloud.stream.binders.globalktableBinder.type=globalktable",
|
||||
"--spring.cloud.stream.binders.globalktableBinder.environment"
|
||||
+ ".spring.cloud.stream.kafka.streams.binder.brokers"
|
||||
@@ -86,11 +87,12 @@ public class KafkaStreamsBinderBootstrapTest {
|
||||
public void testKafkaStreamsBinderWithStandardConfigurationCanStart() {
|
||||
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(
|
||||
SimpleKafkaStreamsApplication.class).web(WebApplicationType.NONE).run(
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input-1.consumer.application-id"
|
||||
"--spring.cloud.function.definition=input1;input2;input3",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input1-in-0.consumer.application-id"
|
||||
+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input-2.consumer.application-id"
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.application-id"
|
||||
+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foo",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input-3.consumer.application-id"
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input3-in-0.consumer.application-id"
|
||||
+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foobar",
|
||||
"--spring.cloud.stream.kafka.streams.binder.brokers="
|
||||
+ embeddedKafka.getEmbeddedKafka().getBrokersAsString());
|
||||
@@ -102,11 +104,12 @@ public class KafkaStreamsBinderBootstrapTest {
|
||||
public void testKafkaStreamsBinderJaasInitialization() {
|
||||
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(
|
||||
SimpleKafkaStreamsApplication.class).web(WebApplicationType.NONE).run(
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input-1.consumer.application-id"
|
||||
"--spring.cloud.function.definition=input1;input2;input3",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input1-in-0.consumer.application-id"
|
||||
+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart-jaas",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input-2.consumer.application-id"
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.application-id"
|
||||
+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foo-jaas",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input-3.consumer.application-id"
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input3-in-0.consumer.application-id"
|
||||
+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foobar-jaas",
|
||||
"--spring.cloud.stream.kafka.streams.binder.jaas.loginModule=org.apache.kafka.common.security.plain.PlainLoginModule",
|
||||
"--spring.cloud.stream.kafka.streams.binder.jaas.options.username=foo",
|
||||
@@ -126,44 +129,27 @@ public class KafkaStreamsBinderBootstrapTest {
|
||||
}
|
||||
|
||||
@SpringBootApplication
|
||||
@EnableBinding({SimpleKStreamBinding.class, SimpleKTableBinding.class, SimpleGlobalKTableBinding.class})
|
||||
static class SimpleKafkaStreamsApplication {
|
||||
|
||||
@StreamListener
|
||||
public void handle(@Input("input-1") KStream<Object, String> stream) {
|
||||
|
||||
@Bean
|
||||
public Consumer<KStream<Object, String>> input1() {
|
||||
return s -> {
|
||||
// No-op consumer
|
||||
};
|
||||
}
|
||||
|
||||
@StreamListener
|
||||
public void handleX(@Input("input-2") KTable<Object, String> stream) {
|
||||
|
||||
@Bean
|
||||
public Consumer<KTable<Object, String>> input2() {
|
||||
return s -> {
|
||||
// No-op consumer
|
||||
};
|
||||
}
|
||||
|
||||
@StreamListener
|
||||
public void handleY(@Input("input-3") GlobalKTable<Object, String> stream) {
|
||||
|
||||
@Bean
|
||||
public Consumer<GlobalKTable<Object, String>> input3() {
|
||||
return s -> {
|
||||
// No-op consumer
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
interface SimpleKStreamBinding {
|
||||
|
||||
@Input("input-1")
|
||||
KStream<?, ?> inputStream();
|
||||
|
||||
}
|
||||
|
||||
interface SimpleKTableBinding {
|
||||
|
||||
@Input("input-2")
|
||||
KTable<?, ?> inputStream();
|
||||
|
||||
}
|
||||
|
||||
interface SimpleGlobalKTableBinding {
|
||||
|
||||
@Input("input-3")
|
||||
GlobalKTable<?, ?> inputStream();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user