diff --git a/cloudbuild.yaml b/cloudbuild.yaml
index d6b6917..ad13d95 100644
--- a/cloudbuild.yaml
+++ b/cloudbuild.yaml
@@ -85,4 +85,7 @@ options:
env:
# # location/name of GKE cluster (used by all kubectl commands)
- CLOUDSDK_COMPUTE_ZONE=asia-northeast1-a
- - CLOUDSDK_CONTAINER_CLUSTER=standard-cluster-1
\ No newline at end of file
+ - CLOUDSDK_CONTAINER_CLUSTER=standard-cluster-1
+
+
+
diff --git a/pom.xml b/pom.xml
index b3b37bf..faf23cc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -24,10 +24,6 @@
 			<groupId>org.springframework.boot</groupId>
 			<artifactId>spring-boot-starter-data-rest</artifactId>
 		</dependency>
-		<dependency>
-			<groupId>org.springframework.kafka</groupId>
-			<artifactId>spring-kafka</artifactId>
-		</dependency>
 		<dependency>
 			<groupId>org.springframework.boot</groupId>
 			<artifactId>spring-boot-starter-data-jpa</artifactId>
@@ -37,17 +33,42 @@
 			<artifactId>h2</artifactId>
 			<scope>runtime</scope>
 		</dependency>
-		<dependency>
-			<groupId>org.springframework.kafka</groupId>
-			<artifactId>spring-kafka</artifactId>
-		</dependency>
-
+
 		<dependency>
 			<groupId>com.google.code.gson</groupId>
 			<artifactId>gson</artifactId>
 			<version>2.8.5</version>
 			<scope>compile</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.springframework.cloud</groupId>
+			<artifactId>spring-cloud-starter-stream-kafka</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-actuator</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.springframework.cloud</groupId>
+			<artifactId>spring-cloud-stream-test-support</artifactId>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.springframework.cloud</groupId>
+			<artifactId>spring-cloud-starter-contract-stub-runner</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.springframework.cloud</groupId>
+			<artifactId>spring-cloud-starter-contract-verifier</artifactId>
+			<scope>test</scope>
+		</dependency>
diff --git a/src/main/java/com/example/template/Application.java b/src/main/java/com/example/template/Application.java
index 7830f6d..7c8f0e9 100644
--- a/src/main/java/com/example/template/Application.java
+++ b/src/main/java/com/example/template/Application.java
@@ -1,10 +1,13 @@
package com.example.template;
+import com.example.template.config.kafka.KafkaProcessor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
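+// bind the KafkaProcessor channels (event-in / event-out) to the Spring Cloud Stream Kafka binder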
+@EnableBinding(KafkaProcessor.class)
public class Application {
protected static ApplicationContext applicationContext;
diff --git a/src/main/java/com/example/template/Order.java b/src/main/java/com/example/template/Order.java
index 764044e..7af31d2 100644
--- a/src/main/java/com/example/template/Order.java
+++ b/src/main/java/com/example/template/Order.java
@@ -1,5 +1,6 @@
package com.example.template;
+import com.example.template.config.kafka.KafkaProcessor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
@@ -10,6 +11,10 @@ import org.springframework.beans.BeanUtils;
import org.springframework.core.env.Environment;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.util.MimeTypeUtils;
import org.springframework.web.client.RestTemplate;
import javax.persistence.*;
@@ -35,7 +40,6 @@ public class Order {
*/
@PostPersist
private void publishOrderPlaced() {
- KafkaTemplate kafkaTemplate = Application.applicationContext.getBean(KafkaTemplate.class);
RestTemplate restTemplate = Application.applicationContext.getBean(RestTemplate.class);
Environment env = Application.applicationContext.getEnvironment();
@@ -72,9 +76,24 @@ public class Order {
}
// 2. Publish the order-placed event
- String topicName = env.getProperty("eventTopic");
- ProducerRecord producerRecord = new ProducerRecord<>(topicName, json);
- kafkaTemplate.send(producerRecord);
+ /**
+ * Spring Kafka approach (previous implementation, kept for reference)
+ */
+// Environment env = Application.applicationContext.getEnvironment();
+// String topicName = env.getProperty("eventTopic");
+// ProducerRecord producerRecord = new ProducerRecord<>(topicName, json);
+// kafkaTemplate.send(producerRecord);
+
+ /**
+ * Spring Cloud Stream approach
+ */
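+ // look up the output binding and publish the order JSON as an application/json message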
+ KafkaProcessor processor = Application.applicationContext.getBean(KafkaProcessor.class);
+ MessageChannel outputChannel = processor.outboundTopic();
+
+ outputChannel.send(MessageBuilder
+ .withPayload(json)
+ .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
+ .build());
}
public Long getProductId() {
diff --git a/src/main/java/com/example/template/OrderRepository.java b/src/main/java/com/example/template/OrderRepository.java
index d743f57..e533159 100644
--- a/src/main/java/com/example/template/OrderRepository.java
+++ b/src/main/java/com/example/template/OrderRepository.java
@@ -2,7 +2,10 @@ package com.example.template;
import org.springframework.data.repository.PagingAndSortingRepository;
+import java.util.List;
+
public interface OrderRepository extends PagingAndSortingRepository<Order, Long> {
+ List<Order> findByCustomerId(String customerId);
}
diff --git a/src/main/java/com/example/template/OrderService.java b/src/main/java/com/example/template/OrderService.java
index adb8906..85b9631 100644
--- a/src/main/java/com/example/template/OrderService.java
+++ b/src/main/java/com/example/template/OrderService.java
@@ -1,10 +1,12 @@
package com.example.template;
+import com.example.template.config.kafka.KafkaProcessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.handler.annotation.Payload;
@@ -25,8 +27,9 @@ public class OrderService {
/**
* Whenever a product change occurs, store the product information
*/
- @KafkaListener(topics = "${eventTopic}", groupId = "{{policy}}")
- public void onDeliveryCompleted(@Payload String message, ConsumerRecord<?, ?> consumerRecord) {
+
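+ // consume events from the event-in binding (replaces the removed @KafkaListener)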
+ @StreamListener(KafkaProcessor.INPUT)
+ public void onProductChanged(@Payload String message) {
System.out.println("##### listener : " + message);
ObjectMapper objectMapper = new ObjectMapper();
diff --git a/src/main/java/com/example/template/config/kafka/KafkaProcessor.java b/src/main/java/com/example/template/config/kafka/KafkaProcessor.java
new file mode 100644
index 0000000..5537361
--- /dev/null
+++ b/src/main/java/com/example/template/config/kafka/KafkaProcessor.java
@@ -0,0 +1,19 @@
+package com.example.template.config.kafka;
+
+import org.springframework.cloud.stream.annotation.Input;
+import org.springframework.cloud.stream.annotation.Output;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.SubscribableChannel;
+
+public interface KafkaProcessor {
+
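+ // binding names; mapped to Kafka destinations under spring.cloud.stream.bindings in application.yml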
+ String INPUT = "event-in";
+ String OUTPUT = "event-out";
+
+ @Input(INPUT)
+ SubscribableChannel inboundTopic();
+
+ @Output(OUTPUT)
+ MessageChannel outboundTopic();
+
+}
diff --git a/src/main/java/com/example/template/config/kafka/KafkaReceiverConfig.java b/src/main/java/com/example/template/config/kafka/KafkaReceiverConfig.java
deleted file mode 100644
index 56119bb..0000000
--- a/src/main/java/com/example/template/config/kafka/KafkaReceiverConfig.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package com.example.template.config.kafka;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.annotation.EnableKafka;
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-import org.springframework.kafka.config.KafkaListenerContainerFactory;
-import org.springframework.kafka.core.ConsumerFactory;
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
-
-@EnableKafka
-@Configuration
-public class KafkaReceiverConfig {
-
- @Value("${spring.kafka.bootstrap-servers}")
- private String bootstrapServers;
-
- @Bean
- public Map<String, Object> consumerConfigs() {
- Map<String, Object> props = new HashMap<>();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- long seed = System.currentTimeMillis();
- props.put(ConsumerConfig.GROUP_ID_CONFIG, "orders"+seed);
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
-
- return props;
- }
-
- @Bean
- public ConsumerFactory<String, String> consumerFactory() {
- return new DefaultKafkaConsumerFactory<>(consumerConfigs());
- }
-
- @Bean
- public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
- ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory());
- return factory;
- }
-
-}
diff --git a/src/main/java/com/example/template/config/kafka/KafkaSenderConfig.java b/src/main/java/com/example/template/config/kafka/KafkaSenderConfig.java
deleted file mode 100644
index 8f6442a..0000000
--- a/src/main/java/com/example/template/config/kafka/KafkaSenderConfig.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package com.example.template.config.kafka;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-
-@Configuration
-public class KafkaSenderConfig {
-
- @Value("${spring.kafka.bootstrap-servers}")
- private String bootstrapServers;
-
- @Bean
- public Map<String, Object> producerConfigs() {
- Map<String, Object> props = new HashMap<String, Object>();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- return props;
- }
-
- @Bean
- public ProducerFactory<String, String> producerFactory() {
- return new DefaultKafkaProducerFactory<String, String>(producerConfigs());
- }
-
- @Bean
- public KafkaTemplate<String, String> kafkaTemplate() {
- return new KafkaTemplate<String, String>(producerFactory());
- }
-
-}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index b1ab783..7b2af96 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -1,31 +1,43 @@
server:
port: 8080
-
-eventTopic: eventTopic
-
## An order needs product information.
## If checkStock is set to true, orders cannot be placed while the product service is down.
checkStock: false
---
spring:
profiles: default
- kafka:
-# bootstrap-servers: http://35.200.47.242:31090
- bootstrap-servers: localhost:9092
- consumer:
- enable-auto-commit: true
+ cloud:
+ stream:
+ kafka:
+ binder:
+ brokers: localhost:9092
+ streams:
+ binder:
+ configuration:
+ default:
+ key:
+ serde: org.apache.kafka.common.serialization.Serdes$StringSerde
+ value:
+ serde: org.apache.kafka.common.serialization.Serdes$StringSerde
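+ # map the event-in / event-out channels to the eventTopic destination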
+ bindings:
+ event-in:
+ group: orders
+ destination: eventTopic
+ contentType: application/json
+ event-out:
+ destination: eventTopic
+ contentType: application/json
jpa:
properties:
hibernate:
show_sql: true
format_sql: true
-logging:
- level:
- org:
- hibernate:
- type: trace
+#logging:
+# level:
+# org.hibernate.type: trace
+# org.springframework.cloud: debug
productUrl: http://localhost:8085
@@ -34,11 +46,26 @@ server:
---
spring:
profiles: docker
- kafka:
- bootstrap-servers: my-kafka.kafka.svc.cluster.local:9092
- consumer:
- enable-auto-commit: true
+ cloud:
+ stream:
+ kafka:
+ binder:
+ brokers: my-kafka.kafka.svc.cluster.local:9092
+ streams:
+ binder:
+ configuration:
+ default:
+ key:
+ serde: org.apache.kafka.common.serialization.Serdes$StringSerde
+ value:
+ serde: org.apache.kafka.common.serialization.Serdes$StringSerde
+ bindings:
+ event-in:
+ group: orders
+ destination: eventTopicDocker
+ contentType: application/json
+ event-out:
+ destination: eventTopicDocker
+ contentType: application/json
productUrl: http://products:8080
-
-eventTopic: eventTopicDocker
\ No newline at end of file
diff --git a/src/test/java/com/example/template/ProductChangedContactTest.java b/src/test/java/com/example/template/ProductChangedContactTest.java
new file mode 100644
index 0000000..e520942
--- /dev/null
+++ b/src/test/java/com/example/template/ProductChangedContactTest.java
@@ -0,0 +1,76 @@
+package com.example.template;
+
+import com.example.template.config.kafka.KafkaProcessor;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.cloud.contract.stubrunner.StubFinder;
+import org.springframework.cloud.contract.stubrunner.spring.AutoConfigureStubRunner;
+import org.springframework.cloud.contract.stubrunner.spring.StubRunnerProperties;
+import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.test.binder.MessageCollector;
+import org.springframework.integration.annotation.Transformer;
+import org.springframework.messaging.Message;
+import org.springframework.test.context.junit4.SpringRunner;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertNotNull;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = Application.class, webEnvironment = SpringBootTest.WebEnvironment.MOCK)
+@AutoConfigureMockMvc
+// provide the [group-id]:[artifact-id]
+@AutoConfigureStubRunner(stubsMode = StubRunnerProperties.StubsMode.LOCAL, ids = "com.example:boot-camp-products")
+//@AutoConfigureStubRunner(
+// repositoryRoot="http://34.85.54.161:8081/repository/maven-snapshots/",
+// ids = "com.example:boot-camp-products:+:stubs:8090",
+// stubsMode = StubRunnerProperties.StubsMode.REMOTE
+//)
+@AutoConfigureWireMock(port = 0)
+public class ProductChangedContactTest {
+
+ @Autowired
+ StubFinder stubFinder;
+
+ @Autowired
+ private KafkaProcessor processor;
+
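+ // MessageCollector from spring-cloud-stream-test-support captures messages sent to the output channel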
+ @Autowired
+ private MessageCollector messageCollector;
+
+
+// @Autowired
+// ConsumerTemplate consumerTemplate;
+
+ @Test
+ public void testOnMessageReceived() {
+ // event start
+ stubFinder.trigger("productChanged");
+
+ Message<?> received = (Message<?>) messageCollector.forChannel(processor.outboundTopic()).poll();
+
+ System.out.println("=======================================================");
+// System.out.println(message);
+ System.out.println(received.getPayload());
+ System.out.println("=======================================================");
+
+ DocumentContext parsedJson = JsonPath.parse(received.getPayload());
+
+ // Validate the incoming values; if the productName column has been changed, an error occurs.
+ assertThat(parsedJson.read("$.productId", String.class)).matches("[\\S\\s]+");
+ assertThat(parsedJson.read("$.productName", String.class)).matches("[\\S\\s]+");
+ assertThat(parsedJson.read("$.productPrice", String.class)).matches("[\\S\\s]+");
+ assertThat(parsedJson.read("$.productStock", String.class)).matches("[\\S\\s]+");
+ assertThat(parsedJson.read("$.imageUrl", String.class)).matches("[\\S\\s]+");
+
+ }
+}
diff --git a/src/test/java/com/example/template/ProductContractTest.java b/src/test/java/com/example/template/ProductContractTest.java
new file mode 100644
index 0000000..e79119a
--- /dev/null
+++ b/src/test/java/com/example/template/ProductContractTest.java
@@ -0,0 +1,93 @@
+package com.example.template;
+
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import io.restassured.module.mockmvc.RestAssuredMockMvc;
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.boot.test.web.client.TestRestTemplate;
+import org.springframework.cloud.contract.stubrunner.StubRunning;
+import org.springframework.cloud.contract.stubrunner.server.HttpStubsController;
+import org.springframework.cloud.contract.stubrunner.spring.AutoConfigureStubRunner;
+import org.springframework.cloud.contract.stubrunner.spring.StubRunnerPort;
+import org.springframework.cloud.contract.stubrunner.spring.StubRunnerProperties;
+import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
+import org.springframework.http.MediaType;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.Collections;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK)
+@AutoConfigureMockMvc
+// provide the [group-id]:[artifact-id]
+@AutoConfigureStubRunner(stubsMode = StubRunnerProperties.StubsMode.LOCAL, ids = "com.example:boot-camp-products")
+//@AutoConfigureStubRunner(
+// repositoryRoot="http://34.85.54.161:8081/repository/maven-snapshots/",
+// ids = "com.example:boot-camp-products:+:stubs:8090",
+// stubsMode = StubRunnerProperties.StubsMode.REMOTE
+//)
+@AutoConfigureWireMock(port = 0)
+public class ProductContractTest {
+
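+ // port of the boot-camp-products stub started by the Stub Runner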
+ @StubRunnerPort("boot-camp-products")
+ int mockPort;
+
+// @Autowired
+// MockMvc mockMvc;
+
+ @MockBean
+ RestTemplate restTemplate;
+
+// @Autowired
+// WebApplicationContext webApplicationContext;
+
+ @Autowired
+ StubRunning stubRunning;
+
+ @Before
+ public void setup() {
+ //remove::start[]
+ RestAssuredMockMvc.standaloneSetup(new HttpStubsController(stubRunning));
+// RestAssuredMockMvc.webAppContextSetup(this.webApplicationContext);
+ //remove::end[]
+ }
+
+ @Test
+ public void getProduct_stub_test() throws Exception {
+
+ TestRestTemplate testRestTemplate = new TestRestTemplate();
+
+ testRestTemplate.getRestTemplate().setInterceptors(
+ Collections.singletonList((request, body, execution) -> {
+ request.getHeaders()
+ .add("Content-Type", MediaType.APPLICATION_JSON.toString());
+ return execution.execute(request, body);
+ }));
+ String response = testRestTemplate
+ .getForObject("http://localhost:" + this.mockPort + "/product/1",
+ String.class);
+
+ System.out.println("=======================================================");
+ System.out.println(this.mockPort);
+ System.out.println(response);
+ System.out.println("=======================================================");
+
+ DocumentContext parsedJson = JsonPath.parse(response);
+ // and:
+ Assertions.assertThat(parsedJson.read("$.id", String.class)).matches("[\\S\\s]+");
+ Assertions.assertThat(parsedJson.read("$.name", String.class)).matches("[\\S\\s]+");
+ Assertions.assertThat(parsedJson.read("$.price", String.class)).matches("[\\S\\s]+");
+ Assertions.assertThat(parsedJson.read("$.stock", String.class)).matches("[\\S\\s]+");
+ Assertions.assertThat(parsedJson.read("$.imageUrl", String.class)).matches("[\\S\\s]+");
+ }
+
+}
\ No newline at end of file