From 4fbf90d41571ca11d1d91168c58c83a9d074cd0d Mon Sep 17 00:00:00 2001 From: bum12ark Date: Thu, 17 Mar 2022 11:17:47 +0900 Subject: [PATCH] =?UTF-8?q?feat(notification):=20Kafka=20Consumer=20?= =?UTF-8?q?=EC=84=A4=EC=A0=95=20=EC=A0=95=EB=B3=B4=20=EC=B6=94=EA=B0=80=20?= =?UTF-8?q?=EB=B0=8F=20=EC=A3=BC=EB=AC=B8=20=EC=8B=A0=EC=B2=AD=20=EB=A1=9C?= =?UTF-8?q?=EC=A7=81=20=EA=B5=AC=ED=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Kafka Consumer 설정 클래스 생성 - 고객의 주문이 신청완료 이벤트 수신부 구현 - 주문 신청 완료 시 알림 저장 --- notification-service/build.gradle | 2 +- .../messagequeue/NotificationConsumer.java | 45 ++++++++++++++ .../messagequeue/OrderStatus.java | 5 ++ .../service/NotificationService.java | 1 + .../service/NotificationServiceImpl.java | 11 ++++ .../global/config/KafkaConsumerConfig.java | 58 +++++++++++++++++++ .../global/entity/BaseEntity.java | 1 + 7 files changed, 122 insertions(+), 1 deletion(-) create mode 100644 notification-service/src/main/java/com/justpickup/notificationservice/domain/notification/messagequeue/NotificationConsumer.java create mode 100644 notification-service/src/main/java/com/justpickup/notificationservice/domain/notification/messagequeue/OrderStatus.java create mode 100644 notification-service/src/main/java/com/justpickup/notificationservice/global/config/KafkaConsumerConfig.java diff --git a/notification-service/build.gradle b/notification-service/build.gradle index a037440..a52e095 100644 --- a/notification-service/build.gradle +++ b/notification-service/build.gradle @@ -35,7 +35,7 @@ dependencies { implementation 'org.springframework.cloud:spring-cloud-starter-openfeign' /*implementation 'org.springframework.boot:spring-boot-starter-amqp'*/ implementation 'org.springframework.cloud:spring-cloud-starter-config' - /*implementation 'org.springframework.kafka:spring-kafka'*/ + implementation 'org.springframework.kafka:spring-kafka' // https://mvnrepository.com/artifact/com.github.gavlyukovskiy/p6spy-spring-boot-starter implementation 'com.github.gavlyukovskiy:p6spy-spring-boot-starter:1.8.0' diff --git a/notification-service/src/main/java/com/justpickup/notificationservice/domain/notification/messagequeue/NotificationConsumer.java b/notification-service/src/main/java/com/justpickup/notificationservice/domain/notification/messagequeue/NotificationConsumer.java new file mode 100644 index 0000000..8de66d7 --- /dev/null +++ b/notification-service/src/main/java/com/justpickup/notificationservice/domain/notification/messagequeue/NotificationConsumer.java @@ -0,0 +1,45 @@ +package com.justpickup.notificationservice.domain.notification.messagequeue; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.justpickup.notificationservice.domain.notification.service.NotificationService; +import lombok.Data; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; + +@Component +@Transactional(readOnly = true) +@RequiredArgsConstructor +@Slf4j +public class NotificationConsumer { + + private final ObjectMapper objectMapper; + private final NotificationService notificationService; + + @KafkaListener(topics = "orderApply") + public void applyOrder(String kafkaMessage) throws JsonProcessingException { + log.debug("## NotificationConsumer.applyOrder"); + log.debug("#### kafka Message = {}", kafkaMessage); + + KafkaSendOrderDto kafkaSendOrderDto = objectMapper.readValue(kafkaMessage, KafkaSendOrderDto.class); + + + } + + @Data + static class KafkaSendOrderDto { + private Long id; + private Long userId; + private Long userCouponId; + private Long storeId; + private long orderPrice; + private LocalDateTime orderTime; + private long usedPoint; + private OrderStatus orderStatus; + } +} diff --git a/notification-service/src/main/java/com/justpickup/notificationservice/domain/notification/messagequeue/OrderStatus.java b/notification-service/src/main/java/com/justpickup/notificationservice/domain/notification/messagequeue/OrderStatus.java new file mode 100644 index 0000000..7832b37 --- /dev/null +++ b/notification-service/src/main/java/com/justpickup/notificationservice/domain/notification/messagequeue/OrderStatus.java @@ -0,0 +1,5 @@ +package com.justpickup.notificationservice.domain.notification.messagequeue; + +public enum OrderStatus { + +} diff --git a/notification-service/src/main/java/com/justpickup/notificationservice/domain/notification/service/NotificationService.java b/notification-service/src/main/java/com/justpickup/notificationservice/domain/notification/service/NotificationService.java index 3d35ecb..2e462e5 100644 --- a/notification-service/src/main/java/com/justpickup/notificationservice/domain/notification/service/NotificationService.java +++ b/notification-service/src/main/java/com/justpickup/notificationservice/domain/notification/service/NotificationService.java @@ -10,4 +10,5 @@ public interface NotificationService { List findNotificationByUserId(Long id); void updateNotification(UpdateNotificationDto dto); Long findNotificationCounts(Long userId, Yn readYn); + void insertOrderPlaced(Long userId); } diff --git a/notification-service/src/main/java/com/justpickup/notificationservice/domain/notification/service/NotificationServiceImpl.java b/notification-service/src/main/java/com/justpickup/notificationservice/domain/notification/service/NotificationServiceImpl.java index 5c7d6de..75abd11 100644 --- a/notification-service/src/main/java/com/justpickup/notificationservice/domain/notification/service/NotificationServiceImpl.java +++ b/notification-service/src/main/java/com/justpickup/notificationservice/domain/notification/service/NotificationServiceImpl.java @@ -49,4 +49,15 @@ public class NotificationServiceImpl implements NotificationService { public Long findNotificationCounts(Long userId, Yn readYn) { return notificationRepository.countByUserIdAndReadYn(userId, readYn); } + + @Transactional + @Override + public void insertOrderPlaced(Long userId) { + String title = "주문이 수락되었어요."; + String storeName = "[]"; + String message = storeName + "매장의 주문이 수락되었습니다."; + Notification notification = Notification.of(userId, message, title); + + notificationRepository.save(notification); + } } diff --git a/notification-service/src/main/java/com/justpickup/notificationservice/global/config/KafkaConsumerConfig.java b/notification-service/src/main/java/com/justpickup/notificationservice/global/config/KafkaConsumerConfig.java new file mode 100644 index 0000000..2216365 --- /dev/null +++ b/notification-service/src/main/java/com/justpickup/notificationservice/global/config/KafkaConsumerConfig.java @@ -0,0 +1,58 @@ +package com.justpickup.notificationservice.global.config; + +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.DefaultErrorHandler; + +import java.util.*; + +@EnableKafka +@RequiredArgsConstructor +@Configuration +public class KafkaConsumerConfig { + + @Value("${kafka.host}") + private final String host; + + @Value("${kafka.port}") + private final String port; + + @Bean + public ConsumerFactory consumerFactory() { + Map properties = new HashMap<>(); + + String ipAddress = host + ":" + port; + + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ipAddress); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + return new DefaultKafkaConsumerFactory<>(properties); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory + = new ConcurrentKafkaListenerContainerFactory<>(); + + kafkaListenerContainerFactory.setConsumerFactory(consumerFactory()); + kafkaListenerContainerFactory.setCommonErrorHandler(errorHandler()); + + return kafkaListenerContainerFactory; + } + + @Bean + public DefaultErrorHandler errorHandler() { + DefaultErrorHandler handler = new DefaultErrorHandler(); +// handler.addNotRetryableExceptions(); + return handler; + } +} diff --git a/notification-service/src/main/java/com/justpickup/notificationservice/global/entity/BaseEntity.java b/notification-service/src/main/java/com/justpickup/notificationservice/global/entity/BaseEntity.java index f27ba47..3d4737a 100644 --- a/notification-service/src/main/java/com/justpickup/notificationservice/global/entity/BaseEntity.java +++ b/notification-service/src/main/java/com/justpickup/notificationservice/global/entity/BaseEntity.java @@ -4,6 +4,7 @@ package com.justpickup.notificationservice.global.entity; import lombok.Getter; import org.hibernate.annotations.CreationTimestamp; import org.hibernate.annotations.UpdateTimestamp; +import org.springframework.data.annotation.CreatedBy; import javax.persistence.MappedSuperclass; import java.time.LocalDateTime;