diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java index 1427c2b0..0f8885cc 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java @@ -16,6 +16,8 @@ package org.springframework.cloud.stream.binder.kafka.streams.bootstrap; +import java.util.function.Consumer; + import javax.security.auth.login.AppConfigurationEntry; import org.apache.kafka.common.security.JaasUtils; @@ -29,10 +31,8 @@ import org.junit.Test; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.Input; -import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; import org.springframework.kafka.test.rule.EmbeddedKafkaRule; import static org.assertj.core.api.Assertions.assertThat; @@ -54,26 +54,27 @@ public class KafkaStreamsBinderBootstrapTest { public void testKStreamBinderWithCustomEnvironmentCanStart() { ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder( SimpleKafkaStreamsApplication.class).web(WebApplicationType.NONE).run( - "--spring.cloud.stream.kafka.streams.bindings.input-1.consumer.application-id" + "--spring.cloud.function.definition=input1;input2;input3", + "--spring.cloud.stream.kafka.streams.bindings.input1-in-0.consumer.application-id" + "=testKStreamBinderWithCustomEnvironmentCanStart", - "--spring.cloud.stream.kafka.streams.bindings.input-2.consumer.application-id" + "--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.application-id" + "=testKStreamBinderWithCustomEnvironmentCanStart-foo", - "--spring.cloud.stream.kafka.streams.bindings.input-3.consumer.application-id" + "--spring.cloud.stream.kafka.streams.bindings.input3-in-0.consumer.application-id" + "=testKStreamBinderWithCustomEnvironmentCanStart-foobar", - "--spring.cloud.stream.bindings.input-1.destination=foo", - "--spring.cloud.stream.bindings.input-1.binder=kstreamBinder", + "--spring.cloud.stream.bindings.input1-in-0.destination=foo", + "--spring.cloud.stream.bindings.input1-in-0.binder=kstreamBinder", "--spring.cloud.stream.binders.kstreamBinder.type=kstream", "--spring.cloud.stream.binders.kstreamBinder.environment" + ".spring.cloud.stream.kafka.streams.binder.brokers" + "=" + embeddedKafka.getEmbeddedKafka().getBrokersAsString(), - "--spring.cloud.stream.bindings.input-2.destination=bar", - "--spring.cloud.stream.bindings.input-2.binder=ktableBinder", + "--spring.cloud.stream.bindings.input2-in-0.destination=bar", + "--spring.cloud.stream.bindings.input2-in-0.binder=ktableBinder", "--spring.cloud.stream.binders.ktableBinder.type=ktable", "--spring.cloud.stream.binders.ktableBinder.environment" + ".spring.cloud.stream.kafka.streams.binder.brokers" + "=" + embeddedKafka.getEmbeddedKafka().getBrokersAsString(), - "--spring.cloud.stream.bindings.input-3.destination=foobar", - "--spring.cloud.stream.bindings.input-3.binder=globalktableBinder", + "--spring.cloud.stream.bindings.input3-in-0.destination=foobar", + "--spring.cloud.stream.bindings.input3-in-0.binder=globalktableBinder", "--spring.cloud.stream.binders.globalktableBinder.type=globalktable", "--spring.cloud.stream.binders.globalktableBinder.environment" + ".spring.cloud.stream.kafka.streams.binder.brokers" @@ -86,11 +87,12 @@ public class KafkaStreamsBinderBootstrapTest { public void testKafkaStreamsBinderWithStandardConfigurationCanStart() { ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder( SimpleKafkaStreamsApplication.class).web(WebApplicationType.NONE).run( - "--spring.cloud.stream.kafka.streams.bindings.input-1.consumer.application-id" + "--spring.cloud.function.definition=input1;input2;input3", + "--spring.cloud.stream.kafka.streams.bindings.input1-in-0.consumer.application-id" + "=testKafkaStreamsBinderWithStandardConfigurationCanStart", - "--spring.cloud.stream.kafka.streams.bindings.input-2.consumer.application-id" + "--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.application-id" + "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foo", - "--spring.cloud.stream.kafka.streams.bindings.input-3.consumer.application-id" + "--spring.cloud.stream.kafka.streams.bindings.input3-in-0.consumer.application-id" + "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foobar", "--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getEmbeddedKafka().getBrokersAsString()); @@ -102,11 +104,12 @@ public class KafkaStreamsBinderBootstrapTest { public void testKafkaStreamsBinderJaasInitialization() { ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder( SimpleKafkaStreamsApplication.class).web(WebApplicationType.NONE).run( - "--spring.cloud.stream.kafka.streams.bindings.input-1.consumer.application-id" + "--spring.cloud.function.definition=input1;input2;input3", + "--spring.cloud.stream.kafka.streams.bindings.input1-in-0.consumer.application-id" + "=testKafkaStreamsBinderWithStandardConfigurationCanStart-jaas", - "--spring.cloud.stream.kafka.streams.bindings.input-2.consumer.application-id" + "--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.application-id" + "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foo-jaas", - "--spring.cloud.stream.kafka.streams.bindings.input-3.consumer.application-id" + "--spring.cloud.stream.kafka.streams.bindings.input3-in-0.consumer.application-id" + "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foobar-jaas", "--spring.cloud.stream.kafka.streams.binder.jaas.loginModule=org.apache.kafka.common.security.plain.PlainLoginModule", "--spring.cloud.stream.kafka.streams.binder.jaas.options.username=foo", @@ -126,44 +129,27 @@ public class KafkaStreamsBinderBootstrapTest { } @SpringBootApplication - @EnableBinding({SimpleKStreamBinding.class, SimpleKTableBinding.class, SimpleGlobalKTableBinding.class}) static class SimpleKafkaStreamsApplication { - @StreamListener - public void handle(@Input("input-1") KStream stream) { - + @Bean + public Consumer> input1() { + return s -> { + // No-op consumer + }; } - @StreamListener - public void handleX(@Input("input-2") KTable stream) { - + @Bean + public Consumer> input2() { + return s -> { + // No-op consumer + }; } - @StreamListener - public void handleY(@Input("input-3") GlobalKTable stream) { - + @Bean + public Consumer> input3() { + return s -> { + // No-op consumer + }; } - - } - - interface SimpleKStreamBinding { - - @Input("input-1") - KStream inputStream(); - - } - - interface SimpleKTableBinding { - - @Input("input-2") - KTable inputStream(); - - } - - interface SimpleGlobalKTableBinding { - - @Input("input-3") - GlobalKTable inputStream(); - } }