From e2df834665365c2757ad5c8bdd1b8db079e47da4 Mon Sep 17 00:00:00 2001 From: Pedro Lopes Date: Tue, 1 Aug 2023 00:01:26 -0300 Subject: [PATCH] BAEL-6406 - Understanding Kafka Topics and Partitions (#14324) * consumer config. topic config. driver and calculator classes. * basic app working. test structure * final version * wraping up * optimizing imports --- .../KafkaConsumerConfig.java | 39 +++++++++++++++++++ .../KafkaProducerConfig.java | 35 +++++++++++++++++ .../topicsandpartitions/KafkaTopicConfig.java | 30 ++++++++++++++ .../TemperatureConsumer.java | 36 +++++++++++++++++ .../ThermostatApplicationKafkaApp.java | 13 +++++++ .../ThermostatService.java | 24 ++++++++++++ ...fkaTopicsAndPartitionsIntegrationTest.java | 30 ++++++++++++++ 7 files changed, 207 insertions(+) create mode 100644 spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaConsumerConfig.java create mode 100644 spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaProducerConfig.java create mode 100644 spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaTopicConfig.java create mode 100644 spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/TemperatureConsumer.java create mode 100644 spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/ThermostatApplicationKafkaApp.java create mode 100644 spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/ThermostatService.java create mode 100644 spring-kafka-2/src/test/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaTopicsAndPartitionsIntegrationTest.java diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaConsumerConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaConsumerConfig.java new file mode 100644 index 0000000000..d48185dd0d --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaConsumerConfig.java @@ -0,0 +1,39 @@ +package com.baeldung.spring.kafka.topicsandpartitions; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.DoubleDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; + +import java.util.HashMap; +import java.util.Map; + +@EnableKafka +@Configuration +public class KafkaConsumerConfig { + + @Value(value = "${spring.kafka.bootstrap-servers}") + private String bootstrapAddress; + + @Bean + public ConsumerFactory consumerFactory() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DoubleDeserializer.class); + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + return factory; + } +} diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaProducerConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaProducerConfig.java new file mode 100644 index 0000000000..9ee6b12e0e --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaProducerConfig.java @@ -0,0 +1,35 @@ +package com.baeldung.spring.kafka.topicsandpartitions; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.DoubleSerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +public class KafkaProducerConfig { + + @Value(value = "${spring.kafka.bootstrap-servers}") + private String bootstrapAddress; + + @Bean + public ProducerFactory producerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DoubleSerializer.class); + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaTopicConfig.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaTopicConfig.java new file mode 100644 index 0000000000..2cfc5a1b37 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaTopicConfig.java @@ -0,0 +1,30 @@ +package com.baeldung.spring.kafka.topicsandpartitions; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.TopicBuilder; +import org.springframework.kafka.core.KafkaAdmin; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +public class KafkaTopicConfig { + + @Value(value = "${spring.kafka.bootstrap-servers}") + private String bootstrapAddress; + + public KafkaAdmin kafkaAdmin() { + Map configs = new HashMap<>(); + configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); + return new KafkaAdmin(configs); + } + + public NewTopic celciusTopic() { + return TopicBuilder.name("celcius-scale-topic") + .partitions(2) + .build(); + } +} diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/TemperatureConsumer.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/TemperatureConsumer.java new file mode 100644 index 0000000000..7cfbdd5fb0 --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/TemperatureConsumer.java @@ -0,0 +1,36 @@ +package com.baeldung.spring.kafka.topicsandpartitions; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Service; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +@Service +public class TemperatureConsumer { + + private CountDownLatch latch = new CountDownLatch(1); + + Map> consumedRecords = new ConcurrentHashMap<>(); + + @KafkaListener(topics = "celcius-scale-topic", groupId = "group-1") + public void consumer1(ConsumerRecord consumerRecord) { + computeConsumedRecord("consumer-1", consumerRecord.partition()); + } + + private void computeConsumedRecord(String key, int consumerRecord) { + consumedRecords.computeIfAbsent(key, k -> new HashSet<>()); + consumedRecords.computeIfPresent(key, (k, v) -> { + v.add(String.valueOf(consumerRecord)); + return v; + }); + } + + public CountDownLatch getLatch() { + return latch; + } +} diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/ThermostatApplicationKafkaApp.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/ThermostatApplicationKafkaApp.java new file mode 100644 index 0000000000..c8b7964bbd --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/ThermostatApplicationKafkaApp.java @@ -0,0 +1,13 @@ +package com.baeldung.spring.kafka.topicsandpartitions; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Import; + +@SpringBootApplication +@Import(value = {KafkaTopicConfig.class, KafkaProducerConfig.class, KafkaConsumerConfig.class}) +public class ThermostatApplicationKafkaApp { + public static void main(String[] args) { + SpringApplication.run(ThermostatApplicationKafkaApp.class, args); + } +} \ No newline at end of file diff --git a/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/ThermostatService.java b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/ThermostatService.java new file mode 100644 index 0000000000..fc4a64961f --- /dev/null +++ b/spring-kafka-2/src/main/java/com/baeldung/spring/kafka/topicsandpartitions/ThermostatService.java @@ -0,0 +1,24 @@ +package com.baeldung.spring.kafka.topicsandpartitions; + +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +import java.util.Random; + +@Service +public class ThermostatService { + + private final KafkaTemplate kafkaTemplate; + + public ThermostatService(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + public void measureCelsiusAndPublish(int numMeasurements) { + new Random().doubles(25, 35) + .limit(numMeasurements) + .forEach(tmp -> { + kafkaTemplate.send("celcius-scale-topic", tmp); + }); + } +} diff --git a/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaTopicsAndPartitionsIntegrationTest.java b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaTopicsAndPartitionsIntegrationTest.java new file mode 100644 index 0000000000..309c87125a --- /dev/null +++ b/spring-kafka-2/src/test/java/com/baeldung/spring/kafka/topicsandpartitions/KafkaTopicsAndPartitionsIntegrationTest.java @@ -0,0 +1,30 @@ +package com.baeldung.spring.kafka.topicsandpartitions; + +import org.junit.ClassRule; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; + +import java.util.concurrent.TimeUnit; + +@SpringBootTest(classes = ThermostatApplicationKafkaApp.class) +@EmbeddedKafka(partitions = 2, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"}) +public class KafkaTopicsAndPartitionsIntegrationTest { + @ClassRule + public static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, true, "multitype"); + + @Autowired + private ThermostatService service; + + @Autowired + private TemperatureConsumer consumer; + + @Test + public void givenTopic_andConsumerGroup_whenConsumersListenToEvents_thenConsumeItCorrectly() throws Exception { + service.measureCelsiusAndPublish(10000); + consumer.getLatch().await(1, TimeUnit.SECONDS); + System.out.println(consumer.consumedRecords); + } +}