#25 order-service: kafka producer
This commit is contained in:
@@ -27,6 +27,8 @@ dependencies {
|
||||
implementation 'org.springframework.boot:spring-boot-starter-web'
|
||||
implementation 'org.springframework.cloud:spring-cloud-starter-netflix-eureka-client'
|
||||
|
||||
implementation 'org.springframework.kafka:spring-kafka'
|
||||
|
||||
implementation 'org.mariadb.jdbc:mariadb-java-client'
|
||||
|
||||
compileOnly 'org.projectlombok:lombok'
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.example.orderservice.controller;
|
||||
|
||||
import com.example.orderservice.dto.OrderDto;
|
||||
import com.example.orderservice.entity.OrderEntity;
|
||||
import com.example.orderservice.messagequeue.KafkaProducer;
|
||||
import com.example.orderservice.service.OrderService;
|
||||
import com.example.orderservice.vo.RequestOrder;
|
||||
import com.example.orderservice.vo.ResponseOrder;
|
||||
@@ -24,22 +25,29 @@ public class OrderController {
|
||||
|
||||
private final Environment env;
|
||||
private final OrderService orderService;
|
||||
private final KafkaProducer kafkaProducer;
|
||||
private final ModelMapper mapper = new ModelMapper();
|
||||
|
||||
@PostConstruct
|
||||
public void setMapper() {
|
||||
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
|
||||
}
|
||||
|
||||
@PostMapping("/{userId}/orders")
|
||||
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId,
|
||||
@RequestBody RequestOrder requestOrder) {
|
||||
|
||||
|
||||
// jpa
|
||||
OrderDto orderDto = mapper.map(requestOrder, OrderDto.class);
|
||||
orderDto.setUserId(userId);
|
||||
OrderDto createdOrder = orderService.createOrder(orderDto);
|
||||
|
||||
ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);
|
||||
|
||||
// send this order to the kafka
|
||||
kafkaProducer.send("example-catalog-topic", createdOrder);
|
||||
|
||||
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
package com.example.orderservice.messagequeue;
|
||||
|
||||
import com.example.orderservice.dto.OrderDto;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class KafkaProducer {
|
||||
|
||||
private final KafkaTemplate<String, String> kafkaTemplate;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
public OrderDto send(String topic, OrderDto orderDto) {
|
||||
|
||||
String jsonString = "";
|
||||
|
||||
try {
|
||||
jsonString = objectMapper.writeValueAsString(orderDto);
|
||||
} catch (JsonProcessingException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
kafkaTemplate.send(topic, jsonString);
|
||||
|
||||
log.info("Kafka Producer sent data from the order service: " + orderDto);
|
||||
|
||||
return orderDto;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
package com.example.orderservice.messagequeue;
|
||||
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.annotation.EnableKafka;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@EnableKafka
|
||||
@Configuration
|
||||
public class KafkaProducerConfig {
|
||||
|
||||
@Bean
|
||||
public ProducerFactory<String, String> producerFactory() {
|
||||
Map<String, Object> properties = new HashMap<>();
|
||||
|
||||
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
|
||||
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
|
||||
return new DefaultKafkaProducerFactory<>(properties);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public KafkaTemplate<String, String> kafkaTemplate() {
|
||||
return new KafkaTemplate<>(producerFactory());
|
||||
}
|
||||
}
|
||||
@@ -23,6 +23,7 @@ spring:
|
||||
|
||||
eureka:
|
||||
instance:
|
||||
hostname: localhost
|
||||
instance-id: ${spring.application.name}:${spring.application.instance_id:${random.value}}
|
||||
client:
|
||||
register-with-eureka: true
|
||||
|
||||
Reference in New Issue
Block a user