add spring contract

This commit is contained in:
kimscott
2019-10-24 18:14:03 +09:00
parent 974c97d8aa
commit 3a46238ec7
13 changed files with 424 additions and 129 deletions

55
pom.xml
View File

@@ -17,6 +17,8 @@
<properties> <properties>
<java.version>1.8</java.version> <java.version>1.8</java.version>
<spring-cloud.version>Greenwich.RELEASE</spring-cloud.version> <spring-cloud.version>Greenwich.RELEASE</spring-cloud.version>
<spring-cloud-stream.version>Germantown.SR1</spring-cloud-stream.version>
<spring-cloud-contract.version>2.1.3.RELEASE</spring-cloud-contract.version>
</properties> </properties>
<dependencies> <dependencies>
@@ -28,10 +30,6 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-rest</artifactId> <artifactId>spring-boot-starter-data-rest</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId> <artifactId>spring-boot-starter-data-jpa</artifactId>
@@ -55,6 +53,30 @@
<version>3.8.1</version> <version>3.8.1</version>
</dependency> </dependency>
<!-- kafka streams -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- test -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-contract-verifier</artifactId>
<version>${spring-cloud-contract.version}</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<dependencyManagement> <dependencyManagement>
@@ -66,6 +88,13 @@
<type>pom</type> <type>pom</type>
<scope>import</scope> <scope>import</scope>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-dependencies</artifactId>
<version>${spring-cloud-stream.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>
@@ -75,6 +104,24 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId> <artifactId>spring-boot-maven-plugin</artifactId>
</plugin> </plugin>
<!-- SPRING CLOUD CONTRACT -->
<plugin>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-contract-maven-plugin</artifactId>
<version>${spring-cloud-contract.version}</version>
<extensions>true</extensions>
<configuration>
<packageWithBaseClasses>com.example.template</packageWithBaseClasses>
<baseClassMappings>
<baseClassMapping>
<contractPackageRegex>.*com.*</contractPackageRegex>
<baseClassFQN>com.example.template.RestBase</baseClassFQN>
</baseClassMapping>
</baseClassMappings>
<testMode>EXPLICIT</testMode>
</configuration>
</plugin>
<!-- SPRING CLOUD CONTRACT -->
</plugins> </plugins>
</build> </build>

View File

@@ -1,14 +1,14 @@
package com.example.template; package com.example.template;
import org.springframework.beans.factory.annotation.Autowired; import com.example.template.config.kafka.KafkaProcessor;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Lazy;
import javax.annotation.PostConstruct;
@SpringBootApplication @SpringBootApplication
@EnableBinding(KafkaProcessor.class)
public class Application { public class Application {
protected static ApplicationContext applicationContext; protected static ApplicationContext applicationContext;
@@ -27,7 +27,6 @@ public class Application {
product.setPrice(i*10000); product.setPrice(i*10000);
product.setStock(i*10); product.setStock(i*10);
product.setImageUrl("/goods/img/"+p+".jpg"); product.setImageUrl("/goods/img/"+p+".jpg");
// product.setImageUrl("https://github.githubassets.com/images/modules/profile/profile-joined-github.png");
i++; i++;
productRepository.save(product); productRepository.save(product);
} }

View File

@@ -1,11 +1,17 @@
package com.example.template; package com.example.template;
import com.example.template.config.kafka.KafkaProcessor;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;
import javax.persistence.*; import javax.persistence.*;
@@ -23,8 +29,6 @@ public class Product {
@PostPersist @PostUpdate @PostPersist @PostUpdate
private void publishStart() { private void publishStart() {
KafkaTemplate kafkaTemplate = Application.applicationContext.getBean(KafkaTemplate.class);
ObjectMapper objectMapper = new ObjectMapper(); ObjectMapper objectMapper = new ObjectMapper();
String json = null; String json = null;
@@ -41,10 +45,25 @@ public class Product {
} }
if( json != null ){ if( json != null ){
Environment env = Application.applicationContext.getEnvironment(); /**
String topicName = env.getProperty("eventTopic"); * spring kafka 방식
ProducerRecord producerRecord = new ProducerRecord<>(topicName, json); */
kafkaTemplate.send(producerRecord); // Environment env = Application.applicationContext.getEnvironment();
// String topicName = env.getProperty("eventTopic");
// ProducerRecord producerRecord = new ProducerRecord<>(topicName, json);
// kafkaTemplate.send(producerRecord);
/**
* spring streams 방식
*/
KafkaProcessor processor = Application.applicationContext.getBean(KafkaProcessor.class);
MessageChannel outputChannel = processor.outboundTopic();
outputChannel.send(MessageBuilder
.withPayload(json)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build());
} }
} }

View File

@@ -0,0 +1,18 @@
package com.example.template;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProductController {

    @Autowired
    ProductService productService;

    /**
     * Returns the product with the given id.
     *
     * <p>Delegates to {@link ProductService#getProductById}; note the service
     * resolves the lookup with {@code Optional.get()}, so an unknown id will
     * surface as a NoSuchElementException from the service layer.
     */
    @GetMapping("/product/{productId}")
    Product productStockCheck(@PathVariable(value = "productId") Long productId) {
        Product found = this.productService.getProductById(productId);
        return found;
    }
}

View File

@@ -1,26 +1,23 @@
package com.example.template; package com.example.template;
import com.example.template.config.kafka.KafkaProcessor;
import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload; import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Optional; import java.util.Optional;
@Service @Service
public class ProductService { public class ProductService {
@Autowired @Autowired
ProductRepository productRepository; ProductRepository productRepository;
@KafkaListener(topics = "${eventTopic}") @StreamListener(KafkaProcessor.INPUT)
public void onOrderPlaced(@Payload String message, ConsumerRecord<?, ?> consumerRecord) { public void onOrderPlaced(@Payload String message) {
System.out.println("##### listener : " + message); System.out.println("##### listener : " + message);
ObjectMapper objectMapper = new ObjectMapper(); ObjectMapper objectMapper = new ObjectMapper();
@@ -47,4 +44,44 @@ public class ProductService {
} }
} }
// @KafkaListener(topics = "${eventTopic}")
// public void onOrderPlaced(@Payload String message, ConsumerRecord<?, ?> consumerRecord) {
// System.out.println("##### listener : " + message);
//
// ObjectMapper objectMapper = new ObjectMapper();
// objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
//
// OrderPlaced orderPlaced = null;
// try {
// orderPlaced = objectMapper.readValue(message, OrderPlaced.class);
//
// /**
// * 주문이 발생시, 수량을 줄인다.
// */
// if( orderPlaced.getEventType().equals(OrderPlaced.class.getSimpleName())){
//
// Optional<Product> productOptional = productRepository.findById(orderPlaced.getProductId());
// Product product = productOptional.get();
// product.setStock(product.getStock() - orderPlaced.getQuantity());
//
// productRepository.save(product);
//
// }
//
// }catch (Exception e){
//
// }
// }
/**
* 상품 조회
*/
public Product getProductById(Long id){
Optional<Product> productOptional = productRepository.findById(id);
Product product = productOptional.get();
return product;
}
} }

View File

@@ -0,0 +1,19 @@
package com.example.template.config.kafka;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
// Spring Cloud Stream binding contract for the event topic: one inbound
// subscribable channel and one outbound message channel. Wired up via
// @EnableBinding(KafkaProcessor.class) on Application and consumed through
// @StreamListener(KafkaProcessor.INPUT) in ProductService.
public interface KafkaProcessor {
    // Binding names — must match the spring.cloud.stream.bindings.* keys
    // (event-in / event-out) declared in application.yml.
    String INPUT = "event-in";
    String OUTPUT = "event-out";

    // Channel on which events arrive from the broker (consumer side).
    @Input(INPUT)
    SubscribableChannel inboundTopic();

    // Channel on which events are published to the broker (producer side).
    @Output(OUTPUT)
    MessageChannel outboundTopic();
}

View File

@@ -1,51 +0,0 @@
package com.example.template.config.kafka;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
@EnableKafka
@Configuration
public class KafkaReceiverConfig {

    /** Kafka broker list, injected from spring.kafka.bootstrap-servers. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Raw consumer settings: String key/value deserialization, auto-commit
     * enabled, and a timestamp-suffixed consumer group id.
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> settings = new HashMap<>();
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // NOTE(review): a fresh group id per startup combined with "earliest"
        // presumably makes each boot re-consume the topic from the beginning —
        // confirm this replay-on-restart behaviour is intended.
        long startupMillis = System.currentTimeMillis();
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, "products" + startupMillis);
        settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return settings;
    }

    /** Consumer factory backed by the settings above. */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /** Container factory used by @KafkaListener endpoints. */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        listenerFactory.setConsumerFactory(consumerFactory());
        return listenerFactory;
    }
}

