#25 order-service: order kafka producer - kafka connect

This commit is contained in:
haerong22
2022-12-21 21:01:52 +09:00
parent f55619ac15
commit 50af3889ef
6 changed files with 136 additions and 4 deletions

View File

@@ -3,6 +3,7 @@ package com.example.orderservice.controller;
import com.example.orderservice.dto.OrderDto;
import com.example.orderservice.entity.OrderEntity;
import com.example.orderservice.messagequeue.KafkaProducer;
import com.example.orderservice.messagequeue.OrderProducer;
import com.example.orderservice.service.OrderService;
import com.example.orderservice.vo.RequestOrder;
import com.example.orderservice.vo.ResponseOrder;
@@ -17,6 +18,7 @@ import org.springframework.web.bind.annotation.*;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@RestController
@RequestMapping("/order-service")
@@ -26,6 +28,7 @@ public class OrderController {
private final Environment env;
private final OrderService orderService;
private final KafkaProducer kafkaProducer;
private final OrderProducer orderProducer;
private final ModelMapper mapper = new ModelMapper();
@PostConstruct
@@ -38,15 +41,22 @@ public class OrderController {
@RequestBody RequestOrder requestOrder) {
// jpa
OrderDto orderDto = mapper.map(requestOrder, OrderDto.class);
orderDto.setUserId(userId);
OrderDto createdOrder = orderService.createOrder(orderDto);
ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);
// jpa
// OrderDto createdOrder = orderService.createOrder(orderDto);
// ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);
// kafka
orderDto.setOrderId(UUID.randomUUID().toString());
orderDto.setTotalPrice(requestOrder.getQty() * requestOrder.getUnitPrice());
// send this order to the kafka
kafkaProducer.send("example-catalog-topic", createdOrder);
kafkaProducer.send("example-catalog-topic", orderDto);
orderProducer.send("orders", orderDto);
ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}

View File

@@ -0,0 +1,12 @@
package com.example.orderservice.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class Field {
private String type;
private boolean optional;
private String field;
}

View File

@@ -0,0 +1,14 @@
package com.example.orderservice.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
@Data
@AllArgsConstructor
public class KafkaOrderDto implements Serializable {
private Schema schema;
private Payload payload;
}

View File

@@ -0,0 +1,16 @@
package com.example.orderservice.dto;
import lombok.Builder;
import lombok.Data;
@Data
@Builder
public class Payload {
private String order_id;
private String user_id;
private String product_id;
private int qty;
private int unit_price;
private int total_price;
}

View File

@@ -0,0 +1,16 @@
package com.example.orderservice.dto;
import lombok.Builder;
import lombok.Data;
import java.util.List;
@Data
@Builder
public class Schema {
private String type;
private List<Field> fields;
private boolean optional;
private String name;
}

View File

@@ -0,0 +1,64 @@
package com.example.orderservice.messagequeue;
import com.example.orderservice.dto.*;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.Arrays;
import java.util.List;
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
private final List<Field> fields = Arrays.asList(
new Field("string", true, "order_id"),
new Field("string", true, "user_id"),
new Field("string", true, "product_id"),
new Field("int32", true, "qty"),
new Field("int32", true, "unit_price"),
new Field("int32", true, "total_price")
);
private final Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("orders")
.build();
public OrderDto send(String topic, OrderDto orderDto) {
Payload payload = Payload.builder()
.order_id(orderDto.getOrderId())
.user_id(orderDto.getUserId())
.product_id(orderDto.getProductId())
.qty(orderDto.getQty())
.unit_price(orderDto.getUnitPrice())
.total_price(orderDto.getTotalPrice())
.build();
KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);
String jsonString = "";
try {
jsonString = objectMapper.writeValueAsString(kafkaOrderDto);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
kafkaTemplate.send(topic, jsonString);
log.info("Order Producer sent data from the order service: " + kafkaOrderDto);
return orderDto;
}
}