test(notification): orderPlaced Kafka Consumer 테스트 작성
This commit is contained in:
@@ -3,7 +3,9 @@ package com.justpickup.notificationservice.domain.notification.messagequeue;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.justpickup.notificationservice.domain.notification.service.NotificationService;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
@@ -21,17 +23,17 @@ public class NotificationConsumer {
|
||||
private final ObjectMapper objectMapper;
|
||||
private final NotificationService notificationService;
|
||||
|
||||
@KafkaListener(topics = "orderApply")
|
||||
public void applyOrder(String kafkaMessage) throws JsonProcessingException {
|
||||
log.debug("## NotificationConsumer.applyOrder");
|
||||
@KafkaListener(topics = "orderPlaced")
|
||||
public void orderPlaced(String kafkaMessage) throws JsonProcessingException {
|
||||
log.debug("## NotificationConsumer.orderPlaced");
|
||||
log.debug("#### kafka Message = {}", kafkaMessage);
|
||||
|
||||
KafkaSendOrderDto kafkaSendOrderDto = objectMapper.readValue(kafkaMessage, KafkaSendOrderDto.class);
|
||||
|
||||
|
||||
notificationService.insertOrderPlaced(kafkaSendOrderDto.getUserId(), kafkaSendOrderDto.getStoreId());
|
||||
}
|
||||
|
||||
@Data
|
||||
@Data @NoArgsConstructor
|
||||
static class KafkaSendOrderDto {
|
||||
private Long id;
|
||||
private Long userId;
|
||||
@@ -41,5 +43,18 @@ public class NotificationConsumer {
|
||||
private LocalDateTime orderTime;
|
||||
private long usedPoint;
|
||||
private OrderStatus orderStatus;
|
||||
|
||||
@Builder
|
||||
public KafkaSendOrderDto(Long id, Long userId, Long userCouponId, Long storeId,
|
||||
long orderPrice, LocalDateTime orderTime, long usedPoint, OrderStatus orderStatus) {
|
||||
this.id = id;
|
||||
this.userId = userId;
|
||||
this.userCouponId = userCouponId;
|
||||
this.storeId = storeId;
|
||||
this.orderPrice = orderPrice;
|
||||
this.orderTime = orderTime;
|
||||
this.usedPoint = usedPoint;
|
||||
this.orderStatus = orderStatus;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,101 @@
|
||||
package com.justpickup.notificationservice.domain.notification.messagequeue;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.DisplayName;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.TestInstance;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.kafka.annotation.EnableKafka;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.support.SendResult;
|
||||
import org.springframework.kafka.test.EmbeddedKafkaBroker;
|
||||
import org.springframework.kafka.test.context.EmbeddedKafka;
|
||||
import org.springframework.kafka.test.utils.KafkaTestUtils;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.junit.jupiter.SpringExtension;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
@ExtendWith(SpringExtension.class)
|
||||
@EnableKafka
|
||||
@DirtiesContext
|
||||
@EmbeddedKafka
|
||||
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
|
||||
class NotificationConsumerTest {
|
||||
|
||||
private final String ORDER_TOPIC = "orderPlaced";
|
||||
|
||||
private Consumer<Integer, String> consumer;
|
||||
private KafkaTemplate<String, String> producer;
|
||||
|
||||
@Autowired
|
||||
EmbeddedKafkaBroker embeddedKafkaBroker;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
producer = configureProducer();
|
||||
consumer = configureConsumer();
|
||||
}
|
||||
|
||||
private Consumer<Integer, String> configureConsumer() {
|
||||
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafkaBroker);
|
||||
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
Consumer<Integer, String> consumer = new DefaultKafkaConsumerFactory<Integer, String>(consumerProps)
|
||||
.createConsumer();
|
||||
consumer.subscribe(Collections.singleton(ORDER_TOPIC));
|
||||
return consumer;
|
||||
}
|
||||
|
||||
|
||||
private KafkaTemplate<String, String> configureProducer() {
|
||||
Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
|
||||
return new KafkaTemplate <>( new DefaultKafkaProducerFactory<>(producerProps));
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("주문 신청")
|
||||
void orderPlaced() throws JsonProcessingException {
|
||||
// GIVEN
|
||||
ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());
|
||||
NotificationConsumer.KafkaSendOrderDto sendOrderDto
|
||||
= NotificationConsumer.KafkaSendOrderDto.builder()
|
||||
.id(1L)
|
||||
.orderPrice(10_000L)
|
||||
.orderStatus(OrderStatus.PLACED)
|
||||
.orderTime(LocalDateTime.now())
|
||||
.storeId(2L)
|
||||
.build();
|
||||
String sendJson = mapper.writeValueAsString(sendOrderDto);
|
||||
|
||||
// THEN
|
||||
ListenableFuture<SendResult<String, String>> orderPlaced = producer.send(ORDER_TOPIC, sendJson);
|
||||
|
||||
// WHEN
|
||||
ConsumerRecord<Integer, String> singleRecord =
|
||||
KafkaTestUtils.getSingleRecord(consumer, ORDER_TOPIC);
|
||||
|
||||
NotificationConsumer.KafkaSendOrderDto readValue
|
||||
= mapper.readValue(singleRecord.value(), NotificationConsumer.KafkaSendOrderDto.class);
|
||||
|
||||
assertThat(singleRecord).isNotNull();
|
||||
assertThat(readValue.getOrderStatus()).isEqualTo(OrderStatus.PLACED);
|
||||
assertThat(readValue.getId()).isEqualTo(sendOrderDto.getId());
|
||||
}
|
||||
}
|
||||
@@ -62,7 +62,7 @@ class NotificationControllerTest {
|
||||
|
||||
// THEN
|
||||
ResultActions actions
|
||||
= mockMvc.perform(get(url).header("user-id", String.valueOf(userId)));
|
||||
= mockMvc.perform(get("/notifications").header("user-id", String.valueOf(userId)));
|
||||
|
||||
// WHEN
|
||||
actions.andExpect(status().isOk())
|
||||
|
||||
Reference in New Issue
Block a user