Fixed tests related to GH-1527 change in core
This commit is contained in:
committed by
Soby Chacko
parent
999740597a
commit
87e1b35d55
@@ -315,10 +315,10 @@ public class KafkaBinderTests extends
|
||||
|
||||
moduleOutputChannel.send(message);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
|
||||
AtomicReference<Message<String>> inboundMessageRef = new AtomicReference<>();
|
||||
moduleInputChannel.subscribe(message1 -> {
|
||||
try {
|
||||
inboundMessageRef.set((Message<byte[]>) message1);
|
||||
inboundMessageRef.set((Message<String>) message1);
|
||||
}
|
||||
finally {
|
||||
latch.countDown();
|
||||
@@ -328,7 +328,7 @@ public class KafkaBinderTests extends
|
||||
|
||||
|
||||
Assertions.assertThat(inboundMessageRef.get()).isNotNull();
|
||||
Assertions.assertThat(new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8)).isEqualTo("foo");
|
||||
Assertions.assertThat(inboundMessageRef.get().getPayload()).isEqualTo("foo");
|
||||
Assertions.assertThat(inboundMessageRef.get().getHeaders().get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE)).isNull();
|
||||
Assertions.assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE))
|
||||
.isEqualTo(MimeTypeUtils.TEXT_PLAIN);
|
||||
@@ -366,10 +366,10 @@ public class KafkaBinderTests extends
|
||||
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN).build();
|
||||
moduleOutputChannel.send(message);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
|
||||
AtomicReference<Message<String>> inboundMessageRef = new AtomicReference<>();
|
||||
moduleInputChannel.subscribe(message1 -> {
|
||||
try {
|
||||
inboundMessageRef.set((Message<byte[]>) message1);
|
||||
inboundMessageRef.set((Message<String>) message1);
|
||||
}
|
||||
finally {
|
||||
latch.countDown();
|
||||
@@ -378,7 +378,7 @@ public class KafkaBinderTests extends
|
||||
Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message");
|
||||
|
||||
assertThat(inboundMessageRef.get()).isNotNull();
|
||||
assertThat(new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8)).isEqualTo("foo");
|
||||
assertThat(inboundMessageRef.get().getPayload()).isEqualTo("foo");
|
||||
assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE))
|
||||
.isEqualTo(MimeTypeUtils.TEXT_PLAIN);
|
||||
producerBinding.unbind();
|
||||
@@ -424,6 +424,13 @@ public class KafkaBinderTests extends
|
||||
assertThat(new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8)).isEqualTo("foo");
|
||||
assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE))
|
||||
.isEqualTo(MimeTypeUtils.APPLICATION_OCTET_STREAM);
|
||||
|
||||
Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse = ((KafkaTestBinder)binder).getCoreBinder().getTopicsInUse();
|
||||
assertThat(topicsInUse.keySet()).contains("foo.bar");
|
||||
KafkaMessageChannelBinder.TopicInformation topic = topicsInUse.get("foo.bar");
|
||||
assertThat(topic.isConsumerTopic()).isTrue();
|
||||
assertThat(topic.getConsumerGroup()).isEqualTo("testSendAndReceive");
|
||||
|
||||
producerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
@@ -1406,6 +1413,12 @@ public class KafkaBinderTests extends
|
||||
assertThat(receivedMessage2).isNotNull();
|
||||
assertThat(new String(receivedMessage2.getPayload(), StandardCharsets.UTF_8)).isEqualTo(testPayload3);
|
||||
|
||||
Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse = ((KafkaTestBinder)binder).getCoreBinder().getTopicsInUse();
|
||||
assertThat(topicsInUse.keySet()).contains("defaultGroup.0");
|
||||
KafkaMessageChannelBinder.TopicInformation topic = topicsInUse.get("defaultGroup.0");
|
||||
assertThat(topic.isConsumerTopic()).isTrue();
|
||||
assertThat(topic.getConsumerGroup()).startsWith("anonymous");
|
||||
|
||||
producerBinding.unbind();
|
||||
binding1.unbind();
|
||||
binding2.unbind();
|
||||
@@ -2328,10 +2341,10 @@ public class KafkaBinderTests extends
|
||||
binderBindUnbindLatency();
|
||||
moduleOutputChannel.send(message);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
|
||||
AtomicReference<Message<String>> inboundMessageRef = new AtomicReference<>();
|
||||
moduleInputChannel.subscribe(message1 -> {
|
||||
try {
|
||||
inboundMessageRef.set((Message<byte[]>) message1);
|
||||
inboundMessageRef.set((Message<String>) message1);
|
||||
}
|
||||
finally {
|
||||
latch.countDown();
|
||||
@@ -2340,7 +2353,7 @@ public class KafkaBinderTests extends
|
||||
Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message");
|
||||
|
||||
assertThat(inboundMessageRef.get()).isNotNull();
|
||||
assertThat(new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8)).isEqualTo("test");
|
||||
assertThat(inboundMessageRef.get().getPayload()).isEqualTo("test");
|
||||
assertThat(inboundMessageRef.get().getHeaders()).containsEntry("contentType", MimeTypeUtils.TEXT_PLAIN);
|
||||
}
|
||||
finally {
|
||||
|
||||
Reference in New Issue
Block a user