#25 catalog-service: kafka consumer

This commit is contained in:
haerong22
2022-12-21 19:56:34 +09:00
parent ab0a48f770
commit 670a02bf3c
4 changed files with 89 additions and 1 deletions

View File

@@ -26,6 +26,9 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.cloud:spring-cloud-starter-netflix-eureka-client'
implementation 'org.springframework.kafka:spring-kafka'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
runtimeOnly group: 'com.h2database', name: 'h2', version: '1.3.176'

View File

@@ -0,0 +1,44 @@
package com.example.catalogservice.messagequeue;
import com.example.catalogservice.entity.CatalogEntity;
import com.example.catalogservice.repository.CatalogRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.HashMap;
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaConsumer {
private final CatalogRepository catalogRepository;
private final ObjectMapper objectMapper;
@KafkaListener(topics = "example-catalog-topic")
public void updateQty(String kafkaMessage) {
log.info("Kafka Message: -> " + kafkaMessage);
HashMap<Object, Object> map = new HashMap<>();
try {
map = objectMapper.readValue(kafkaMessage, new TypeReference<HashMap<Object, Object>>() {});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
String productId = (String) map.get("productId");
CatalogEntity catalogEntity = catalogRepository.findByProductId(productId)
.orElseThrow(RuntimeException::new);
catalogEntity.setStock(catalogEntity.getStock() - (Integer) map.get("qty"));
catalogRepository.save(catalogEntity);
}
}

View File

@@ -0,0 +1,40 @@
package com.example.catalogservice.messagequeue;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties );
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}

View File

@@ -25,9 +25,10 @@ spring:
eureka:
instance:
hostname: localhost
instance-id: ${spring.application.name}:${spring.application.instance_id:${random.value}}
client:
register-with-eureka: true
fetch-registry: true
service-url:
defaultZone: http://127.0.0.1:8761/eureka #?? ??
defaultZone: http://127.0.0.1:8761/eureka #?? ??