View File

@@ -1,41 +0,0 @@
package com.example.template.config.kafka;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@Configuration
public class KafkaSenderConfig {

    /** Kafka broker list, injected from spring.kafka.bootstrap-servers. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /** Producer settings: String serialization for both keys and values. */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     * Producer factory for String/String records.
     * Fixed: the original used a raw {@code new DefaultKafkaProducerFactory(...)}
     * (unchecked warning); the diamond keeps it fully typed.
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /** KafkaTemplate used to publish String messages. */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

View File

@@ -2,34 +2,63 @@ server:
port: 8080 port: 8080
eventTopic: eventTopic
--- ---
spring: spring:
profiles: default profiles: default
kafka:
# bootstrap-servers: http://35.200.47.242:31090
bootstrap-servers: localhost:9092
consumer:
enable-auto-commit: true
jpa: jpa:
properties: properties:
hibernate: hibernate:
show_sql: true show_sql: true
format_sql: true format_sql: true
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
streams:
binder:
configuration:
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings:
event-in:
group: products
destination: eventTopic
contentType: application/json
event-out:
destination: eventTopic
contentType: application/json
logging: logging:
level: level:
org: org.hibernate.type: trace
hibernate: org.springframework.cloud: debug
type: trace
server: server:
port: 8085 port: 8085
--- ---
spring: spring:
profiles: docker profiles: docker
kafka: cloud:
bootstrap-servers: my-kafka.kafka.svc.cluster.local:9092 stream:
consumer: kafka:
enable-auto-commit: true binder:
eventTopic: eventTopicDocker brokers: my-kafka.kafka.svc.cluster.local:9092
streams:
binder:
configuration:
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings:
event-in:
group: products
destination: eventTopicDocker
contentType: application/json
event-out:
destination: eventTopicDocker
contentType: application/json

View File

@@ -0,0 +1,66 @@
package com.example.template;
import com.example.template.config.kafka.KafkaProcessor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.contract.verifier.messaging.MessageVerifier;
import org.springframework.cloud.contract.verifier.messaging.boot.AutoConfigureMessageVerifier;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.MimeTypeUtils;
import java.util.concurrent.TimeUnit;
@RunWith(SpringRunner.class) // bridges JUnit 4 and Spring Boot
// Integration-test base class for the auto-generated messaging contract tests.
@SpringBootTest(classes = Application.class, webEnvironment = SpringBootTest.WebEnvironment.NONE)
@AutoConfigureMessageVerifier
public abstract class MessagingBase {
    //remove::start[]
    // Contract-verifier abstraction used to send/receive messages on the binder channels.
    @Autowired
    MessageVerifier messaging;
    //remove::end[]
    @Autowired
    KafkaProcessor kafkaProcessor;
    @Before
    public void setup() {
        // let's clear any remaining messages
        // output == destination or channel name
        // NOTE(review): this relies on outboundTopic().toString() resolving to the
        // binding/channel name ("event-out") — fragile; confirm against the binder.
        //remove::start[]
        this.messaging.receive(kafkaProcessor.outboundTopic().toString(), 100, TimeUnit.MILLISECONDS);
        //remove::end[]
    }
    // Trigger method referenced by the messaging contract's triggeredBy('productChanged()').
    // Serializes one ProductChanged event to JSON and publishes it on the outbound channel
    // so the generated test can verify the resulting message.
    public void productChanged() {
        ObjectMapper objectMapper = new ObjectMapper();
        String json = null;
        ProductChanged productChanged = new ProductChanged();
        productChanged.setProductId(1L);
        productChanged.setProductName("TEST");
        productChanged.setProductPrice(10000);
        productChanged.setProductStock(10);
        productChanged.setImageUrl("/test.jpg");
        try {
            json = objectMapper.writeValueAsString(productChanged);
        } catch (JsonProcessingException e) {
            // Fail the test loudly instead of sending a null payload.
            throw new RuntimeException("JSON format exception", e);
        }
        System.out.println("test output Topic = " + kafkaProcessor.outboundTopic().toString());
        this.messaging.send(MessageBuilder
                .withPayload(json)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build(), kafkaProcessor.outboundTopic().toString());
    }
}

View File

@@ -0,0 +1,70 @@
package com.example.template;
import io.restassured.RestAssured;
import io.restassured.module.mockmvc.RestAssuredMockMvc;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.cloud.contract.verifier.messaging.boot.AutoConfigureMessageVerifier;
//import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.test.web.servlet.setup.StandaloneMockMvcBuilder;
import org.springframework.web.context.WebApplicationContext;
import java.util.Optional;
import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
// Bridges JUnit 4 and Spring Boot.
@RunWith(SpringRunner.class)
// Integration-test base class for the auto-generated REST contract tests
// (mapped via baseClassMappings in the spring-cloud-contract-maven-plugin config).
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
//@EmbeddedKafka // runs an embedded Kafka broker during tests
// Discard the applicationContext after this test class so repeated runs do not
// share a possibly-modified context — needed for DB / Kafka tests.
@DirtiesContext
// Alternative: a test-scoped application.yaml under test resources; use
// @ActiveProfiles to keep one config file or to skip loading certain profiles.
//@ActiveProfiles("test")
//@AutoConfigureMessageVerifier
public class RestBase {

