diff --git a/spring-cloud-stream-binder-kafka-core/.settings/org.eclipse.jdt.ui.prefs b/spring-cloud-stream-binder-kafka-core/.settings/org.eclipse.jdt.ui.prefs new file mode 100644 index 00000000..f9aac64a --- /dev/null +++ b/spring-cloud-stream-binder-kafka-core/.settings/org.eclipse.jdt.ui.prefs @@ -0,0 +1,5 @@ +eclipse.preferences.version=1 +org.eclipse.jdt.ui.ignorelowercasenames=true +org.eclipse.jdt.ui.importorder=java;javax;com;org;org.springframework;ch.qos;\#; +org.eclipse.jdt.ui.ondemandthreshold=99 +org.eclipse.jdt.ui.staticondemandthreshold=99 diff --git a/spring-cloud-stream-binder-kafka/.settings/org.eclipse.jdt.ui.prefs b/spring-cloud-stream-binder-kafka/.settings/org.eclipse.jdt.ui.prefs new file mode 100644 index 00000000..f9aac64a --- /dev/null +++ b/spring-cloud-stream-binder-kafka/.settings/org.eclipse.jdt.ui.prefs @@ -0,0 +1,5 @@ +eclipse.preferences.version=1 +org.eclipse.jdt.ui.ignorelowercasenames=true +org.eclipse.jdt.ui.importorder=java;javax;com;org;org.springframework;ch.qos;\#; +org.eclipse.jdt.ui.ondemandthreshold=99 +org.eclipse.jdt.ui.staticondemandthreshold=99 diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java index ef2ed0fe..45d68a31 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java @@ -87,6 +87,7 @@ import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.context.IntegrationContextUtils; +import org.springframework.integration.handler.BridgeHandler; import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter; import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler; import org.springframework.integration.kafka.support.KafkaSendFailureException; @@ -2324,7 +2325,6 @@ public class KafkaBinderTests extends KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties(); binderConfiguration.setHeaders("foo"); Binder binder = getBinder(binderConfiguration); - QueueChannel moduleInputChannel = new QueueChannel(); DirectChannel moduleOutputChannel1 = new DirectChannel(); ExtendedProducerProperties producerProperties1 = createProducerProperties(); producerProperties1.setHeaderMode(HeaderMode.embeddedHeaders); @@ -2345,6 +2345,12 @@ public class KafkaBinderTests extends ExtendedConsumerProperties consumerProperties = createConsumerProperties(); consumerProperties.setHeaderMode(HeaderMode.embeddedHeaders); + DirectChannel moduleInputChannel = createBindableChannel("input", + createConsumerBindingProperties(consumerProperties)); + QueueChannel bridged = new QueueChannel(); + BridgeHandler bridge = new BridgeHandler(); + bridge.setOutputChannel(bridged); + moduleInputChannel.subscribe(bridge); Binding consumerBinding = binder.bindConsumer("mixed.0", "test", moduleInputChannel, consumerProperties); Message message = org.springframework.integration.support.MessageBuilder @@ -2356,19 +2362,19 @@ public class KafkaBinderTests extends moduleOutputChannel1.send(message); moduleOutputChannel2.send(message); moduleOutputChannel3.send(message); - Message inbound = receive(moduleInputChannel, 10_000); + Message inbound = receive(bridged, 10_000); assertThat(inbound).isNotNull(); - assertThat(new String((byte[]) inbound.getPayload(), StandardCharsets.UTF_8)).isEqualTo("testSendAndReceiveWithMixedMode"); + assertThat(inbound.getPayload()).isEqualTo("testSendAndReceiveWithMixedMode"); assertThat(inbound.getHeaders().get("foo")).isEqualTo("bar"); assertThat(inbound.getHeaders().get(BinderHeaders.NATIVE_HEADERS_PRESENT)).isNull(); - inbound = receive(moduleInputChannel); + inbound = receive(bridged); assertThat(inbound).isNotNull(); - assertThat(new String((byte[]) inbound.getPayload(), StandardCharsets.UTF_8)).isEqualTo("testSendAndReceiveWithMixedMode"); + assertThat(inbound.getPayload()).isEqualTo("testSendAndReceiveWithMixedMode"); assertThat(inbound.getHeaders().get("foo")).isEqualTo("bar"); assertThat(inbound.getHeaders().get(BinderHeaders.NATIVE_HEADERS_PRESENT)).isEqualTo(Boolean.TRUE); - inbound = receive(moduleInputChannel); + inbound = receive(bridged); assertThat(inbound).isNotNull(); - assertThat(new String((byte[]) inbound.getPayload(), StandardCharsets.UTF_8)).isEqualTo("testSendAndReceiveWithMixedMode"); + assertThat(inbound.getPayload()).isEqualTo("testSendAndReceiveWithMixedMode"); assertThat(inbound.getHeaders().get("foo")).isNull(); assertThat(inbound.getHeaders().get(BinderHeaders.NATIVE_HEADERS_PRESENT)).isNull(); diff --git a/spring-cloud-stream-binder-kstream/.settings/org.eclipse.jdt.ui.prefs b/spring-cloud-stream-binder-kstream/.settings/org.eclipse.jdt.ui.prefs new file mode 100644 index 00000000..f9aac64a --- /dev/null +++ b/spring-cloud-stream-binder-kstream/.settings/org.eclipse.jdt.ui.prefs @@ -0,0 +1,5 @@ +eclipse.preferences.version=1 +org.eclipse.jdt.ui.ignorelowercasenames=true +org.eclipse.jdt.ui.importorder=java;javax;com;org;org.springframework;ch.qos;\#; +org.eclipse.jdt.ui.ondemandthreshold=99 +org.eclipse.jdt.ui.staticondemandthreshold=99