Merge pull request #53 from Development-team-1/알림_주문신청
알림 주문신청 (카프카 컨슈머, 페인클라이언트)
This commit is contained in:
@@ -35,7 +35,7 @@ dependencies {
|
||||
implementation 'org.springframework.cloud:spring-cloud-starter-openfeign'
|
||||
/*implementation 'org.springframework.boot:spring-boot-starter-amqp'*/
|
||||
implementation 'org.springframework.cloud:spring-cloud-starter-config'
|
||||
/*implementation 'org.springframework.kafka:spring-kafka'*/
|
||||
implementation 'org.springframework.kafka:spring-kafka'
|
||||
// https://mvnrepository.com/artifact/com.github.gavlyukovskiy/p6spy-spring-boot-starter
|
||||
implementation 'com.github.gavlyukovskiy:p6spy-spring-boot-starter:1.8.0'
|
||||
|
||||
|
||||
@@ -3,9 +3,11 @@ package com.justpickup.notificationservice;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
|
||||
import org.springframework.cloud.openfeign.EnableFeignClients;
|
||||
|
||||
@SpringBootApplication
|
||||
@EnableEurekaClient
|
||||
@EnableFeignClients
|
||||
@SpringBootApplication
|
||||
public class NotificationServiceApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
@@ -0,0 +1,60 @@
|
||||
package com.justpickup.notificationservice.domain.notification.messagequeue;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.justpickup.notificationservice.domain.notification.service.NotificationService;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@Component
|
||||
@Transactional(readOnly = true)
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class NotificationConsumer {
|
||||
|
||||
private final ObjectMapper objectMapper;
|
||||
private final NotificationService notificationService;
|
||||
|
||||
@KafkaListener(topics = "orderPlaced")
|
||||
public void orderPlaced(String kafkaMessage) throws JsonProcessingException {
|
||||
log.debug("## NotificationConsumer.orderPlaced");
|
||||
log.debug("#### kafka Message = {}", kafkaMessage);
|
||||
|
||||
KafkaSendOrderDto kafkaSendOrderDto = objectMapper.readValue(kafkaMessage, KafkaSendOrderDto.class);
|
||||
|
||||
notificationService.insertOrderPlaced(kafkaSendOrderDto.getUserId(), kafkaSendOrderDto.getStoreId());
|
||||
}
|
||||
|
||||
@Data @NoArgsConstructor
|
||||
static class KafkaSendOrderDto {
|
||||
private Long id;
|
||||
private Long userId;
|
||||
private Long userCouponId;
|
||||
private Long storeId;
|
||||
private long orderPrice;
|
||||
private LocalDateTime orderTime;
|
||||
private long usedPoint;
|
||||
private OrderStatus orderStatus;
|
||||
|
||||
@Builder
|
||||
public KafkaSendOrderDto(Long id, Long userId, Long userCouponId, Long storeId,
|
||||
long orderPrice, LocalDateTime orderTime, long usedPoint, OrderStatus orderStatus) {
|
||||
this.id = id;
|
||||
this.userId = userId;
|
||||
this.userCouponId = userCouponId;
|
||||
this.storeId = storeId;
|
||||
this.orderPrice = orderPrice;
|
||||
this.orderTime = orderTime;
|
||||
this.usedPoint = usedPoint;
|
||||
this.orderStatus = orderStatus;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.justpickup.notificationservice.domain.notification.messagequeue;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
// 주문 대기 -> 주문 신청 --> 주문수락 -> 픽업대기 -> 픽업완료
|
||||
// \
|
||||
// ㄴ> 주문거절
|
||||
@Getter
|
||||
public enum OrderStatus {
|
||||
PENDING("주문대기(장바구니)"),
|
||||
PLACED("주문신청"),
|
||||
ACCEPTED("주문수락"),
|
||||
REJECTED("주문거절"),
|
||||
WAITING("픽업대기"),
|
||||
FINISHED("픽업완료");
|
||||
|
||||
private String message;
|
||||
|
||||
OrderStatus(String message) {
|
||||
this.message = message;
|
||||
}
|
||||
}
|
||||
@@ -10,4 +10,5 @@ public interface NotificationService {
|
||||
List<FindNotificationDto> findNotificationByUserId(Long id);
|
||||
void updateNotification(UpdateNotificationDto dto);
|
||||
Long findNotificationCounts(Long userId, Yn readYn);
|
||||
void insertOrderPlaced(Long userId, Long storeId);
|
||||
}
|
||||
|
||||
@@ -5,6 +5,8 @@ import com.justpickup.notificationservice.domain.notification.dto.UpdateNotifica
|
||||
import com.justpickup.notificationservice.domain.notification.entity.Notification;
|
||||
import com.justpickup.notificationservice.domain.notification.exception.NotExistNotification;
|
||||
import com.justpickup.notificationservice.domain.notification.repository.NotificationRepository;
|
||||
import com.justpickup.notificationservice.global.client.store.GetStoreResponse;
|
||||
import com.justpickup.notificationservice.global.client.store.StoreClient;
|
||||
import com.justpickup.notificationservice.global.dto.Yn;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.data.domain.Sort;
|
||||
@@ -21,6 +23,7 @@ import java.util.stream.Collectors;
|
||||
public class NotificationServiceImpl implements NotificationService {
|
||||
|
||||
private final NotificationRepository notificationRepository;
|
||||
private final StoreClient storeClient;
|
||||
|
||||
@Override
|
||||
public List<FindNotificationDto> findNotificationByUserId(Long userId) {
|
||||
@@ -49,4 +52,16 @@ public class NotificationServiceImpl implements NotificationService {
|
||||
public Long findNotificationCounts(Long userId, Yn readYn) {
|
||||
return notificationRepository.countByUserIdAndReadYn(userId, readYn);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
@Override
|
||||
public void insertOrderPlaced(Long userId, Long storeId) {
|
||||
GetStoreResponse storeResponse = storeClient.getStore(String.valueOf(storeId)).getData();
|
||||
|
||||
String title = "주문이 신청되었어요.";
|
||||
String storeName = "[" + storeResponse.getName() + "]";
|
||||
String message = storeName + "매장의 주문이 신청되었습니다.";
|
||||
Notification notification = Notification.of(userId, message, title);
|
||||
notificationRepository.save(notification);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,10 @@
|
||||
package com.justpickup.notificationservice.global.client.store;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
@Getter
|
||||
public class GetStoreResponse {
|
||||
private Long id;
|
||||
private String name;
|
||||
private String phoneNumber;
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
package com.justpickup.notificationservice.global.client.store;
|
||||
|
||||
import com.justpickup.notificationservice.global.dto.Result;
|
||||
import org.springframework.cloud.openfeign.FeignClient;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
|
||||
@FeignClient("STORE-SERVICE")
|
||||
public interface StoreClient {
|
||||
|
||||
@GetMapping("/store/{storeId}")
|
||||
Result<GetStoreResponse> getStore(@PathVariable(value = "storeId") String storeId);
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
package com.justpickup.notificationservice.global.config;
|
||||
|
||||
import feign.Logger;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
public class FeignClientConfig {
|
||||
|
||||
@Bean
|
||||
public Logger.Level feignLoggerLevel() {
|
||||
return Logger.Level.FULL;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,58 @@
|
||||
package com.justpickup.notificationservice.global.config;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.annotation.EnableKafka;
|
||||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.listener.DefaultErrorHandler;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
@EnableKafka
|
||||
@RequiredArgsConstructor
|
||||
@Configuration
|
||||
public class KafkaConsumerConfig {
|
||||
|
||||
@Value("${kafka.host}")
|
||||
private final String host;
|
||||
|
||||
@Value("${kafka.port}")
|
||||
private final String port;
|
||||
|
||||
@Bean
|
||||
public ConsumerFactory<String, String> consumerFactory() {
|
||||
Map<String, Object> properties = new HashMap<>();
|
||||
|
||||
String ipAddress = host + ":" + port;
|
||||
|
||||
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ipAddress);
|
||||
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
|
||||
return new DefaultKafkaConsumerFactory<>(properties);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
|
||||
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
|
||||
= new ConcurrentKafkaListenerContainerFactory<>();
|
||||
|
||||
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
|
||||
kafkaListenerContainerFactory.setCommonErrorHandler(errorHandler());
|
||||
|
||||
return kafkaListenerContainerFactory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public DefaultErrorHandler errorHandler() {
|
||||
DefaultErrorHandler handler = new DefaultErrorHandler();
|
||||
// handler.addNotRetryableExceptions();
|
||||
return handler;
|
||||
}
|
||||
}
|
||||
@@ -4,6 +4,7 @@ package com.justpickup.notificationservice.global.entity;
|
||||
import lombok.Getter;
|
||||
import org.hibernate.annotations.CreationTimestamp;
|
||||
import org.hibernate.annotations.UpdateTimestamp;
|
||||
import org.springframework.data.annotation.CreatedBy;
|
||||
|
||||
import javax.persistence.MappedSuperclass;
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@@ -0,0 +1,101 @@
|
||||
package com.justpickup.notificationservice.domain.notification.messagequeue;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.DisplayName;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestInstance;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.kafka.annotation.EnableKafka;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.support.SendResult;
|
||||
import org.springframework.kafka.test.EmbeddedKafkaBroker;
|
||||
import org.springframework.kafka.test.context.EmbeddedKafka;
|
||||
import org.springframework.kafka.test.utils.KafkaTestUtils;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit.jupiter.SpringExtension;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
@ExtendWith(SpringExtension.class)
|
||||
@EnableKafka
|
||||
@DirtiesContext
|
||||
@EmbeddedKafka
|
||||
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
||||
class NotificationConsumerTest {
|
||||
|
||||
private final String ORDER_TOPIC = "orderPlaced";
|
||||
|
||||
private Consumer<Integer, String> consumer;
|
||||
private KafkaTemplate<String, String> producer;
|
||||
|
||||
@Autowired
|
||||
EmbeddedKafkaBroker embeddedKafkaBroker;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
producer = configureProducer();
|
||||
consumer = configureConsumer();
|
||||
}
|
||||
|
||||
private Consumer<Integer, String> configureConsumer() {
|
||||
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafkaBroker);
|
||||
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
Consumer<Integer, String> consumer = new DefaultKafkaConsumerFactory<Integer, String>(consumerProps)
|
||||
.createConsumer();
|
||||
consumer.subscribe(Collections.singleton(ORDER_TOPIC));
|
||||
return consumer;
|
||||
}
|
||||
|
||||
|
||||
private KafkaTemplate<String, String> configureProducer() {
|
||||
Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
|
||||
return new KafkaTemplate <>( new DefaultKafkaProducerFactory<>(producerProps));
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("주문 신청")
|
||||
void orderPlaced() throws JsonProcessingException {
|
||||
// GIVEN
|
||||
ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());
|
||||
NotificationConsumer.KafkaSendOrderDto sendOrderDto
|
||||
= NotificationConsumer.KafkaSendOrderDto.builder()
|
||||
.id(1L)
|
||||
.orderPrice(10_000L)
|
||||
.orderStatus(OrderStatus.PLACED)
|
||||
.orderTime(LocalDateTime.now())
|
||||
.storeId(2L)
|
||||
.build();
|
||||
String sendJson = mapper.writeValueAsString(sendOrderDto);
|
||||
|
||||
// THEN
|
||||
ListenableFuture<SendResult<String, String>> orderPlaced = producer.send(ORDER_TOPIC, sendJson);
|
||||
|
||||
// WHEN
|
||||
ConsumerRecord<Integer, String> singleRecord =
|
||||
KafkaTestUtils.getSingleRecord(consumer, ORDER_TOPIC);
|
||||
|
||||
NotificationConsumer.KafkaSendOrderDto readValue
|
||||
= mapper.readValue(singleRecord.value(), NotificationConsumer.KafkaSendOrderDto.class);
|
||||
|
||||
assertThat(singleRecord).isNotNull();
|
||||
assertThat(readValue.getOrderStatus()).isEqualTo(OrderStatus.PLACED);
|
||||
assertThat(readValue.getId()).isEqualTo(sendOrderDto.getId());
|
||||
}
|
||||
}
|
||||
@@ -62,7 +62,7 @@ class NotificationControllerTest {
|
||||
|
||||
// THEN
|
||||
ResultActions actions
|
||||
= mockMvc.perform(get(url).header("user-id", String.valueOf(userId)));
|
||||
= mockMvc.perform(get("/notifications").header("user-id", String.valueOf(userId)));
|
||||
|
||||
// WHEN
|
||||
actions.andExpect(status().isOk())
|
||||
|
||||
Reference in New Issue
Block a user