diff --git a/member-service/pom.xml b/member-service/pom.xml index 960e1ac..25911a0 100644 --- a/member-service/pom.xml +++ b/member-service/pom.xml @@ -35,10 +35,10 @@ org.springframework.cloud spring-cloud-starter-config - + org.springframework.security spring-security-rsa @@ -66,6 +66,18 @@ 1.1.1.RELEASE + + + org.springframework.cloud + spring-cloud-stream + + + + + org.springframework.cloud + spring-cloud-starter-stream-kafka + + org.springframework.boot spring-boot-starter-test diff --git a/member-service/src/main/java/com/assu/cloud/memberservice/MemberServiceApplication.java b/member-service/src/main/java/com/assu/cloud/memberservice/MemberServiceApplication.java index 53ff2be..fa765c9 100644 --- a/member-service/src/main/java/com/assu/cloud/memberservice/MemberServiceApplication.java +++ b/member-service/src/main/java/com/assu/cloud/memberservice/MemberServiceApplication.java @@ -5,6 +5,8 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.loadbalancer.LoadBalanced; import org.springframework.cloud.netflix.eureka.EnableEurekaClient; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.messaging.Source; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Primary; import org.springframework.security.oauth2.config.annotation.web.configuration.EnableResourceServer; @@ -15,7 +17,9 @@ import java.util.List; @SpringBootApplication @EnableEurekaClient -@EnableResourceServer // 보호 자원으로 설정 +@EnableResourceServer // 보호 자원으로 설정 +@EnableBinding(Source.class) // 이 애플리케이션을 메시지 브로커와 바인딩하도록 스프링 클라우드 스트림 설정 + // Source.class 로 지정 시 해당 서비스가 Source 클래스에 정의된 채널들을 이용해 메시지 브로커와 통신 public class MemberServiceApplication { public static void main(String[] args) { SpringApplication.run(MemberServiceApplication.class, args); diff --git a/member-service/src/main/java/com/assu/cloud/memberservice/controller/MemberController.java b/member-service/src/main/java/com/assu/cloud/memberservice/controller/MemberController.java index f8114e9..9698651 100644 --- a/member-service/src/main/java/com/assu/cloud/memberservice/controller/MemberController.java +++ b/member-service/src/main/java/com/assu/cloud/memberservice/controller/MemberController.java @@ -2,7 +2,7 @@ package com.assu.cloud.memberservice.controller; import com.assu.cloud.memberservice.client.EventRestTemplateClient; import com.assu.cloud.memberservice.config.CustomConfig; -import org.springframework.http.HttpStatus; +import com.assu.cloud.memberservice.event.source.SimpleSourceBean; import org.springframework.web.bind.annotation.*; import javax.servlet.ServletRequest; @@ -13,10 +13,12 @@ public class MemberController { private final CustomConfig customConfig; private final EventRestTemplateClient eventRestTemplateClient; + private final SimpleSourceBean simpleSourceBean; - public MemberController(CustomConfig customConfig, EventRestTemplateClient eventRestTemplateClient) { + public MemberController(CustomConfig customConfig, EventRestTemplateClient eventRestTemplateClient, SimpleSourceBean simpleSourceBean) { this.customConfig = customConfig; this.eventRestTemplateClient = eventRestTemplateClient; + this.simpleSourceBean = simpleSourceBean; } @GetMapping(value = "name/{nick}") @@ -48,4 +50,14 @@ public class MemberController { public String userInfo(@PathVariable("name") String name) { return "[MEMBER] " + name; } + + /** + * 단순 메시지 발행 + */ + @PostMapping("/{userId}") + public void saveUserId(@PathVariable("userId") String userId) { + // DB 에 save 작업.. + simpleSourceBean.publishMemberChange("SAVE", userId); + } + } diff --git a/member-service/src/main/java/com/assu/cloud/memberservice/event/model/MemberChangeModel.java b/member-service/src/main/java/com/assu/cloud/memberservice/event/model/MemberChangeModel.java new file mode 100644 index 0000000..8bbbbd1 --- /dev/null +++ b/member-service/src/main/java/com/assu/cloud/memberservice/event/model/MemberChangeModel.java @@ -0,0 +1,51 @@ +package com.assu.cloud.memberservice.event.model; + +/** + * 발행될 메시지를 표현하는 POJO + */ +public class MemberChangeModel { + private String type; + private String action; + private String userId; + private String correlationId; + + public MemberChangeModel(String type, String action, String userId, String correlationId) { + // super(); + this.type = type; + this.action = action; + this.userId = userId; + this.correlationId = correlationId; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getAction() { + return action; + } + + public void setAction(String action) { + this.action = action; + } + + public String getUserId() { + return userId; + } + + public void setUserId(String userId) { + this.userId = userId; + } + + public String getCorrelationId() { + return correlationId; + } + + public void setCorrelationId(String correlationId) { + this.correlationId = correlationId; + } +} diff --git a/member-service/src/main/java/com/assu/cloud/memberservice/event/source/SimpleSourceBean.java b/member-service/src/main/java/com/assu/cloud/memberservice/event/source/SimpleSourceBean.java new file mode 100644 index 0000000..fb1c6d7 --- /dev/null +++ b/member-service/src/main/java/com/assu/cloud/memberservice/event/source/SimpleSourceBean.java @@ -0,0 +1,43 @@ +package com.assu.cloud.memberservice.event.source; + +import com.assu.cloud.memberservice.event.model.MemberChangeModel; +import com.assu.cloud.memberservice.utils.CustomContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.stream.messaging.Source; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Component; + +/** + * 메시지 브로커에 메시지 발행 + */ +@Component +public class SimpleSourceBean { + private final Source source; + + private static final Logger logger = LoggerFactory.getLogger(SimpleSourceBean.class); + + /** + * 스프링 클라우드 스트림은 서비스가 사용할 소스 인터페이스 구현을 주입 + */ + public SimpleSourceBean(Source source) { + this.source = source; + } + + /** + * 메시지 발행 + */ + public void publishMemberChange(String action, String userId) { + logger.debug("======= Sending kafka message {} for User Id : {}", action, userId); + // com.assu.cloud.memberservice.event.model.MemberChangeModel + logger.debug("======= MemberChangeModel.class.getTypeName() : {}", MemberChangeModel.class.getTypeName()); + + // 발행될 메시지는 자바 POJO + MemberChangeModel change = new MemberChangeModel(MemberChangeModel.class.getTypeName(), + action, + userId, + CustomContext.getCorrelationId()); + // 메시지를 보낼 준비가 되면 Source 클래스에 정의된 채널에서 send() 메서드 사용 + source.output().send(MessageBuilder.withPayload(change).build()); + } +} diff --git a/member-service/src/main/resources/application.yaml b/member-service/src/main/resources/application.yaml index 58de10c..5d33bd5 100644 --- a/member-service/src/main/resources/application.yaml +++ b/member-service/src/main/resources/application.yaml @@ -1,2 +1,14 @@ server: - port: 8090 \ No newline at end of file + port: 8090 +# 스프링 클라우드 스트림 설정 +spring: + cloud: + stream: # stream.bindings 는 스트림의 메시지 브로커에 발행하려는 구성의 시작점 + bindings: + output: # output 은 채널명, SimpleSourceBean.publishMemberChange() 의 source.output() 채널에 매핑됨 + destination: mbChangeTopic # 메시지를 넣은 메시지 큐(토픽) 이름 + content-type: application/json # 스트림에 송수신할 메시지 타입의 정보 (JSON 으로 직렬화) + kafka: # stream.kafka 는 해당 서비스를 카프카에 바인딩 + binder: + zkNodes: localhost # zkNodes, brokers 는 스트림에게 카프카와 주키퍼의 네트워크 위치 전달 + brokers: localhost