diff --git a/testing-samples/test-embedded-kafka/pom.xml b/testing-samples/test-embedded-kafka/pom.xml index f25c7ba..6142acc 100644 --- a/testing-samples/test-embedded-kafka/pom.xml +++ b/testing-samples/test-embedded-kafka/pom.xml @@ -12,12 +12,12 @@ org.springframework.boot spring-boot-starter-parent - 2.4.4 + 2.4.10 - 2020.0.2 + 2020.0.3 @@ -59,11 +59,6 @@ org.springframework.boot spring-boot-starter-web - - org.junit.vintage - junit-vintage-engine - test - diff --git a/testing-samples/test-embedded-kafka/src/test/java/demo/EmbeddedKafkaApplicationTests.java b/testing-samples/test-embedded-kafka/src/test/java/demo/EmbeddedKafkaApplicationTests.java index 9d7d68c..f5b1ec1 100644 --- a/testing-samples/test-embedded-kafka/src/test/java/demo/EmbeddedKafkaApplicationTests.java +++ b/testing-samples/test-embedded-kafka/src/test/java/demo/EmbeddedKafkaApplicationTests.java @@ -34,21 +34,21 @@ package demo; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.test.rule.EmbeddedKafkaRule; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.KafkaTestUtils; -import org.springframework.test.context.junit4.SpringRunner; +import java.time.Duration; import java.util.Collections; import java.util.Map; @@ -62,25 +62,19 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Soby Chacko * */ -@RunWith(SpringRunner.class) @SpringBootTest +@EmbeddedKafka(topics = { EmbeddedKafkaApplicationTests.INPUT_TOPIC, EmbeddedKafkaApplicationTests.OUTPUT_TOPIC }, + partitions = 1, + bootstrapServersProperty = "spring.kafka.bootstrap-servers") public class EmbeddedKafkaApplicationTests { - private static final String INPUT_TOPIC = "testEmbeddedIn"; - private static final String OUTPUT_TOPIC = "testEmbeddedOut"; - private static final String GROUP_NAME = "embeddedKafkaApplication"; - - @ClassRule - public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, OUTPUT_TOPIC); - - @BeforeClass - public static void setup() { - System.setProperty("spring.cloud.stream.kafka.binder.brokers", embeddedKafka.getEmbeddedKafka().getBrokersAsString()); - } + public static final String INPUT_TOPIC = "testEmbeddedIn"; + public static final String OUTPUT_TOPIC = "testEmbeddedOut"; + private static final String GROUP_NAME = "embeddedKafkaApplicationTest"; @Test - public void testSendReceive() { - Map senderProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka()); + void testSendReceive(@Autowired EmbeddedKafkaBroker embeddedKafka) { + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); senderProps.put("key.serializer", ByteArraySerializer.class); senderProps.put("value.serializer", ByteArraySerializer.class); DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); @@ -88,15 +82,15 @@ public class EmbeddedKafkaApplicationTests { template.setDefaultTopic(INPUT_TOPIC); template.sendDefault("foo".getBytes()); - Map consumerProps = KafkaTestUtils.consumerProps(GROUP_NAME, "false", embeddedKafka.getEmbeddedKafka()); + Map consumerProps = KafkaTestUtils.consumerProps(GROUP_NAME, "false", embeddedKafka); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProps.put("key.deserializer", ByteArrayDeserializer.class); consumerProps.put("value.deserializer", ByteArrayDeserializer.class); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); Consumer consumer = cf.createConsumer(); - consumer.subscribe(Collections.singleton(OUTPUT_TOPIC)); - ConsumerRecords records = consumer.poll(10_000); + consumer.assign(Collections.singleton(new TopicPartition(OUTPUT_TOPIC, 0))); + ConsumerRecords records = consumer.poll(Duration.ofSeconds(10)); consumer.commitSync(); assertThat(records.count()).isEqualTo(1);