GH-210: Upgrade test-embedded-kafka to JUnit5 (#211)

* GH-210: Upgrade test-embedded-kafka to JUnit5

Resolves https://github.com/spring-cloud/spring-cloud-stream-samples/issues/210

* Avoid a rebalance by using a different group in the test; use `assign()` instead of `subscribe()`.
This commit is contained in:
Gary Russell
2021-08-26 15:29:13 -04:00
committed by GitHub
parent 99d44e08d2
commit 44dfb314c1
2 changed files with 19 additions and 30 deletions

View File

@@ -12,12 +12,12 @@
<parent> <parent>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId> <artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.4</version> <version>2.4.10</version>
<relativePath/> <!-- lookup parent from repository --> <relativePath/> <!-- lookup parent from repository -->
</parent> </parent>
<properties> <properties>
<spring-cloud.version>2020.0.2</spring-cloud.version> <spring-cloud.version>2020.0.3</spring-cloud.version>
</properties> </properties>
<dependencyManagement> <dependencyManagement>
@@ -59,11 +59,6 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId> <artifactId>spring-boot-starter-web</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@@ -34,21 +34,21 @@ package demo;
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.BeforeClass; import org.junit.jupiter.api.Test;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule; import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit4.SpringRunner;
import java.time.Duration;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
@@ -62,25 +62,19 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Soby Chacko * @author Soby Chacko
* *
*/ */
@RunWith(SpringRunner.class)
@SpringBootTest @SpringBootTest
@EmbeddedKafka(topics = { EmbeddedKafkaApplicationTests.INPUT_TOPIC, EmbeddedKafkaApplicationTests.OUTPUT_TOPIC },
partitions = 1,
bootstrapServersProperty = "spring.kafka.bootstrap-servers")
public class EmbeddedKafkaApplicationTests { public class EmbeddedKafkaApplicationTests {
private static final String INPUT_TOPIC = "testEmbeddedIn"; public static final String INPUT_TOPIC = "testEmbeddedIn";
private static final String OUTPUT_TOPIC = "testEmbeddedOut"; public static final String OUTPUT_TOPIC = "testEmbeddedOut";
private static final String GROUP_NAME = "embeddedKafkaApplication"; private static final String GROUP_NAME = "embeddedKafkaApplicationTest";
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, OUTPUT_TOPIC);
@BeforeClass
public static void setup() {
System.setProperty("spring.cloud.stream.kafka.binder.brokers", embeddedKafka.getEmbeddedKafka().getBrokersAsString());
}
@Test @Test
public void testSendReceive() { void testSendReceive(@Autowired EmbeddedKafkaBroker embeddedKafka) {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka()); Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
senderProps.put("key.serializer", ByteArraySerializer.class); senderProps.put("key.serializer", ByteArraySerializer.class);
senderProps.put("value.serializer", ByteArraySerializer.class); senderProps.put("value.serializer", ByteArraySerializer.class);
DefaultKafkaProducerFactory<byte[], byte[]> pf = new DefaultKafkaProducerFactory<>(senderProps); DefaultKafkaProducerFactory<byte[], byte[]> pf = new DefaultKafkaProducerFactory<>(senderProps);
@@ -88,15 +82,15 @@ public class EmbeddedKafkaApplicationTests {
template.setDefaultTopic(INPUT_TOPIC); template.setDefaultTopic(INPUT_TOPIC);
template.sendDefault("foo".getBytes()); template.sendDefault("foo".getBytes());
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(GROUP_NAME, "false", embeddedKafka.getEmbeddedKafka()); Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(GROUP_NAME, "false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put("key.deserializer", ByteArrayDeserializer.class); consumerProps.put("key.deserializer", ByteArrayDeserializer.class);
consumerProps.put("value.deserializer", ByteArrayDeserializer.class); consumerProps.put("value.deserializer", ByteArrayDeserializer.class);
DefaultKafkaConsumerFactory<byte[], byte[]> cf = new DefaultKafkaConsumerFactory<>(consumerProps); DefaultKafkaConsumerFactory<byte[], byte[]> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<byte[], byte[]> consumer = cf.createConsumer(); Consumer<byte[], byte[]> consumer = cf.createConsumer();
consumer.subscribe(Collections.singleton(OUTPUT_TOPIC)); consumer.assign(Collections.singleton(new TopicPartition(OUTPUT_TOPIC, 0)));
ConsumerRecords<byte[], byte[]> records = consumer.poll(10_000); ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(10));
consumer.commitSync(); consumer.commitSync();
assertThat(records.count()).isEqualTo(1); assertThat(records.count()).isEqualTo(1);