This commit is contained in:
kimscott
2019-07-29 10:04:07 +09:00
parent 1515d7f7e3
commit 2d11e2226e
15 changed files with 1052 additions and 0 deletions

View File

@@ -0,0 +1,18 @@
package com.example.template;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class Application {
// Statically exposed Spring context so objects that are NOT Spring-managed
// (e.g. the Product JPA entity's lifecycle callback) can look up beans
// such as KafkaTemplate at runtime.
// NOTE(review): a public static mutable field is fragile — confirm nothing
// besides main() ever writes to it.
public static ApplicationContext applicationContext;
public static void main(String[] args) {
// Boot the application and capture the context for static access above.
applicationContext = SpringApplication.run(Application.class, args);
}
}

View File

@@ -0,0 +1,69 @@
package com.example.template;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.BeanUtils;
import org.springframework.kafka.core.KafkaTemplate;
import javax.persistence.*;
@Entity
public class Product {

    @Id
    @GeneratedValue
    private Long id;
    String name;
    int price;
    int stock;

    /**
     * JPA lifecycle callback: after this entity is inserted or updated,
     * publishes a {@link ProductChanged} event as JSON to the "eventTopic"
     * Kafka topic.
     *
     * @throws RuntimeException if the event cannot be serialized to JSON
     */
    @PostPersist @PostUpdate
    private void publishStart() {
        // JPA entities are not Spring beans, so the template is looked up
        // from the statically exposed application context.
        KafkaTemplate<String, String> kafkaTemplate =
                Application.applicationContext.getBean(KafkaTemplate.class);

        ProductChanged productChanged = new ProductChanged();
        productChanged.setProductId(this.id);
        productChanged.setProductName(this.name);
        productChanged.setProductPrice(this.price);
        productChanged.setProductStock(this.stock);

        final String json;
        try {
            json = new ObjectMapper().writeValueAsString(productChanged);
        } catch (JsonProcessingException e) {
            // Preserve the cause so serialization failures are diagnosable.
            throw new RuntimeException("JSON format exception", e);
        }
        // json is guaranteed non-null here (the catch above rethrows), so the
        // former `if (json != null)` guard was dead code and has been removed.
        kafkaTemplate.send(new ProducerRecord<>("eventTopic", json));
    }

    /** @return the generated primary key, or null before first persist */
    public Long getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getPrice() {
        return price;
    }

    public void setPrice(int price) {
        this.price = price;
    }

    public int getStock() {
        return stock;
    }

    public void setStock(int stock) {
        this.stock = stock;
    }
}

View File

@@ -0,0 +1,66 @@
package com.example.template;
/**
 * Event payload published to Kafka whenever a Product entity is
 * persisted or updated. Carries a snapshot of the product's state.
 */
public class ProductChanged {

    // Event discriminator; defaults to this class's simple name so
    // consumers can dispatch on it.
    private String type;
    private String stateMessage = "상품 변경이 발생함";
    private Long productId;
    private String productName;
    private int productPrice;
    private int productStock;

    public ProductChanged() {
        // Tag the event with its own class name.
        setType(getClass().getSimpleName());
    }

    public String getType() { return type; }

    public void setType(String type) { this.type = type; }

    public String getStateMessage() { return stateMessage; }

    public void setStateMessage(String stateMessage) { this.stateMessage = stateMessage; }

    public Long getProductId() { return productId; }

    public void setProductId(Long productId) { this.productId = productId; }

    public String getProductName() { return productName; }

    public void setProductName(String productName) { this.productName = productName; }

    public int getProductPrice() { return productPrice; }

    public void setProductPrice(int productPrice) { this.productPrice = productPrice; }

    public int getProductStock() { return productStock; }

    public void setProductStock(int productStock) { this.productStock = productStock; }
}

View File

@@ -0,0 +1,11 @@
package com.example.template;
import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.query.Param;
import java.util.List;
// Spring Data repository for Product entities; basic CRUD operations are
// inherited from CrudRepository.
public interface ProductRepository extends CrudRepository<Product, Long> {
// Derived query: returns all products whose name matches exactly
// (may be empty when no product has that name).
List<Product> findByName(@Param("name") String name);
}

View File

