This commit is contained in:
lee-soo-heon
2019-10-28 13:51:41 +09:00
12 changed files with 302 additions and 127 deletions

View File

@@ -85,4 +85,7 @@ options:
env:
# # location/name of GKE cluster (used by all kubectl commands)
- CLOUDSDK_COMPUTE_ZONE=asia-northeast1-a
- CLOUDSDK_CONTAINER_CLUSTER=standard-cluster-1

39
pom.xml
View File

@@ -24,10 +24,6 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-rest</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
@@ -37,17 +33,42 @@
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
<scope>compile</scope>
</dependency>
<!-- kafka streams -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- test -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<!-- tag::stubrunner[] -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-contract-stub-runner</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-contract-verifier</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>

View File

@@ -1,10 +1,13 @@
package com.example.template;
import com.example.template.config.kafka.KafkaProcessor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
@EnableBinding(KafkaProcessor.class)
public class Application {
protected static ApplicationContext applicationContext;

View File

@@ -1,5 +1,6 @@
package com.example.template;
import com.example.template.config.kafka.KafkaProcessor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
@@ -10,6 +11,10 @@ import org.springframework.beans.BeanUtils;
import org.springframework.core.env.Environment;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;
import org.springframework.web.client.RestTemplate;
import javax.persistence.*;
@@ -35,7 +40,6 @@ public class Order {
*/
@PostPersist
private void publishOrderPlaced() {
KafkaTemplate kafkaTemplate = Application.applicationContext.getBean(KafkaTemplate.class);
RestTemplate restTemplate = Application.applicationContext.getBean(RestTemplate.class);
Environment env = Application.applicationContext.getEnvironment();
@@ -72,9 +76,24 @@ public class Order {
}
// 2. 주문이 발생함 이벤트 발송
String topicName = env.getProperty("eventTopic");
ProducerRecord producerRecord = new ProducerRecord<>(topicName, json);
kafkaTemplate.send(producerRecord);
/**
* spring kafka 방식
*/
// Environment env = Application.applicationContext.getEnvironment();
// String topicName = env.getProperty("eventTopic");
// ProducerRecord producerRecord = new ProducerRecord<>(topicName, json);
// kafkaTemplate.send(producerRecord);
/**
* spring streams 방식
*/
KafkaProcessor processor = Application.applicationContext.getBean(KafkaProcessor.class);
MessageChannel outputChannel = processor.outboundTopic();
outputChannel.send(MessageBuilder
.withPayload(json)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build());
}
public Long getProductId() {

View File

@@ -2,7 +2,10 @@ package com.example.template;
import org.springframework.data.repository.PagingAndSortingRepository;
import java.util.List;
/**
 * Spring Data repository for {@link Order} aggregates. CRUD plus paging and
 * sorting operations are inherited from {@code PagingAndSortingRepository}.
 */
public interface OrderRepository extends PagingAndSortingRepository<Order, Long> {

    // Derived query generated by Spring Data from the method name:
    // SELECT o FROM Order o WHERE o.customerId = :customerId
    List<Order> findByCustomerId(String customerId);
}

View File

@@ -1,10 +1,12 @@
package com.example.template;
import com.example.template.config.kafka.KafkaProcessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.handler.annotation.Payload;
@@ -25,8 +27,9 @@ public class OrderService {
/**
* 상품 변경이 발생할때마다, 상품정보를 저장해 놓음
*/
@KafkaListener(topics = "${eventTopic}", groupId = "{{policy}}")
public void onDeliveryCompleted(@Payload String message, ConsumerRecord<?, ?> consumerRecord) {
@StreamListener(KafkaProcessor.INPUT)
public void onProductChanged(@Payload String message) {
System.out.println("##### listener : " + message);
ObjectMapper objectMapper = new ObjectMapper();

View File

@@ -0,0 +1,19 @@
package com.example.template.config.kafka;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
/**
 * Spring Cloud Stream binding contract for this service's Kafka channels.
 * The channel names "event-in" / "event-out" are mapped to concrete topic
 * destinations in application.yml ({@code spring.cloud.stream.bindings}).
 */
public interface KafkaProcessor {

    // Binding name of the inbound (consumer) channel.
    String INPUT = "event-in";

    // Binding name of the outbound (producer) channel.
    String OUTPUT = "event-out";

    /** Channel this service consumes events from. */
    @Input(INPUT)
    SubscribableChannel inboundTopic();

    /** Channel this service publishes events to. */
    @Output(OUTPUT)
    MessageChannel outboundTopic();
}

View File

@@ -1,51 +0,0 @@
package com.example.template.config.kafka;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
/**
 * Consumer-side Kafka wiring for @KafkaListener endpoints: consumer
 * properties, a consumer factory, and a concurrent listener container
 * factory built on top of it.
 */
@EnableKafka
@Configuration
public class KafkaReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Base consumer properties: string key/value deserialization, reading
     * from the earliest offset, auto-commit enabled.
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Unique group id per process start ("orders" + startup millis), so each
        // instance gets its own consumer group and, combined with
        // auto.offset.reset=earliest, re-reads the topic from the beginning on boot.
        long startedAt = System.currentTimeMillis();
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "orders" + startedAt);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return config;
    }

    /** Consumer factory backed by {@link #consumerConfigs()}. */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /** Container factory used by @KafkaListener-annotated methods. */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        listenerFactory.setConsumerFactory(consumerFactory());
        return listenerFactory;
    }
}

View File

@@ -1,41 +0,0 @@
package com.example.template.config.kafka;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
/**
 * Producer-side Kafka wiring: producer properties, a producer factory, and
 * the {@link KafkaTemplate} used to publish String messages.
 */
@Configuration
public class KafkaSenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /** Producer properties: string key/value serialization. */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /** Producer factory backed by {@link #producerConfigs()}. */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        // Diamond operator instead of the original raw-typed
        // `new DefaultKafkaProducerFactory(...)`, which compiled with an
        // unchecked warning and lost the <String, String> type information.
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /** Template used by application code to send String records. */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

View File

@@ -1,31 +1,43 @@
server:
port: 8080
eventTopic: eventTopic
## 주문은 product 정보가 필요하다.
## checkStock 를 true 로 준다면, product 서비스가 죽어있을때는 주문이 안된다.
checkStock: false
---
spring:
profiles: default
kafka:
# bootstrap-servers: http://35.200.47.242:31090
bootstrap-servers: localhost:9092
consumer:
enable-auto-commit: true
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
streams:
binder:
configuration:
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings:
event-in:
group: orders
destination: eventTopic
contentType: application/json
event-out:
destination: eventTopic
contentType: application/json
jpa:
properties:
hibernate:
show_sql: true
format_sql: true
logging:
level:
org:
hibernate:
type: trace
#logging:
# level:
# org.hibernate.type: trace
# org.springframework.cloud: debug
productUrl: http://localhost:8085
@@ -34,11 +46,26 @@ server:
---
spring:
profiles: docker
kafka:
bootstrap-servers: my-kafka.kafka.svc.cluster.local:9092
consumer:
enable-auto-commit: true
cloud:
stream:
kafka:
binder:
brokers: my-kafka.kafka.svc.cluster.local:9092
streams:
binder:
configuration:
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings:
event-in:
group: orders
destination: eventTopicDocker
contentType: application/json
event-out:
destination: eventTopicDocker
contentType: application/json
productUrl: http://products:8080
eventTopic: eventTopicDocker

View File

@@ -0,0 +1,76 @@
package com.example.template;
import com.example.template.config.kafka.KafkaProcessor;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.contract.stubrunner.StubFinder;
import org.springframework.cloud.contract.stubrunner.spring.AutoConfigureStubRunner;
import org.springframework.cloud.contract.stubrunner.spring.StubRunnerProperties;
import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;
import org.springframework.test.context.junit4.SpringRunner;
import javax.management.NotificationListener;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertNotNull;
/**
 * Consumer-driven contract test: triggers the producer's "productChanged"
 * stub event and verifies the message this service publishes to its outbound
 * channel still carries every field the contract requires.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class, webEnvironment = SpringBootTest.WebEnvironment.MOCK)
@AutoConfigureMockMvc
// provide the [group-id]:[artifact-id]
@AutoConfigureStubRunner(stubsMode = StubRunnerProperties.StubsMode.LOCAL, ids = "com.example:boot-camp-products")
//@AutoConfigureStubRunner(
//        repositoryRoot="http://34.85.54.161:8081/repository/maven-snapshots/",
//        ids = "com.example:boot-camp-products:+:stubs:8090",
//        stubsMode = StubRunnerProperties.StubsMode.REMOTE
//)
@AutoConfigureWireMock(port = 0)
public class ProductChangedContactTest {

    @Autowired
    StubFinder stubFinder;

    @Autowired
    private KafkaProcessor processor;

    @Autowired
    private MessageCollector messageCollector;

    @Test
    public void testOnMessageReceived() {
        // Fire the contract-defined event from the producer stub.
        stubFinder.trigger("productChanged");

        // Drain the test binder's outbound channel.
        Message<String> received =
                (Message<String>) messageCollector.forChannel(processor.outboundTopic()).poll();

        // poll() returns null when nothing was published — fail fast with a
        // clear message instead of an NPE on getPayload() below.
        assertNotNull("no message was published to the outbound channel", received);

        System.out.println("=======================================================");
        System.out.println(received.getPayload());
        System.out.println("=======================================================");

        DocumentContext parsedJson = JsonPath.parse(received.getPayload());
        // Validate the outgoing payload: if a field such as productName were
        // renamed or dropped by this service, these assertions fail and the
        // contract break is caught here rather than in production.
        assertThat(parsedJson.read("$.productId", String.class)).matches("[\\S\\s]+");
        assertThat(parsedJson.read("$.productName", String.class)).matches("[\\S\\s]+");
        assertThat(parsedJson.read("$.productPrice", String.class)).matches("[\\S\\s]+");
        assertThat(parsedJson.read("$.productStock", String.class)).matches("[\\S\\s]+");
        assertThat(parsedJson.read("$.imageUrl", String.class)).matches("[\\S\\s]+");
    }
}

View File

@@ -0,0 +1,93 @@
package com.example.template;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import io.restassured.module.mockmvc.RestAssuredMockMvc;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.cloud.contract.stubrunner.StubRunning;
import org.springframework.cloud.contract.stubrunner.server.HttpStubsController;
import org.springframework.cloud.contract.stubrunner.spring.AutoConfigureStubRunner;
import org.springframework.cloud.contract.stubrunner.spring.StubRunnerPort;
import org.springframework.cloud.contract.stubrunner.spring.StubRunnerProperties;
import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
import org.springframework.http.MediaType;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.client.RestTemplate;
import java.util.Collections;
/**
 * Contract test against the "boot-camp-products" producer stub: fetches a
 * product from the locally-started stub server over HTTP and validates the
 * response shape matches the contract.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK)
@AutoConfigureMockMvc
// provide the [group-id]:[artifact-id]
@AutoConfigureStubRunner(stubsMode = StubRunnerProperties.StubsMode.LOCAL, ids = "com.example:boot-camp-products")
//@AutoConfigureStubRunner(
//        repositoryRoot="http://34.85.54.161:8081/repository/maven-snapshots/",
//        ids = "com.example:boot-camp-products:+:stubs:8090",
//        stubsMode = StubRunnerProperties.StubsMode.REMOTE
//)
@AutoConfigureWireMock(port = 0)
public class ProductContractTest {

    // Port the stub runner picked for the boot-camp-products stub server.
    @StubRunnerPort("boot-camp-products")
    int mockPort;

    // @Autowired
    // MockMvc mockMvc;

    // Mocked out so the application context does not try to reach the real
    // product service during startup.
    @MockBean
    RestTemplate restTemplate;

    // @Autowired
    // WebApplicationContext webApplicationContext;

    @Autowired
    StubRunning stubRunning;

    /** Routes RestAssured MockMvc calls through the stub runner controller. */
    @Before
    public void setup() {
        //remove::start[]
        RestAssuredMockMvc.standaloneSetup(new HttpStubsController(stubRunning));
        // RestAssuredMockMvc.webAppContextSetup(this.webApplicationContext);
        //remove::end[]
    }

    /**
     * Calls GET /product/1 on the stub and asserts every contract field is
     * present and non-empty in the response JSON.
     */
    @Test
    public void getProduct_stub_test() throws Exception {
        TestRestTemplate testRestTemplate = new TestRestTemplate();
        // Add a Content-Type header to every outgoing request; the stub
        // matches on it.
        testRestTemplate.getRestTemplate().setInterceptors(
                Collections.singletonList((request, body, execution) -> {
                    request.getHeaders()
                            .add("Content-Type", MediaType.APPLICATION_JSON.toString());
                    return execution.execute(request, body);
                }));
        String response = testRestTemplate
                .getForObject("http://localhost:" + this.mockPort + "/product/1",
                        String.class);
        System.out.println("=======================================================");
        System.out.println(this.mockPort);
        System.out.println(response);
        System.out.println("=======================================================");
        DocumentContext parsedJson = JsonPath.parse(response);
        // and: if the producer renames or drops any of these fields, the
        // assertions below fail and the contract break surfaces here.
        Assertions.assertThat(parsedJson.read("$.id", String.class)).matches("[\\S\\s]+");
        Assertions.assertThat(parsedJson.read("$.name", String.class)).matches("[\\S\\s]+");
        Assertions.assertThat(parsedJson.read("$.price", String.class)).matches("[\\S\\s]+");
        Assertions.assertThat(parsedJson.read("$.stock", String.class)).matches("[\\S\\s]+");
        Assertions.assertThat(parsedJson.read("$.imageUrl", String.class)).matches("[\\S\\s]+");
    }
}