Kafka binder test migration

- EnableBinding to functional
This commit is contained in:
Soby Chacko
2021-11-11 16:33:45 -05:00
parent ed98f1129d
commit 486469da51
9 changed files with 124 additions and 166 deletions

View File

@@ -41,7 +41,7 @@ import org.springframework.util.CollectionUtils;
* A catalogue that provides binding information for Kafka Streams target types such as
* KStream. It also keeps a catalogue for the underlying {@link StreamsBuilderFactoryBean}
* and {@link StreamsConfig} associated with various
* {@link org.springframework.cloud.stream.annotation.StreamListener} methods in the
* Kafka Streams functions in the
* {@link org.springframework.context.ApplicationContext}.
*
* @author Soby Chacko

View File

@@ -51,7 +51,6 @@ import org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStrea
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binding.StreamListenerErrorMessages;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.function.FunctionConstants;
@@ -562,7 +561,7 @@ public class KafkaStreamsFunctionProcessor extends AbstractKafkaStreamsBinderPro
}
}
else {
throw new IllegalStateException(StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS);
//throw new IllegalStateException(StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS);
}
}
return arguments;

View File

@@ -39,8 +39,7 @@ import org.springframework.kafka.streams.KafkaStreamsMicrometerListener;
* This {@link SmartLifecycle} class ensures that the bean created from it is started very
* late through the bootstrap process by setting the phase value closer to
* Integer.MAX_VALUE. This is to guarantee that the {@link StreamsBuilderFactoryBean} on a
* {@link org.springframework.cloud.stream.annotation.StreamListener} method with multiple
* bindings is only started after all the binding phases have completed successfully.
* function with multiple bindings is only started after all the binding phases have completed successfully.
*
* @author Soby Chacko
*/

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2018-2019 the original author or authors.
* Copyright 2018-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,12 +18,14 @@ package org.springframework.cloud.stream.binder.kafka.integration;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -33,19 +35,14 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.FilteredClassLoader;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.PollableMessageSource;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.config.ConsumerEndpointCustomizer;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.config.MessageSourceCustomizer;
import org.springframework.cloud.stream.config.ProducerMessageHandlerCustomizer;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
@@ -63,13 +60,18 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Oleg Zhurakousky
* @author Jon Schneider
* @author Gary Russell
* @author Soby Chacko
*
* @since 2.0
*/
@RunWith(SpringRunner.class)
// @checkstyle:off
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = "spring.cloud.stream.bindings.input.group="
+ KafkaBinderActuatorTests.TEST_CONSUMER_GROUP)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE,
properties = {
"spring.cloud.stream.bindings.input.group=" + KafkaBinderActuatorTests.TEST_CONSUMER_GROUP,
"spring.cloud.stream.function.bindings.process-in-0=input",
"spring.cloud.stream.pollable-source=input"}
)
// @checkstyle:on
@DirtiesContext
public class KafkaBinderActuatorTests {
@@ -100,17 +102,22 @@ public class KafkaBinderActuatorTests {
@Test
public void testKafkaBinderMetricsExposed() {
this.kafkaTemplate.send(Sink.INPUT, null, "foo".getBytes());
this.kafkaTemplate.send("input", null, "foo".getBytes());
this.kafkaTemplate.flush();
assertThat(this.meterRegistry.get("spring.cloud.stream.binder.kafka.offset")
.tag("group", TEST_CONSUMER_GROUP).tag("topic", Sink.INPUT).gauge()
.tag("group", TEST_CONSUMER_GROUP).tag("topic", "input").gauge()
.value()).isGreaterThan(0);
}
@Test
@Ignore
public void testKafkaBinderMetricsWhenNoMicrometer() {
new ApplicationContextRunner().withUserConfiguration(KafkaMetricsTestConfig.class)
.withPropertyValues(
"spring.cloud.stream.bindings.input.group", KafkaBinderActuatorTests.TEST_CONSUMER_GROUP,
"spring.cloud.stream.function.bindings.process-in-0", "input",
"spring.cloud.stream.pollable-source", "input")
.withClassLoader(new FilteredClassLoader("io.micrometer.core"))
.run(context -> {
assertThat(context.getBeanNamesForType(MeterRegistry.class))
@@ -148,8 +155,8 @@ public class KafkaBinderActuatorTests {
});
}
@EnableBinding({ Processor.class, PMS.class })
@EnableAutoConfiguration
@Configuration
public static class KafkaMetricsTestConfig {
@Bean
@@ -172,19 +179,18 @@ public class KafkaBinderActuatorTests {
return (handler, destinationName) -> handler.setBeanName("setByCustomizer:" + destinationName);
}
@StreamListener(Sink.INPUT)
public void process(@SuppressWarnings("unused") String payload) throws InterruptedException {
@Bean
public Consumer<String> process() {
// Artificial slow listener to emulate consumer lag
Thread.sleep(1000);
return s -> {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
//no-op
}
};
}
}
public interface PMS {
@Input
PollableMessageSource source();
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2018-2019 the original author or authors.
* Copyright 2018-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
@@ -33,10 +34,6 @@ import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ConsumerProperties;
@@ -47,10 +44,9 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerPro
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
@@ -62,6 +58,11 @@ import static org.assertj.core.api.Assertions.assertThat;
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
"spring.cloud.stream.function.definition=process;processCustom",
"spring.cloud.stream.function.bindings.process-in-0=standard-in",
"spring.cloud.stream.function.bindings.process-out-0=standard-out",
"spring.cloud.stream.function.bindings.processCustom-in-0=custom-in",
"spring.cloud.stream.function.bindings.processCustom-out-0=custom-out",
"spring.cloud.stream.kafka.bindings.standard-out.producer.configuration.key.serializer=FooSerializer.class",
"spring.cloud.stream.kafka.default.producer.configuration.key.serializer=BarSerializer.class",
"spring.cloud.stream.kafka.default.producer.configuration.value.serializer=BarSerializer.class",
@@ -167,22 +168,19 @@ public class KafkaBinderExtendedPropertiesTest {
Boolean.TRUE);
}
@EnableBinding(CustomBindingForExtendedPropertyTesting.class)
@EnableAutoConfiguration
@Configuration
public static class KafkaMetricsTestConfig {
@StreamListener("standard-in")
@SendTo("standard-out")
public String process(String payload) {
return payload;
}
@StreamListener("custom-in")
@SendTo("custom-out")
public String processCustom(String payload) {
return payload;
@Bean
public Function<String, String> process() {
return payload -> payload;
}
@Bean
public Function<String, String> processCustom() {
return payload -> payload;
}
@Bean
public RebalanceListener rebalanceListener() {
return new RebalanceListener();
@@ -190,22 +188,6 @@ public class KafkaBinderExtendedPropertiesTest {
}
interface CustomBindingForExtendedPropertyTesting {
@Input("standard-in")
SubscribableChannel standardIn();
@Output("standard-out")
MessageChannel standardOut();
@Input("custom-in")
SubscribableChannel customIn();
@Output("custom-out")
MessageChannel customOut();
}
public static class RebalanceListener implements KafkaBindingRebalanceListener {
private final Map<String, Boolean> bindings = new HashMap<>();
@@ -215,23 +197,18 @@ public class KafkaBinderExtendedPropertiesTest {
@Override
public void onPartitionsRevokedBeforeCommit(String bindingName,
Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsRevokedAfterCommit(String bindingName,
Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions, boolean initial) {
this.bindings.put(bindingName, initial);
this.latch.countDown();
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016-2017 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,21 +18,21 @@ package org.springframework.cloud.stream.binder.kafka.integration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
@@ -47,21 +47,19 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Aldo Sinanaj
* @author Gary Russell
* @author Soby Chacko
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
"spring.kafka.consumer.auto-offset-reset=earliest" })
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.cloud.stream.function.bindings.inputListen-in-0=kafkaNullInput"})
@DirtiesContext
@Ignore
public class KafkaNullConverterTest {
private static final String KAFKA_BROKERS_PROPERTY = "spring.kafka.bootstrap-servers";
@Autowired
private MessageChannel kafkaNullOutput;
@Autowired
private MessageChannel kafkaNullInput;
private ApplicationContext context;
@Autowired
private KafkaNullConverterTestConfig config;
@@ -82,7 +80,9 @@ public class KafkaNullConverterTest {
@Test
public void testKafkaNullConverterOutput() throws InterruptedException {
this.kafkaNullOutput.send(new GenericMessage<>(KafkaNull.INSTANCE));
final StreamBridge streamBridge = context.getBean(StreamBridge.class);
streamBridge.send("kafkaNullOutput", new GenericMessage<>(KafkaNull.INSTANCE));
assertThat(this.config.countDownLatchOutput.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.config.outputPayload).isNull();
@@ -90,14 +90,17 @@ public class KafkaNullConverterTest {
@Test
public void testKafkaNullConverterInput() throws InterruptedException {
this.kafkaNullInput.send(new GenericMessage<>(KafkaNull.INSTANCE));
final MessageChannel kafkaNullInput = context.getBean("kafkaNullInput", MessageChannel.class);
kafkaNullInput.send(new GenericMessage<>(KafkaNull.INSTANCE));
assertThat(this.config.countDownLatchInput.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.config.inputPayload).isNull();
}
@TestConfiguration
@EnableBinding(KafkaNullTestChannels.class)
@EnableAutoConfiguration
@Configuration
public static class KafkaNullConverterTestConfig {
final CountDownLatch countDownLatchOutput = new CountDownLatch(1);
@@ -114,22 +117,13 @@ public class KafkaNullConverterTest {
countDownLatchOutput.countDown();
}
@StreamListener("kafkaNullInput")
public void inputListen(@Payload(required = false) byte[] payload) {
this.inputPayload = payload;
countDownLatchInput.countDown();
@Bean
public Consumer<byte[]> inputListen() {
return in -> {
this.inputPayload = in;
countDownLatchInput.countDown();
};
}
}
public interface KafkaNullTestChannels {
@Input
MessageChannel kafkaNullInput();
@Output
MessageChannel kafkaNullOutput();
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2019-2019 the original author or authors.
* Copyright 2019-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -35,11 +35,12 @@ import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -58,6 +59,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Gary Russell
* @author Soby Chacko
* @since 2.1.4
*
*/
@@ -80,7 +82,7 @@ public class ProducerOnlyTransactionTests {
private Sender sender;
@Autowired
private MessageChannel output;
private ApplicationContext context;
@BeforeClass
public static void setup() {
@@ -95,7 +97,8 @@ public class ProducerOnlyTransactionTests {
@Test
public void testProducerTx() {
this.sender.DoInTransaction(this.output);
final StreamBridge streamBridge = context.getBean(StreamBridge.class);
this.sender.DoInTransaction(streamBridge);
assertThat(this.sender.isInTx()).isTrue();
Map<String, Object> props = KafkaTestUtils.consumerProps("consumeTx", "false",
embeddedKafka.getEmbeddedKafka());
@@ -109,9 +112,9 @@ public class ProducerOnlyTransactionTests {
assertThat(record.value()).isEqualTo("foo".getBytes());
}
@EnableBinding(Source.class)
@EnableAutoConfiguration
@EnableTransactionManagement
@Configuration
public static class Config {
@Bean
@@ -140,9 +143,9 @@ public class ProducerOnlyTransactionTests {
private boolean isInTx;
@Transactional
public void DoInTransaction(MessageChannel output) {
public void DoInTransaction(StreamBridge streamBridge) {
this.isInTx = TransactionSynchronizationManager.isActualTransactionActive();
output.send(new GenericMessage<>("foo"));
streamBridge.send("output", new GenericMessage<>("foo".getBytes()));
}
public boolean isInTx() {

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2018-2019 the original author or authors.
* Copyright 2018-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
package org.springframework.cloud.stream.binder.kafka.integration.topic.configs;
import java.util.function.Function;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -23,24 +25,21 @@ import org.junit.runner.RunWith;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @author Heiko Does
* @author Soby Chacko
*/
@RunWith(SpringRunner.class)
@SpringBootTest(
classes = BaseKafkaBinderTopicPropertiesUpdateTest.TopicAutoConfigsTestConfig.class,
webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
"spring.cloud.stream.function.bindings.process-in-0=standard-in",
"spring.cloud.stream.function.bindings.process-out-0=standard-out",
"spring.cloud.stream.kafka.bindings.standard-out.producer.topic.properties.retention.ms=9001",
"spring.cloud.stream.kafka.default.producer.topic.properties.retention.ms=-1",
"spring.cloud.stream.kafka.bindings.standard-in.consumer.topic.properties.retention.ms=9001",
@@ -65,24 +64,12 @@ public abstract class BaseKafkaBinderTopicPropertiesUpdateTest {
System.clearProperty(KAFKA_BROKERS_PROPERTY);
}
@EnableBinding(CustomBindingForTopicPropertiesUpdateTesting.class)
@EnableAutoConfiguration
public static class TopicAutoConfigsTestConfig {
@StreamListener("standard-in")
@SendTo("standard-out")
public String process(String payload) {
return payload;
@Bean
public Function<String, String> process() {
return payload -> payload;
}
}
interface CustomBindingForTopicPropertiesUpdateTesting {
@Input("standard-in")
SubscribableChannel standardIn();
@Output("standard-out")
MessageChannel standardOut();
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2019-2019 the original author or authors.
* Copyright 2019-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import kafka.server.KafkaConfig;
import org.junit.AfterClass;
@@ -33,13 +34,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@@ -49,8 +47,6 @@ import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.backoff.FixedBackOff;
@@ -61,6 +57,7 @@ import static org.mockito.Mockito.mock;
/**
* @author Gary Russell
* @author Soby Chacko
* @since 3.0
*
*/
@@ -69,6 +66,11 @@ import static org.mockito.Mockito.mock;
"spring.kafka.consumer.properties.isolation.level=read_committed",
"spring.kafka.consumer.enable-auto-commit=false",
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.cloud.function.definition=listenIn;listenIn2",
"spring.cloud.stream.function.bindings.listenIn-in-0=input",
"spring.cloud.stream.function.bindings.listenIn-out-0=output",
"spring.cloud.stream.function.bindings.listenIn2-in-0=input2",
"spring.cloud.stream.function.bindings.listenIn2-out-0=output2",
"spring.cloud.stream.bindings.input.destination=consumer.producer.txIn",
"spring.cloud.stream.bindings.input.group=consumer.producer.tx",
"spring.cloud.stream.bindings.input.consumer.max-attempts=1",
@@ -91,6 +93,9 @@ public class ConsumerProducerTransactionTests {
@Autowired
private Config config;
@Autowired
private ApplicationContext context;
@BeforeClass
public static void setup() {
System.setProperty(KAFKA_BROKERS_PROPERTY,
@@ -115,26 +120,22 @@ public class ConsumerProducerTransactionTests {
public void externalTM() {
assertThat(this.config.input2Container.getContainerProperties().getTransactionManager())
.isSameAs(this.config.tm);
Object handler = KafkaTestUtils.getPropertyValue(this.config.output2, "dispatcher.handlers", Set.class)
final MessageChannel output2 = context.getBean("output2", MessageChannel.class);
Object handler = KafkaTestUtils.getPropertyValue(output2, "dispatcher.handlers", Set.class)
.iterator().next();
assertThat(KafkaTestUtils.getPropertyValue(handler, "delegate.kafkaTemplate.producerFactory"))
.isSameAs(this.config.pf);
}
@EnableBinding(TwoProcessors.class)
@EnableAutoConfiguration
@Configuration
public static class Config {
final List<String> outs = new ArrayList<>();
final CountDownLatch latch = new CountDownLatch(2);
@Autowired
private MessageChannel output;
@Autowired
MessageChannel output2;
AbstractMessageListenerContainer<?, ?> input2Container;
ProducerFactory pf;
@@ -147,16 +148,19 @@ public class ConsumerProducerTransactionTests {
this.latch.countDown();
}
@StreamListener(Processor.INPUT)
public void listenIn(String in) {
this.output.send(new GenericMessage<>(in.toUpperCase()));
if (in.equals("two")) {
throw new RuntimeException("fail");
}
@Bean
public Function<String, String> listenIn() {
return in -> {
if (in.equals("two")) {
throw new RuntimeException("fail");
}
return in.toUpperCase();
};
}
@StreamListener("input2")
public void listenIn2(String in) {
@Bean
public Function<String, String> listenIn2() {
return in -> in;
}
@Bean
@@ -187,17 +191,6 @@ public class ConsumerProducerTransactionTests {
this.tm = mock;
return mock;
}
}
public interface TwoProcessors extends Processor {
@Input
SubscribableChannel input2();
@Output
MessageChannel output2();
}
}