feat(order, notification, kafka): 주문 상태 변경 시 kafka 메시지 발송
This commit is contained in:
@@ -34,6 +34,17 @@ public class NotificationConsumer {
|
||||
notificationService.insertOrderPlaced(kafkaSendOrderDto.getUserId(), kafkaSendOrderDto.getStoreId());
|
||||
}
|
||||
|
||||
@Transactional
|
||||
@KafkaListener(topics = "orderAccepted")
|
||||
public void orderAccepted(String kafkaMessage) throws JsonProcessingException {
|
||||
log.debug("## NotificationConsumer.orderAccepted");
|
||||
log.debug("#### kafka Message = {}", kafkaMessage);
|
||||
|
||||
KafkaSendOrderDto orderDto = objectMapper.readValue(kafkaMessage, KafkaSendOrderDto.class);
|
||||
|
||||
notificationService.insertOrderAccepted(orderDto.userId, orderDto.storeId);
|
||||
}
|
||||
|
||||
@Data @NoArgsConstructor
|
||||
static class KafkaSendOrderDto {
|
||||
private Long id;
|
||||
|
||||
@@ -11,4 +11,5 @@ public interface NotificationService {
|
||||
void updateNotification(UpdateNotificationDto dto);
|
||||
Long findNotificationCounts(Long userId, Yn readYn);
|
||||
void insertOrderPlaced(Long userId, Long storeId);
|
||||
void insertOrderAccepted(Long userId, Long storeId);
|
||||
}
|
||||
|
||||
@@ -59,9 +59,22 @@ public class NotificationServiceImpl implements NotificationService {
|
||||
GetStoreResponse storeResponse = storeClient.getStore(String.valueOf(storeId)).getData();
|
||||
|
||||
String title = "주문이 신청되었어요.";
|
||||
String storeName = "[" + storeResponse.getName() + "]";
|
||||
String storeName = "[" + storeResponse.getName() + "] ";
|
||||
String message = storeName + "매장의 주문이 신청되었습니다.";
|
||||
Notification notification = Notification.of(userId, message, title);
|
||||
notificationRepository.save(notification);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
@Override
|
||||
public void insertOrderAccepted(Long userId, Long storeId) {
|
||||
GetStoreResponse storeInfo = storeClient.getStore(String.valueOf(storeId)).getData();
|
||||
|
||||
String title = "주문이 수락되었어요.";
|
||||
String storeName = "[" + storeInfo.getName() + "] ";
|
||||
String message = storeName + "매장의 주문이 수락되었습니다.";
|
||||
Notification notification = Notification.of(userId, message, title);
|
||||
notificationRepository.save(notification);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -15,21 +15,24 @@ public class OrderListener {
|
||||
@Lazy
|
||||
private OrderSender orderSender;
|
||||
|
||||
|
||||
// TODO: 2022/03/15 exception 발생시 order fail 처리
|
||||
@PostUpdate
|
||||
public void postUpdate(Order order){
|
||||
OrderStatus orderStatus = order.getOrderStatus();
|
||||
log.info("[OrderListener] {}", OrderStatus.PLACED.name());
|
||||
if (orderStatus == OrderStatus.PLACED) {
|
||||
log.info("[OrderListener] {}", OrderStatus.PLACED.name());
|
||||
try{
|
||||
orderSender.orderPlaced(OrderSender.KafkaSendOrderDto.createPrimitiveField(order));
|
||||
orderSender.orderPlaced(order);
|
||||
}catch (Exception ex){
|
||||
throw new OrderException(ex.getMessage());
|
||||
}
|
||||
|
||||
} else if (orderStatus == OrderStatus.ACCEPTED) {
|
||||
log.info("[OrderListener] {}", OrderStatus.ACCEPTED.name());
|
||||
try {
|
||||
orderSender.orderAccepted(order);
|
||||
} catch (Exception ex) {
|
||||
throw new OrderException(ex.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,7 +47,7 @@ public class OrderRepositoryCustom {
|
||||
|
||||
List<Order> orders = queryFactory
|
||||
.selectFrom(order)
|
||||
.leftJoin(order.transaction)
|
||||
.leftJoin(order.transaction).fetchJoin()
|
||||
.where(
|
||||
orderIdLt(condition.getLastOrderId()),
|
||||
order.orderTime.between(start, end),
|
||||
|
||||
@@ -19,19 +19,31 @@ import java.util.List;
|
||||
public class OrderSender {
|
||||
|
||||
private final KafkaTemplate<String, String> kafkaTemplate;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
public void orderPlaced( KafkaSendOrderDto kafkaSendOrderDto) throws Exception{
|
||||
ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());
|
||||
String jsonInString = mapper.writeValueAsString(kafkaSendOrderDto);
|
||||
public void orderPlaced(Order order) throws Exception{
|
||||
objectMapper.registerModule(new JavaTimeModule());
|
||||
|
||||
KafkaSendOrderDto kafkaSendOrderDto = KafkaSendOrderDto.createPrimitiveField(order);
|
||||
String jsonInString = objectMapper.writeValueAsString(kafkaSendOrderDto);
|
||||
kafkaTemplate.send("orderPlaced", jsonInString);
|
||||
log.info("kafka Producer sent data from the Order microservice: "+ kafkaSendOrderDto);
|
||||
}
|
||||
|
||||
public void orderAccepted(Order order) throws Exception {
|
||||
objectMapper.registerModule(new JavaTimeModule());
|
||||
|
||||
KafkaSendOrderDto kafkaSendOrderDto = KafkaSendOrderDto.createPrimitiveField(order);
|
||||
String json = objectMapper.writeValueAsString(kafkaSendOrderDto);
|
||||
kafkaTemplate.send("orderAccepted", json);
|
||||
log.info("[OrderSender] orderAccepted = {}", json);
|
||||
}
|
||||
|
||||
@NoArgsConstructor
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public static class KafkaSendOrderDto{
|
||||
public static class KafkaSendOrderDto {
|
||||
private Long id;
|
||||
|
||||
private Long userId;
|
||||
|
||||
@@ -35,6 +35,7 @@ public interface StoreClient {
|
||||
Result<List<GetItemResponse>> getItemAndItemOptions(@PathVariable(value = "itemId") Iterable<Long> itemIds);
|
||||
|
||||
default Map<Long, String> getStoreNameMap(Set<Long> storeIds) {
|
||||
if (!storeIds.iterator().hasNext()) return null;
|
||||
List<GetStoreResponse> storeResponses = this.getStoreAllById(storeIds).getData();
|
||||
return storeResponses.stream()
|
||||
.collect(
|
||||
@@ -43,6 +44,7 @@ public interface StoreClient {
|
||||
}
|
||||
|
||||
default Map<Long, String> getItemNameMap(Iterable<Long> itemIds) {
|
||||
if (!itemIds.iterator().hasNext()) return null;
|
||||
List<GetItemsResponse> itemResponses = this.getItems(itemIds).getData();
|
||||
return itemResponses.stream()
|
||||
.collect(
|
||||
@@ -51,6 +53,7 @@ public interface StoreClient {
|
||||
}
|
||||
|
||||
default Map<Long, GetItemResponse> getItemAndItemOptionMap(Iterable<Long> itemIds) {
|
||||
if (!itemIds.iterator().hasNext()) return null;
|
||||
List<GetItemResponse> responses = this.getItemAndItemOptions(itemIds).getData();
|
||||
return responses.stream()
|
||||
.collect(
|
||||
|
||||
@@ -20,6 +20,7 @@ public interface UserClient {
|
||||
Result<List<GetCustomerResponse>> getCustomers(@PathVariable("userIds") Iterable<Long> userIds);
|
||||
|
||||
default Map<Long, String> getUserNameMap(Iterable<Long> userIds) {
|
||||
if (!userIds.iterator().hasNext()) return null;
|
||||
List<GetCustomerResponse> userResponses = this.getCustomers(userIds).getData();
|
||||
return userResponses.stream()
|
||||
.collect(
|
||||
|
||||
@@ -101,6 +101,8 @@ class OrderControllerTest {
|
||||
fieldWithPath("code").description("결과코드 SUCCESS/ERROR"),
|
||||
fieldWithPath("message").description("메세지"),
|
||||
fieldWithPath("data.id").description("주문 고유번호"),
|
||||
fieldWithPath("data.storeName").description("주문 매장이름"),
|
||||
fieldWithPath("data.orderStatus").description("주문 상태"),
|
||||
fieldWithPath("data.orderTime").description("주문 시간 [yyy-MM-dd]"),
|
||||
fieldWithPath("data.orderPrice").description("주문 금액"),
|
||||
fieldWithPath("data.user.id").description("주문한 회원 고유번호"),
|
||||
@@ -168,7 +170,9 @@ class OrderControllerTest {
|
||||
|
||||
return OrderDetailDto.builder()
|
||||
.id(orderId)
|
||||
.orderStatus(OrderStatus.PLACED)
|
||||
.orderTime(LocalDateTime.now())
|
||||
.storeName("매장이름")
|
||||
.orderPrice(76600L)
|
||||
.user(orderDetailUser)
|
||||
.orderItems(List.of(카페라떼, 아메리카노))
|
||||
|
||||
Reference in New Issue
Block a user