    // The repository is mocked so contract tests need no real database.
    @MockBean
    ProductRepository productRepository;

    @Autowired
    WebApplicationContext webApplicationContext;

    @LocalServerPort
    int port;

    @Before
    public void setup() {
        //remove::start[]
        RestAssuredMockMvc.webAppContextSetup(this.webApplicationContext);
        // testMode is EXPLICIT, so generated tests call the real server port.
        RestAssured.baseURI = "http://localhost";
        RestAssured.port = this.port;
        Product product = new Product();
        product.setId(1L);
        product.setImageUrl("https://github.githubassets.com/images/modules/profile/profile-joined-github.png");
        product.setName("TV");
        product.setPrice(10000);
        product.setStock(10);
        // Fixed: the original stubbed findById twice — once with BDDMockito.given(...)
        // and again with Mockito.when(...) for the identical call. One stub suffices.
        given(this.productRepository.findById(any())).willReturn(Optional.of(product));
    }
}

View File

@@ -0,0 +1,52 @@
package contracts.messaging

// Messaging contract: triggering productChanged() on the base class
// (MessagingBase) must publish a ProductChanged JSON event to 'eventTopic'.
org.springframework.cloud.contract.spec.Contract.make {
    description("""
        Spring Cloud Contract supports three messaging interaction shapes:
        1. output only (no input message)
        2. input consumed, output produced
        3. input only
        triggeredBy(...) in the input block is used when there is no input
        message: the generated test invokes the method, which must publish
        the event.
        ```
        given:
            a product was created or updated
        when:
            the productChanged() trigger method is invoked
        then:
            a ProductChanged message is sent to the event topic
        ```
    """)
    // Label by means of which the output message can be triggered
    label 'productChanged'
    // input to the contract
    input {
        // the contract will be triggered by a method
        triggeredBy('productChanged()')
    }
    // output message of the contract
    outputMessage {
        // destination to which the output message will be sent
        sentTo 'eventTopic'
        // the body of the output message (example values; matchers relax them)
        body(
                productId: 1,
                productName: "TV",
                productPrice: 10000,
                productStock: 10,
                imageUrl: "testUrl"
        )
        bodyMatchers {
            jsonPath('$.productId', byRegex(nonEmpty()).asLong())
            jsonPath('$.productName', byRegex(nonEmpty()).asString())
            jsonPath('$.productPrice', byRegex(nonEmpty()).asLong())
            jsonPath('$.productStock', byRegex(nonEmpty()).asLong())
            jsonPath('$.imageUrl', byRegex(nonEmpty()).asString())
        }
        headers {
            messagingContentType(applicationJson())
        }
    }
}

View File

@@ -0,0 +1,31 @@
package contracts.rest

// REST contract for GET /product/{id}. The generated test runs against the
// base class RestBase (configured via baseClassMappings in the contract
// maven plugin), which stubs ProductRepository.findById to return a product.
org.springframework.cloud.contract.spec.Contract.make {
    request {
        // consumer performs a plain GET for product id 1
        method 'GET'
        url ('/product/1')
        headers {
            contentType(applicationJson())
        }
    }
    response {
        status 200
        // example body; the matchers below are what is actually verified
        body(
                id: 1,
                name: "TV",
                price: 10000,
                stock: 10,
                imageUrl: "testUrl"
        )
        bodyMatchers {
            // only require non-empty values of the expected type,
            // not exact equality with the example body above
            jsonPath('$.id', byRegex(nonEmpty()).asLong())
            jsonPath('$.name', byRegex(nonEmpty()).asString())
            jsonPath('$.price', byRegex(nonEmpty()).asLong())
            jsonPath('$.stock', byRegex(nonEmpty()).asLong())
            jsonPath('$.imageUrl', byRegex(nonEmpty()).asString())
        }
        headers {
            contentType(applicationJson())
        }
    }
}