@@ -0,0 +1,36 @@
package com.example.template;
/**
 * Inbound event payload requesting stock for a product, consumed from
 * Kafka by ProductService.
 */
public class ProductRequired {

    // Event discriminator; defaults to this class's simple name so
    // consumers can dispatch on it.
    private String type;
    private String stateMessage;
    private String productName;

    public ProductRequired() {
        // Tag the event with its own class name.
        setType(getClass().getSimpleName());
    }

    public String getType() { return type; }

    public void setType(String type) { this.type = type; }

    public String getStateMessage() { return stateMessage; }

    public void setStateMessage(String stateMessage) { this.stateMessage = stateMessage; }

    public String getProductName() { return productName; }

    public void setProductName(String productName) { this.productName = productName; }
}

View File

@@ -0,0 +1,60 @@
package com.example.template;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class ProductService {

    @Autowired
    ProductRepository productRepository;

    /**
     * Listens on "eventTopic" and handles ProductRequired events by
     * increasing the matching product's stock by 10 (creating the product
     * with default price 10000 and stock 1 if it does not exist yet).
     * Other event types on the topic (e.g. ProductChanged) are ignored.
     *
     * @param message        raw JSON payload from Kafka
     * @param consumerRecord the full record (currently unused)
     */
    @KafkaListener(topics = "eventTopic")
    public void onListener(@Payload String message, ConsumerRecord<?, ?> consumerRecord) {
        System.out.println("##### listener : " + message);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        try {
            ProductRequired productRequired = objectMapper.readValue(message, ProductRequired.class);
            // Null-safe comparison: the payload may lack a "type" field,
            // in which case getType() is null and the event is skipped.
            if (ProductRequired.class.getSimpleName().equals(productRequired.getType())) {
                List<Product> productList = productRepository.findByName(productRequired.getProductName());
                Product product = null;
                if (productList != null && !productList.isEmpty()) {
                    product = productList.get(0);
                }
                if (product == null) {
                    product = new Product();
                    product.setName(productRequired.getProductName());
                    product.setPrice(10000);
                    product.setStock(1);
                }
                // Increase the product's stock by 10 per request.
                product.setStock(product.getStock() + 10);
                productRepository.save(product);
            }
        } catch (Exception e) {
            // Best-effort consumer: keep consuming on failure, but report it
            // instead of silently swallowing it (the previous catch was empty).
            System.err.println("Failed to process message '" + message + "': " + e);
        }
    }
}

View File

@@ -0,0 +1,50 @@
package com.example.template.config.kafka;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
/**
 * Kafka consumer wiring: String-deserializing consumer factory and a
 * concurrent listener container factory for @KafkaListener methods.
 */
@EnableKafka
@Configuration
public class KafkaReceiverConfig {

    // Broker address list, injected from application configuration.
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /** Base consumer properties shared by every consumer this app creates. */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "products");
        // Start from the earliest available offset for a new group.
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Payloads are plain JSON strings.
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return config;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /** Container factory used by @KafkaListener-annotated methods. */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        listenerFactory.setConsumerFactory(consumerFactory());
        return listenerFactory;
    }
}

View File

@@ -0,0 +1,41 @@
package com.example.template.config.kafka;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
/**
 * Kafka producer wiring: String-serializing producer factory and the
 * KafkaTemplate used to publish domain events.
 */
@Configuration
public class KafkaSenderConfig {

    // Broker address list, injected from application configuration.
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /** Base producer properties shared by every producer this app creates. */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Payloads are plain JSON strings.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        // Diamond operator instead of the former raw-typed constructor call,
        // which produced an unchecked-assignment warning.
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

View File

@@ -0,0 +1,34 @@
server:
port: 8080
eventTopic: eventTopic
---
spring:
profiles: default
kafka:
# bootstrap-servers: http://35.194.123.133:19092
bootstrap-servers: localhost:9092
consumer:
enable-auto-commit: true
jpa:
properties:
hibernate:
show_sql: true
format_sql: true
logging:
level:
org:
hibernate:
type: trace
server:
port: 8085
---
spring:
profiles: docker
kafka:
bootstrap-servers: my-kafka.kafka.svc.cluster.local:9092
consumer:
enable-auto-commit: true