From 670a02bf3cd603c6a30ba5511d1e463abbf6d67c Mon Sep 17 00:00:00 2001 From: haerong22 Date: Wed, 21 Dec 2022 19:56:34 +0900 Subject: [PATCH] #25 catalog-service: kafka consumer --- springcloud/catalog-service/build.gradle | 3 ++ .../messagequeue/KafkaConsumer.java | 44 +++++++++++++++++++ .../messagequeue/KafkaConsumerConfig.java | 40 +++++++++++++++++ .../src/main/resources/application.yml | 3 +- 4 files changed, 89 insertions(+), 1 deletion(-) create mode 100644 springcloud/catalog-service/src/main/java/com/example/catalogservice/messagequeue/KafkaConsumer.java create mode 100644 springcloud/catalog-service/src/main/java/com/example/catalogservice/messagequeue/KafkaConsumerConfig.java diff --git a/springcloud/catalog-service/build.gradle b/springcloud/catalog-service/build.gradle index 386d67ca..857b185a 100644 --- a/springcloud/catalog-service/build.gradle +++ b/springcloud/catalog-service/build.gradle @@ -26,6 +26,9 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-data-jpa' implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.cloud:spring-cloud-starter-netflix-eureka-client' + + implementation 'org.springframework.kafka:spring-kafka' + compileOnly 'org.projectlombok:lombok' developmentOnly 'org.springframework.boot:spring-boot-devtools' runtimeOnly group: 'com.h2database', name: 'h2', version: '1.3.176' diff --git a/springcloud/catalog-service/src/main/java/com/example/catalogservice/messagequeue/KafkaConsumer.java b/springcloud/catalog-service/src/main/java/com/example/catalogservice/messagequeue/KafkaConsumer.java new file mode 100644 index 00000000..a905bab2 --- /dev/null +++ b/springcloud/catalog-service/src/main/java/com/example/catalogservice/messagequeue/KafkaConsumer.java @@ -0,0 +1,44 @@ +package com.example.catalogservice.messagequeue; + +import com.example.catalogservice.entity.CatalogEntity; +import com.example.catalogservice.repository.CatalogRepository; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Service; + +import java.util.HashMap; + +@Slf4j +@Service +@RequiredArgsConstructor +public class KafkaConsumer { + + private final CatalogRepository catalogRepository; + private final ObjectMapper objectMapper; + + @KafkaListener(topics = "example-catalog-topic") + public void updateQty(String kafkaMessage) { + log.info("Kafka Message: -> " + kafkaMessage); + + HashMap map = new HashMap<>(); + + try { + map = objectMapper.readValue(kafkaMessage, new TypeReference>() {}); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + + String productId = (String) map.get("productId"); + + CatalogEntity catalogEntity = catalogRepository.findByProductId(productId) + .orElseThrow(RuntimeException::new); + + catalogEntity.setStock(catalogEntity.getStock() - (Integer) map.get("qty")); + + catalogRepository.save(catalogEntity); + } +} diff --git a/springcloud/catalog-service/src/main/java/com/example/catalogservice/messagequeue/KafkaConsumerConfig.java b/springcloud/catalog-service/src/main/java/com/example/catalogservice/messagequeue/KafkaConsumerConfig.java new file mode 100644 index 00000000..2858b2bb --- /dev/null +++ b/springcloud/catalog-service/src/main/java/com/example/catalogservice/messagequeue/KafkaConsumerConfig.java @@ -0,0 +1,40 @@ +package com.example.catalogservice.messagequeue; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; + +import java.util.HashMap; +import java.util.Map; + +@EnableKafka +@Configuration +public class KafkaConsumerConfig { + + @Bean + public ConsumerFactory consumerFactory() { + Map properties = new HashMap<>(); + + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId"); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + return new DefaultKafkaConsumerFactory<>(properties ); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory + = new ConcurrentKafkaListenerContainerFactory<>(); + + kafkaListenerContainerFactory.setConsumerFactory(consumerFactory()); + + return kafkaListenerContainerFactory; + } +} diff --git a/springcloud/catalog-service/src/main/resources/application.yml b/springcloud/catalog-service/src/main/resources/application.yml index 93f4fc94..60299540 100644 --- a/springcloud/catalog-service/src/main/resources/application.yml +++ b/springcloud/catalog-service/src/main/resources/application.yml @@ -25,9 +25,10 @@ spring: eureka: instance: + hostname: localhost instance-id: ${spring.application.name}:${spring.application.instance_id:${random.value}} client: register-with-eureka: true fetch-registry: true service-url: - defaultZone: http://127.0.0.1:8761/eureka #?? ?? \ No newline at end of file + defaultZone: http://127.0.0.1:8761/eureka #?? ??