From e23e9e3aca422ff795ccbfd2d9178dc2601fa81d Mon Sep 17 00:00:00 2001 From: Gaetano Piazzolla Date: Mon, 15 May 2023 21:16:21 +0200 Subject: [PATCH] BAEL-6493 - Read last N messages from a Kafka Topic (#14020) --- .../kafka/KafaConsumeLastNMessages.java | 138 ++++++++++++++++++ 1 file changed, 138 insertions(+) create mode 100644 apache-kafka-2/src/test/java/com/baeldung/kafka/KafaConsumeLastNMessages.java diff --git a/apache-kafka-2/src/test/java/com/baeldung/kafka/KafaConsumeLastNMessages.java b/apache-kafka-2/src/test/java/com/baeldung/kafka/KafaConsumeLastNMessages.java new file mode 100644 index 0000000000..94f5907525 --- /dev/null +++ b/apache-kafka-2/src/test/java/com/baeldung/kafka/KafaConsumeLastNMessages.java @@ -0,0 +1,138 @@ +package com.baeldung.kafka; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Testcontainers +public class KafaConsumeLastNMessages { + + private static String TOPIC1 = "baeldung-github"; + private static String TOPIC2 = "baeldung-blog"; + private static String MESSAGE_KEY = "message"; + private static KafkaProducer producer; + private static KafkaConsumer consumer; + private static KafkaProducer transactionalProducer; + + @Container + private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")); + + @BeforeAll + static void setup() { + KAFKA_CONTAINER.addExposedPort(9092); + + Properties producerProperties = new Properties(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + Properties consumerProperties = new Properties(); + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroup1"); + + Properties transactionalProducerProprieties = new Properties(); + transactionalProducerProprieties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + transactionalProducerProprieties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + transactionalProducerProprieties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + transactionalProducerProprieties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); + transactionalProducerProprieties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-0"); + + producer = new KafkaProducer<>(producerProperties); + consumer = new KafkaConsumer<>(consumerProperties); + transactionalProducer = new KafkaProducer<>(transactionalProducerProprieties); + } + + @AfterAll + static void destroy() { + KAFKA_CONTAINER.stop(); + } + + @Test + void whenSeekingKafkaTopicCursorToEnd_consumerRetrievesOnlyDesiredNumberOfMessages() throws ExecutionException, InterruptedException { + int messagesInTopic = 100; + int messagesToRetrieve = 20; + + for (int i = 0; i < messagesInTopic; i++) { + producer.send(new ProducerRecord<>(TOPIC1, null, MESSAGE_KEY, String.valueOf(i))) + .get(); + } + + TopicPartition partition = new TopicPartition(TOPIC1, 0); + List partitions = new ArrayList<>(); + partitions.add(partition); + consumer.assign(partitions); + + consumer.seekToEnd(partitions); + long startIndex = consumer.position(partition) - messagesToRetrieve; + consumer.seek(partition, startIndex); + + ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); + int recordsReceived = 0; + for (ConsumerRecord record : records) { + assertEquals(MESSAGE_KEY, record.key()); + assertTrue(Integer.parseInt(record.value()) >= (messagesInTopic - messagesToRetrieve)); + recordsReceived++; + } + + assertEquals(messagesToRetrieve, recordsReceived); + } + + @Test + void havingTransactionalProducer_whenSeekingKafkaTopicCursorToEnd_consumerRetrievesLessMessages() throws ExecutionException, InterruptedException { + int messagesInTopic = 100; + int messagesToRetrieve = 20; + + transactionalProducer.initTransactions(); + for (int i = 0; i < messagesInTopic; i++) { + transactionalProducer.beginTransaction(); + transactionalProducer.send(new ProducerRecord<>(TOPIC2, null, MESSAGE_KEY, String.valueOf(i))) + .get(); + transactionalProducer.commitTransaction(); + } + + TopicPartition partition = new TopicPartition(TOPIC2, 0); + List partitions = new ArrayList<>(); + partitions.add(partition); + consumer.assign(partitions); + + consumer.seekToEnd(partitions); + long startIndex = consumer.position(partition) - messagesToRetrieve; + consumer.seek(partition, startIndex); + + ConsumerRecords records = consumer.poll(Duration.ofMinutes(1)); + int recordsReceived = 0; + for (ConsumerRecord record : records) { + assertEquals(MESSAGE_KEY, record.key()); + assertTrue(Integer.parseInt(record.value()) >= (messagesInTopic - messagesToRetrieve)); + recordsReceived++; + } + + assertTrue(messagesToRetrieve > recordsReceived); + } + +}