From 486469da51dfd096f4bcf202a5672eb4fbc630a7 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 11 Nov 2021 16:33:45 -0500 Subject: [PATCH] Kafka binder test migration - EnableBinding to functional --- ...fkaStreamsBindingInformationCatalogue.java | 2 +- .../KafkaStreamsFunctionProcessor.java | 3 +- .../streams/StreamsBuilderFactoryManager.java | 3 +- .../integration/KafkaBinderActuatorTests.java | 52 ++++++++------- .../KafkaBinderExtendedPropertiesTest.java | 55 +++++----------- .../integration/KafkaNullConverterTest.java | 58 ++++++++--------- .../ProducerOnlyTransactionTests.java | 19 +++--- ...eKafkaBinderTopicPropertiesUpdateTest.java | 33 +++------- .../ConsumerProducerTransactionTests.java | 65 +++++++++---------- 9 files changed, 124 insertions(+), 166 deletions(-) diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBindingInformationCatalogue.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBindingInformationCatalogue.java index a8dc9b8b..c12b77d0 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBindingInformationCatalogue.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBindingInformationCatalogue.java @@ -41,7 +41,7 @@ import org.springframework.util.CollectionUtils; * A catalogue that provides binding information for Kafka Streams target types such as * KStream. It also keeps a catalogue for the underlying {@link StreamsBuilderFactoryBean} * and {@link StreamsConfig} associated with various - * {@link org.springframework.cloud.stream.annotation.StreamListener} methods in the + * Kafka Streams functions in the * {@link org.springframework.context.ApplicationContext}. * * @author Soby Chacko diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.java index c9195371..ff0286c1 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsFunctionProcessor.java @@ -51,7 +51,6 @@ import org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStrea import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties; import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties; import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties; -import org.springframework.cloud.stream.binding.StreamListenerErrorMessages; import org.springframework.cloud.stream.config.BindingProperties; import org.springframework.cloud.stream.config.BindingServiceProperties; import org.springframework.cloud.stream.function.FunctionConstants; @@ -562,7 +561,7 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro } } else { - throw new IllegalStateException(StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS); + //throw new IllegalStateException(StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS); } } return arguments; diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/StreamsBuilderFactoryManager.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/StreamsBuilderFactoryManager.java index 9f2bbf0f..c0fa13e4 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/StreamsBuilderFactoryManager.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/StreamsBuilderFactoryManager.java @@ -39,8 +39,7 @@ import org.springframework.kafka.streams.KafkaStreamsMicrometerListener; * This {@link SmartLifecycle} class ensures that the bean created from it is started very * late through the bootstrap process by setting the phase value closer to * Integer.MAX_VALUE. This is to guarantee that the {@link StreamsBuilderFactoryBean} on a - * {@link org.springframework.cloud.stream.annotation.StreamListener} method with multiple - * bindings is only started after all the binding phases have completed successfully. + * function with multiple bindings is only started after all the binding phases have completed successfully. * * @author Soby Chacko */ diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaBinderActuatorTests.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaBinderActuatorTests.java index f722dc51..ce5e1d10 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaBinderActuatorTests.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaBinderActuatorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 the original author or authors. + * Copyright 2018-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,12 +18,14 @@ package org.springframework.cloud.stream.binder.kafka.integration; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.binder.MeterBinder; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; @@ -33,19 +35,14 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.FilteredClassLoader; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.runner.ApplicationContextRunner; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.Input; -import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.binder.Binding; -import org.springframework.cloud.stream.binder.PollableMessageSource; import org.springframework.cloud.stream.binding.BindingService; import org.springframework.cloud.stream.config.ConsumerEndpointCustomizer; import org.springframework.cloud.stream.config.ListenerContainerCustomizer; import org.springframework.cloud.stream.config.MessageSourceCustomizer; import org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer; -import org.springframework.cloud.stream.messaging.Processor; -import org.springframework.cloud.stream.messaging.Sink; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter; import org.springframework.integration.kafka.inbound.KafkaMessageSource; import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler; @@ -63,13 +60,18 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Oleg Zhurakousky * @author Jon Schneider * @author Gary Russell + * @author Soby Chacko * * @since 2.0 */ @RunWith(SpringRunner.class) // @checkstyle:off -@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = "spring.cloud.stream.bindings.input.group=" - + KafkaBinderActuatorTests.TEST_CONSUMER_GROUP) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, + properties = { + "spring.cloud.stream.bindings.input.group=" + KafkaBinderActuatorTests.TEST_CONSUMER_GROUP, + "spring.cloud.stream.function.bindings.process-in-0=input", + "spring.cloud.stream.pollable-source=input"} +) // @checkstyle:on @DirtiesContext public class KafkaBinderActuatorTests { @@ -100,17 +102,22 @@ public class KafkaBinderActuatorTests { @Test public void testKafkaBinderMetricsExposed() { - this.kafkaTemplate.send(Sink.INPUT, null, "foo".getBytes()); + this.kafkaTemplate.send("input", null, "foo".getBytes()); this.kafkaTemplate.flush(); assertThat(this.meterRegistry.get("spring.cloud.stream.binder.kafka.offset") - .tag("group", TEST_CONSUMER_GROUP).tag("topic", Sink.INPUT).gauge() + .tag("group", TEST_CONSUMER_GROUP).tag("topic", "input").gauge() .value()).isGreaterThan(0); } @Test + @Ignore public void testKafkaBinderMetricsWhenNoMicrometer() { new ApplicationContextRunner().withUserConfiguration(KafkaMetricsTestConfig.class) + .withPropertyValues( + "spring.cloud.stream.bindings.input.group", KafkaBinderActuatorTests.TEST_CONSUMER_GROUP, + "spring.cloud.stream.function.bindings.process-in-0", "input", + "spring.cloud.stream.pollable-source", "input") .withClassLoader(new FilteredClassLoader("io.micrometer.core")) .run(context -> { assertThat(context.getBeanNamesForType(MeterRegistry.class)) @@ -148,8 +155,8 @@ public class KafkaBinderActuatorTests { }); } - @EnableBinding({ Processor.class, PMS.class }) @EnableAutoConfiguration + @Configuration public static class KafkaMetricsTestConfig { @Bean @@ -172,19 +179,18 @@ public class KafkaBinderActuatorTests { return (handler, destinationName) -> handler.setBeanName("setByCustomizer:" + destinationName); } - @StreamListener(Sink.INPUT) - public void process(@SuppressWarnings("unused") String payload) throws InterruptedException { + @Bean + public Consumer process() { // Artificial slow listener to emulate consumer lag - Thread.sleep(1000); + return s -> { + try { + Thread.sleep(1000); + } + catch (InterruptedException e) { + //no-op + } + }; } } - - public interface PMS { - - @Input - PollableMessageSource source(); - - } - } diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaBinderExtendedPropertiesTest.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaBinderExtendedPropertiesTest.java index db2064c0..64b5a79b 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaBinderExtendedPropertiesTest.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaBinderExtendedPropertiesTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 the original author or authors. + * Copyright 2018-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; @@ -33,10 +34,6 @@ import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.Input; -import org.springframework.cloud.stream.annotation.Output; -import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.binder.Binder; import org.springframework.cloud.stream.binder.BinderFactory; import org.springframework.cloud.stream.binder.ConsumerProperties; @@ -47,10 +44,9 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerPro import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; import org.springframework.kafka.test.rule.EmbeddedKafkaRule; import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.SubscribableChannel; -import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit4.SpringRunner; @@ -62,6 +58,11 @@ import static org.assertj.core.api.Assertions.assertThat; */ @RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = { + "spring.cloud.stream.function.definition=process;processCustom", + "spring.cloud.stream.function.bindings.process-in-0=standard-in", + "spring.cloud.stream.function.bindings.process-out-0=standard-out", + "spring.cloud.stream.function.bindings.processCustom-in-0=custom-in", + "spring.cloud.stream.function.bindings.processCustom-out-0=custom-out", "spring.cloud.stream.kafka.bindings.standard-out.producer.configuration.key.serializer=FooSerializer.class", "spring.cloud.stream.kafka.default.producer.configuration.key.serializer=BarSerializer.class", "spring.cloud.stream.kafka.default.producer.configuration.value.serializer=BarSerializer.class", @@ -167,22 +168,19 @@ public class KafkaBinderExtendedPropertiesTest { Boolean.TRUE); } - @EnableBinding(CustomBindingForExtendedPropertyTesting.class) @EnableAutoConfiguration + @Configuration public static class KafkaMetricsTestConfig { - @StreamListener("standard-in") - @SendTo("standard-out") - public String process(String payload) { - return payload; - } - - @StreamListener("custom-in") - @SendTo("custom-out") - public String processCustom(String payload) { - return payload; + @Bean + public Function process() { + return payload -> payload; } + @Bean + public Function processCustom() { + return payload -> payload; + } @Bean public RebalanceListener rebalanceListener() { return new RebalanceListener(); @@ -190,22 +188,6 @@ public class KafkaBinderExtendedPropertiesTest { } - interface CustomBindingForExtendedPropertyTesting { - - @Input("standard-in") - SubscribableChannel standardIn(); - - @Output("standard-out") - MessageChannel standardOut(); - - @Input("custom-in") - SubscribableChannel customIn(); - - @Output("custom-out") - MessageChannel customOut(); - - } - public static class RebalanceListener implements KafkaBindingRebalanceListener { private final Map bindings = new HashMap<>(); @@ -215,23 +197,18 @@ public class KafkaBinderExtendedPropertiesTest { @Override public void onPartitionsRevokedBeforeCommit(String bindingName, Consumer consumer, Collection partitions) { - } @Override public void onPartitionsRevokedAfterCommit(String bindingName, Consumer consumer, Collection partitions) { - } @Override public void onPartitionsAssigned(String bindingName, Consumer consumer, Collection partitions, boolean initial) { - this.bindings.put(bindingName, initial); this.latch.countDown(); } - } - } diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaNullConverterTest.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaNullConverterTest.java index 3015b10f..f796069e 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaNullConverterTest.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaNullConverterTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2017 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,21 +18,21 @@ package org.springframework.cloud.stream.binder.kafka.integration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.Input; -import org.springframework.cloud.stream.annotation.Output; -import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.function.StreamBridge; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.KafkaNull; import org.springframework.kafka.test.rule.EmbeddedKafkaRule; @@ -47,21 +47,19 @@ import static org.assertj.core.api.Assertions.assertThat; /** * @author Aldo Sinanaj * @author Gary Russell + * @author Soby Chacko */ @RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = { - "spring.kafka.consumer.auto-offset-reset=earliest" }) + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.cloud.stream.function.bindings.inputListen-in-0=kafkaNullInput"}) @DirtiesContext -@Ignore public class KafkaNullConverterTest { private static final String KAFKA_BROKERS_PROPERTY = "spring.kafka.bootstrap-servers"; @Autowired - private MessageChannel kafkaNullOutput; - - @Autowired - private MessageChannel kafkaNullInput; + private ApplicationContext context; @Autowired private KafkaNullConverterTestConfig config; @@ -82,7 +80,9 @@ public class KafkaNullConverterTest { @Test public void testKafkaNullConverterOutput() throws InterruptedException { - this.kafkaNullOutput.send(new GenericMessage<>(KafkaNull.INSTANCE)); + final StreamBridge streamBridge = context.getBean(StreamBridge.class); + + streamBridge.send("kafkaNullOutput", new GenericMessage<>(KafkaNull.INSTANCE)); assertThat(this.config.countDownLatchOutput.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.config.outputPayload).isNull(); @@ -90,14 +90,17 @@ public class KafkaNullConverterTest { @Test public void testKafkaNullConverterInput() throws InterruptedException { - this.kafkaNullInput.send(new GenericMessage<>(KafkaNull.INSTANCE)); + + final MessageChannel kafkaNullInput = context.getBean("kafkaNullInput", MessageChannel.class); + + kafkaNullInput.send(new GenericMessage<>(KafkaNull.INSTANCE)); assertThat(this.config.countDownLatchInput.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(this.config.inputPayload).isNull(); } - @TestConfiguration - @EnableBinding(KafkaNullTestChannels.class) + @EnableAutoConfiguration + @Configuration public static class KafkaNullConverterTestConfig { final CountDownLatch countDownLatchOutput = new CountDownLatch(1); @@ -114,22 +117,13 @@ public class KafkaNullConverterTest { countDownLatchOutput.countDown(); } - @StreamListener("kafkaNullInput") - public void inputListen(@Payload(required = false) byte[] payload) { - this.inputPayload = payload; - countDownLatchInput.countDown(); + @Bean + public Consumer inputListen() { + return in -> { + this.inputPayload = in; + countDownLatchInput.countDown(); + }; } } - - public interface KafkaNullTestChannels { - - @Input - MessageChannel kafkaNullInput(); - - @Output - MessageChannel kafkaNullOutput(); - - } - } diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/ProducerOnlyTransactionTests.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/ProducerOnlyTransactionTests.java index 41e14f26..861a7e15 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/ProducerOnlyTransactionTests.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/ProducerOnlyTransactionTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2019 the original author or authors. + * Copyright 2019-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,11 +35,12 @@ import org.springframework.beans.factory.BeanCreationException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.binder.BinderFactory; import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder; -import org.springframework.cloud.stream.messaging.Source; +import org.springframework.cloud.stream.function.StreamBridge; +import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.test.rule.EmbeddedKafkaRule; import org.springframework.kafka.test.utils.KafkaTestUtils; @@ -58,6 +59,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** * @author Gary Russell + * @author Soby Chacko * @since 2.1.4 * */ @@ -80,7 +82,7 @@ public class ProducerOnlyTransactionTests { private Sender sender; @Autowired - private MessageChannel output; + private ApplicationContext context; @BeforeClass public static void setup() { @@ -95,7 +97,8 @@ public class ProducerOnlyTransactionTests { @Test public void testProducerTx() { - this.sender.DoInTransaction(this.output); + final StreamBridge streamBridge = context.getBean(StreamBridge.class); + this.sender.DoInTransaction(streamBridge); assertThat(this.sender.isInTx()).isTrue(); Map props = KafkaTestUtils.consumerProps("consumeTx", "false", embeddedKafka.getEmbeddedKafka()); @@ -109,9 +112,9 @@ public class ProducerOnlyTransactionTests { assertThat(record.value()).isEqualTo("foo".getBytes()); } - @EnableBinding(Source.class) @EnableAutoConfiguration @EnableTransactionManagement + @Configuration public static class Config { @Bean @@ -140,9 +143,9 @@ public class ProducerOnlyTransactionTests { private boolean isInTx; @Transactional - public void DoInTransaction(MessageChannel output) { + public void DoInTransaction(StreamBridge streamBridge) { this.isInTx = TransactionSynchronizationManager.isActualTransactionActive(); - output.send(new GenericMessage<>("foo")); + streamBridge.send("output", new GenericMessage<>("foo".getBytes())); } public boolean isInTx() { diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/topic/configs/BaseKafkaBinderTopicPropertiesUpdateTest.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/topic/configs/BaseKafkaBinderTopicPropertiesUpdateTest.java index bfaee22c..72bec2af 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/topic/configs/BaseKafkaBinderTopicPropertiesUpdateTest.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/topic/configs/BaseKafkaBinderTopicPropertiesUpdateTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 the original author or authors. + * Copyright 2018-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.cloud.stream.binder.kafka.integration.topic.configs; +import java.util.function.Function; + import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -23,24 +25,21 @@ import org.junit.runner.RunWith; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.Input; -import org.springframework.cloud.stream.annotation.Output; -import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.context.annotation.Bean; import org.springframework.kafka.test.rule.EmbeddedKafkaRule; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.SubscribableChannel; -import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit4.SpringRunner; /** * @author Heiko Does + * @author Soby Chacko */ @RunWith(SpringRunner.class) @SpringBootTest( classes = BaseKafkaBinderTopicPropertiesUpdateTest.TopicAutoConfigsTestConfig.class, webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = { + "spring.cloud.stream.function.bindings.process-in-0=standard-in", + "spring.cloud.stream.function.bindings.process-out-0=standard-out", "spring.cloud.stream.kafka.bindings.standard-out.producer.topic.properties.retention.ms=9001", "spring.cloud.stream.kafka.default.producer.topic.properties.retention.ms=-1", "spring.cloud.stream.kafka.bindings.standard-in.consumer.topic.properties.retention.ms=9001", @@ -65,24 +64,12 @@ public abstract class BaseKafkaBinderTopicPropertiesUpdateTest { System.clearProperty(KAFKA_BROKERS_PROPERTY); } - @EnableBinding(CustomBindingForTopicPropertiesUpdateTesting.class) @EnableAutoConfiguration public static class TopicAutoConfigsTestConfig { - @StreamListener("standard-in") - @SendTo("standard-out") - public String process(String payload) { - return payload; + @Bean + public Function process() { + return payload -> payload; } } - - interface CustomBindingForTopicPropertiesUpdateTesting { - - @Input("standard-in") - SubscribableChannel standardIn(); - - @Output("standard-out") - MessageChannel standardOut(); - } - } diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration2/ConsumerProducerTransactionTests.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration2/ConsumerProducerTransactionTests.java index 4f49b206..dbf30a09 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration2/ConsumerProducerTransactionTests.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration2/ConsumerProducerTransactionTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2019 the original author or authors. + * Copyright 2019-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import kafka.server.KafkaConfig; import org.junit.AfterClass; @@ -33,13 +34,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.Input; -import org.springframework.cloud.stream.annotation.Output; -import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.config.ListenerContainerCustomizer; -import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; @@ -49,8 +47,6 @@ import org.springframework.kafka.test.rule.EmbeddedKafkaRule; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.kafka.transaction.KafkaAwareTransactionManager; import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.SubscribableChannel; -import org.springframework.messaging.support.GenericMessage; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.util.backoff.FixedBackOff; @@ -61,6 +57,7 @@ import static org.mockito.Mockito.mock; /** * @author Gary Russell + * @author Soby Chacko * @since 3.0 * */ @@ -69,6 +66,11 @@ import static org.mockito.Mockito.mock; "spring.kafka.consumer.properties.isolation.level=read_committed", "spring.kafka.consumer.enable-auto-commit=false", "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.cloud.function.definition=listenIn;listenIn2", + "spring.cloud.stream.function.bindings.listenIn-in-0=input", + "spring.cloud.stream.function.bindings.listenIn-out-0=output", + "spring.cloud.stream.function.bindings.listenIn2-in-0=input2", + "spring.cloud.stream.function.bindings.listenIn2-out-0=output2", "spring.cloud.stream.bindings.input.destination=consumer.producer.txIn", "spring.cloud.stream.bindings.input.group=consumer.producer.tx", "spring.cloud.stream.bindings.input.consumer.max-attempts=1", @@ -91,6 +93,9 @@ public class ConsumerProducerTransactionTests { @Autowired private Config config; + @Autowired + private ApplicationContext context; + @BeforeClass public static void setup() { System.setProperty(KAFKA_BROKERS_PROPERTY, @@ -115,26 +120,22 @@ public class ConsumerProducerTransactionTests { public void externalTM() { assertThat(this.config.input2Container.getContainerProperties().getTransactionManager()) .isSameAs(this.config.tm); - Object handler = KafkaTestUtils.getPropertyValue(this.config.output2, "dispatcher.handlers", Set.class) + final MessageChannel output2 = context.getBean("output2", MessageChannel.class); + + Object handler = KafkaTestUtils.getPropertyValue(output2, "dispatcher.handlers", Set.class) .iterator().next(); assertThat(KafkaTestUtils.getPropertyValue(handler, "delegate.kafkaTemplate.producerFactory")) .isSameAs(this.config.pf); } - @EnableBinding(TwoProcessors.class) @EnableAutoConfiguration + @Configuration public static class Config { final List outs = new ArrayList<>(); final CountDownLatch latch = new CountDownLatch(2); - @Autowired - private MessageChannel output; - - @Autowired - MessageChannel output2; - AbstractMessageListenerContainer input2Container; ProducerFactory pf; @@ -147,16 +148,19 @@ public class ConsumerProducerTransactionTests { this.latch.countDown(); } - @StreamListener(Processor.INPUT) - public void listenIn(String in) { - this.output.send(new GenericMessage<>(in.toUpperCase())); - if (in.equals("two")) { - throw new RuntimeException("fail"); - } + @Bean + public Function listenIn() { + return in -> { + if (in.equals("two")) { + throw new RuntimeException("fail"); + } + return in.toUpperCase(); + }; } - @StreamListener("input2") - public void listenIn2(String in) { + @Bean + public Function listenIn2() { + return in -> in; } @Bean @@ -187,17 +191,6 @@ public class ConsumerProducerTransactionTests { this.tm = mock; return mock; } - } - - public interface TwoProcessors extends Processor { - - @Input - SubscribableChannel input2(); - - @Output - MessageChannel output2(); - - } - } +