feat(notification): Kafka Consumer 설정 정보 추가 및 주문 신청 로직 구현

- Kafka Consumer 설정 클래스 생성
- 고객의 주문이 신청완료 이벤트 수신부 구현
- 주문 신청 완료 시 알림 저장
This commit is contained in:
bum12ark
2022-03-17 11:17:47 +09:00
parent e6400b1e0c
commit 4fbf90d415
7 changed files with 122 additions and 1 deletions

View File

@@ -35,7 +35,7 @@ dependencies {
implementation 'org.springframework.cloud:spring-cloud-starter-openfeign'
/*implementation 'org.springframework.boot:spring-boot-starter-amqp'*/
implementation 'org.springframework.cloud:spring-cloud-starter-config'
/*implementation 'org.springframework.kafka:spring-kafka'*/
implementation 'org.springframework.kafka:spring-kafka'
// https://mvnrepository.com/artifact/com.github.gavlyukovskiy/p6spy-spring-boot-starter
implementation 'com.github.gavlyukovskiy:p6spy-spring-boot-starter:1.8.0'

View File

@@ -0,0 +1,45 @@
package com.justpickup.notificationservice.domain.notification.messagequeue;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.justpickup.notificationservice.domain.notification.service.NotificationService;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
@Component
@Transactional(readOnly = true)
@RequiredArgsConstructor
@Slf4j
public class NotificationConsumer {
private final ObjectMapper objectMapper;
private final NotificationService notificationService;
@KafkaListener(topics = "orderApply")
public void applyOrder(String kafkaMessage) throws JsonProcessingException {
log.debug("## NotificationConsumer.applyOrder");
log.debug("#### kafka Message = {}", kafkaMessage);
KafkaSendOrderDto kafkaSendOrderDto = objectMapper.readValue(kafkaMessage, KafkaSendOrderDto.class);
}
@Data
static class KafkaSendOrderDto {
private Long id;
private Long userId;
private Long userCouponId;
private Long storeId;
private long orderPrice;
private LocalDateTime orderTime;
private long usedPoint;
private OrderStatus orderStatus;
}
}

View File

@@ -0,0 +1,5 @@
package com.justpickup.notificationservice.domain.notification.messagequeue;
public enum OrderStatus {
}

View File

@@ -10,4 +10,5 @@ public interface NotificationService {
List<FindNotificationDto> findNotificationByUserId(Long id);
void updateNotification(UpdateNotificationDto dto);
Long findNotificationCounts(Long userId, Yn readYn);
void insertOrderPlaced(Long userId);
}

View File

@@ -49,4 +49,15 @@ public class NotificationServiceImpl implements NotificationService {
public Long findNotificationCounts(Long userId, Yn readYn) {
return notificationRepository.countByUserIdAndReadYn(userId, readYn);
}
@Transactional
@Override
public void insertOrderPlaced(Long userId) {
String title = "주문이 수락되었어요.";
String storeName = "[]";
String message = storeName + "매장의 주문이 수락되었습니다.";
Notification notification = Notification.of(userId, message, title);
notificationRepository.save(notification);
}
}

View File

@@ -0,0 +1,58 @@
package com.justpickup.notificationservice.global.config;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.DefaultErrorHandler;
import java.util.*;
@EnableKafka
@RequiredArgsConstructor
@Configuration
public class KafkaConsumerConfig {
@Value("${kafka.host}")
private final String host;
@Value("${kafka.port}")
private final String port;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
String ipAddress = host + ":" + port;
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ipAddress);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
kafkaListenerContainerFactory.setCommonErrorHandler(errorHandler());
return kafkaListenerContainerFactory;
}
@Bean
public DefaultErrorHandler errorHandler() {
DefaultErrorHandler handler = new DefaultErrorHandler();
// handler.addNotRetryableExceptions();
return handler;
}
}

View File

@@ -4,6 +4,7 @@ package com.justpickup.notificationservice.global.entity;
import lombok.Getter;
import org.hibernate.annotations.CreationTimestamp;
import org.hibernate.annotations.UpdateTimestamp;
import org.springframework.data.annotation.CreatedBy;
import javax.persistence.MappedSuperclass;
import java.time.LocalDateTime;