diff --git a/spring-kafka/src/main/java/com/baeldung/kafka/embedded/KafkaConsumer.java b/spring-kafka/src/main/java/com/baeldung/kafka/embedded/KafkaConsumer.java index 48a194b4e3..796bab7d32 100644 --- a/spring-kafka/src/main/java/com/baeldung/kafka/embedded/KafkaConsumer.java +++ b/spring-kafka/src/main/java/com/baeldung/kafka/embedded/KafkaConsumer.java @@ -15,12 +15,14 @@ public class KafkaConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class); private CountDownLatch latch = new CountDownLatch(1); - private String payload = null; + + private String payload; @KafkaListener(topics = "${test.topic}") public void receive(ConsumerRecord consumerRecord) { LOGGER.info("received payload='{}'", consumerRecord.toString()); - setPayload(consumerRecord.toString()); + + payload = consumerRecord.toString(); latch.countDown(); } @@ -28,12 +30,12 @@ public class KafkaConsumer { return latch; } + public void resetLatch() { + latch = new CountDownLatch(1); + } + public String getPayload() { return payload; } - private void setPayload(String payload) { - this.payload = payload; - } - } diff --git a/spring-kafka/src/test/java/com/baeldung/kafka/embedded/EmbeddedKafkaIntegrationTest.java b/spring-kafka/src/test/java/com/baeldung/kafka/embedded/EmbeddedKafkaIntegrationTest.java index 4c727795c4..8c43622980 100644 --- a/spring-kafka/src/test/java/com/baeldung/kafka/embedded/EmbeddedKafkaIntegrationTest.java +++ b/spring-kafka/src/test/java/com/baeldung/kafka/embedded/EmbeddedKafkaIntegrationTest.java @@ -3,9 +3,11 @@ package com.baeldung.kafka.embedded; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -31,22 +33,29 @@ class EmbeddedKafkaIntegrationTest { @Value("${test.topic}") private String topic; + @BeforeEach + void setup() { + consumer.resetLatch(); + } + @Test public void givenEmbeddedKafkaBroker_whenSendingtoDefaultTemplate_thenMessageReceived() throws Exception { - template.send(topic, "Sending with default template"); - consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); - assertThat(consumer.getLatch().getCount(), equalTo(0L)); - - assertThat(consumer.getPayload(), containsString("embedded-test-topic")); + String data = "Sending with default template"; + template.send(topic, data); + + boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS); + assertTrue(messageConsumed); + assertThat(consumer.getPayload(), containsString(data)); } @Test public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception { - producer.send(topic, "Sending with our own simple KafkaProducer"); - consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); - - assertThat(consumer.getLatch().getCount(), equalTo(0L)); - assertThat(consumer.getPayload(), containsString("embedded-test-topic")); + String data = "Sending with our own simple KafkaProducer"; + producer.send(topic, data); + + boolean messageConsumed = consumer.getLatch().await(10, TimeUnit.SECONDS); + assertTrue(messageConsumed); + assertThat(consumer.getPayload(), containsString(data)); } }