diff --git a/spring-kafka/src/test/java/com/baeldung/kafka/embedded/EmbeddedKafkaIntegrationTest.java b/spring-kafka/src/test/java/com/baeldung/kafka/embedded/EmbeddedKafkaIntegrationTest.java index 8c43622980..eebcf778be 100644 --- a/spring-kafka/src/test/java/com/baeldung/kafka/embedded/EmbeddedKafkaIntegrationTest.java +++ b/spring-kafka/src/test/java/com/baeldung/kafka/embedded/EmbeddedKafkaIntegrationTest.java @@ -39,8 +39,9 @@ class EmbeddedKafkaIntegrationTest { } @Test - public void givenEmbeddedKafkaBroker_whenSendingtoDefaultTemplate_thenMessageReceived() throws Exception { + public void givenEmbeddedKafkaBroker_whenSendingWithDefaultTemplate_thenMessageReceived() throws Exception { String data = "Sending with default template"; + template.send(topic, data); boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS); @@ -49,8 +50,9 @@ class EmbeddedKafkaIntegrationTest { } @Test - public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception { + public void givenEmbeddedKafkaBroker_whenSendingWithSimpleProducer_thenMessageReceived() throws Exception { String data = "Sending with our own simple KafkaProducer"; + producer.send(topic, data); boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS); diff --git a/spring-kafka/src/test/java/com/baeldung/kafka/testcontainers/KafkaTestContainersLiveTest.java b/spring-kafka/src/test/java/com/baeldung/kafka/testcontainers/KafkaTestContainersLiveTest.java index 74d6f824b1..a111c4a5bc 100644 --- a/spring-kafka/src/test/java/com/baeldung/kafka/testcontainers/KafkaTestContainersLiveTest.java +++ b/spring-kafka/src/test/java/com/baeldung/kafka/testcontainers/KafkaTestContainersLiveTest.java @@ -3,6 +3,7 @@ package com.baeldung.kafka.testcontainers; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.HashMap; import java.util.Map; @@ -12,6 +13,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.runner.RunWith; @@ -37,9 +39,9 @@ import com.baeldung.kafka.embedded.KafkaProducer; import com.baeldung.kafka.embedded.KafkaProducerConsumerApplication; /** - * This test class uses Testcontainers to instantiate and manage an external Apache + * This test class uses Testcontainers to instantiate and manage an external Apache * Kafka broker hosted inside a Docker container. - * + * * Therefore, one of the prerequisites for using Testcontainers is that Docker is installed on the host running this test * */ @@ -64,22 +66,31 @@ public class KafkaTestContainersLiveTest { @Value("${test.topic}") private String topic; - @Test - public void givenKafkaDockerContainer_whenSendingtoDefaultTemplate_thenMessageReceived() throws Exception { - template.send(topic, "Sending with default template"); - consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); - - assertThat(consumer.getLatch().getCount(), equalTo(0L)); - assertThat(consumer.getPayload(), containsString("embedded-test-topic")); + @Before + public void setup() { + consumer.resetLatch(); } @Test - public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception { - producer.send(topic, "Sending with own controller"); - consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); - - assertThat(consumer.getLatch().getCount(), equalTo(0L)); - assertThat(consumer.getPayload(), containsString("embedded-test-topic")); + public void givenKafkaDockerContainer_whenSendingWithDefaultTemplate_thenMessageReceived() throws Exception { + String data = "Sending with default template"; + + template.send(topic, data); + + boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS); + assertTrue(messageConsumed); + assertThat(consumer.getPayload(), containsString(data)); + } + + @Test + public void givenKafkaDockerContainer_whenSendingWithSimpleProducer_thenMessageReceived() throws Exception { + String data = "Sending with our own simple KafkaProducer"; + + producer.send(topic, data); + + boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS); + assertTrue(messageConsumed); + assertThat(consumer.getPayload(), containsString(data)); } @TestConfiguration