메시지 발행자 구현 (회원 서비스)

This commit is contained in:
assu10
2020-10-02 18:59:33 +09:00
parent 73f8d8e786
commit 99a42c1159
6 changed files with 140 additions and 6 deletions

View File

@@ -35,10 +35,10 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<!--<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
</dependency>-->
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-rsa</artifactId>
@@ -66,6 +66,18 @@
<version>1.1.1.RELEASE</version>
</dependency>
<!-- 스프링 클라우드 스트림 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<!-- 스프링 클라우드 카프카 (메시지 브로커) -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>

View File

@@ -5,6 +5,8 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.security.oauth2.config.annotation.web.configuration.EnableResourceServer;
@@ -15,7 +17,9 @@ import java.util.List;
@SpringBootApplication
@EnableEurekaClient
@EnableResourceServer // 보호 자원으로 설정
@EnableResourceServer // 보호 자원으로 설정
@EnableBinding(Source.class) // 이 애플리케이션을 메시지 브로커와 바인딩하도록 스프링 클라우드 스트림 설정
// Source.class 로 지정 시 해당 서비스가 Source 클래스에 정의된 채널들을 이용해 메시지 브로커와 통신
public class MemberServiceApplication {
public static void main(String[] args) {
SpringApplication.run(MemberServiceApplication.class, args);

View File

@@ -2,7 +2,7 @@ package com.assu.cloud.memberservice.controller;
import com.assu.cloud.memberservice.client.EventRestTemplateClient;
import com.assu.cloud.memberservice.config.CustomConfig;
import org.springframework.http.HttpStatus;
import com.assu.cloud.memberservice.event.source.SimpleSourceBean;
import org.springframework.web.bind.annotation.*;
import javax.servlet.ServletRequest;
@@ -13,10 +13,12 @@ public class MemberController {
private final CustomConfig customConfig;
private final EventRestTemplateClient eventRestTemplateClient;
private final SimpleSourceBean simpleSourceBean;
public MemberController(CustomConfig customConfig, EventRestTemplateClient eventRestTemplateClient) {
public MemberController(CustomConfig customConfig, EventRestTemplateClient eventRestTemplateClient, SimpleSourceBean simpleSourceBean) {
this.customConfig = customConfig;
this.eventRestTemplateClient = eventRestTemplateClient;
this.simpleSourceBean = simpleSourceBean;
}
@GetMapping(value = "name/{nick}")
@@ -48,4 +50,14 @@ public class MemberController {
public String userInfo(@PathVariable("name") String name) {
return "[MEMBER] " + name;
}
/**
* 단순 메시지 발행
*/
@PostMapping("/{userId}")
public void saveUserId(@PathVariable("userId") String userId) {
// DB 에 save 작업..
simpleSourceBean.publishMemberChange("SAVE", userId);
}
}

View File

@@ -0,0 +1,51 @@
package com.assu.cloud.memberservice.event.model;
/**
* 발행될 메시지를 표현하는 POJO
*/
public class MemberChangeModel {
private String type;
private String action;
private String userId;
private String correlationId;
public MemberChangeModel(String type, String action, String userId, String correlationId) {
// super();
this.type = type;
this.action = action;
this.userId = userId;
this.correlationId = correlationId;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public String getCorrelationId() {
return correlationId;
}
public void setCorrelationId(String correlationId) {
this.correlationId = correlationId;
}
}

View File

@@ -0,0 +1,43 @@
package com.assu.cloud.memberservice.event.source;
import com.assu.cloud.memberservice.event.model.MemberChangeModel;
import com.assu.cloud.memberservice.utils.CustomContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* 메시지 브로커에 메시지 발행
*/
@Component
public class SimpleSourceBean {
private final Source source;
private static final Logger logger = LoggerFactory.getLogger(SimpleSourceBean.class);
/**
* 스프링 클라우드 스트림은 서비스가 사용할 소스 인터페이스 구현을 주입
*/
public SimpleSourceBean(Source source) {
this.source = source;
}
/**
* 메시지 발행
*/
public void publishMemberChange(String action, String userId) {
logger.debug("======= Sending kafka message {} for User Id : {}", action, userId);
// com.assu.cloud.memberservice.event.model.MemberChangeModel
logger.debug("======= MemberChangeModel.class.getTypeName() : {}", MemberChangeModel.class.getTypeName());
// 발행될 메시지는 자바 POJO
MemberChangeModel change = new MemberChangeModel(MemberChangeModel.class.getTypeName(),
action,
userId,
CustomContext.getCorrelationId());
// 메시지를 보낼 준비가 되면 Source 클래스에 정의된 채널에서 send() 메서드 사용
source.output().send(MessageBuilder.withPayload(change).build());
}
}

View File

@@ -1,2 +1,14 @@
server:
port: 8090
port: 8090
# 스프링 클라우드 스트림 설정
spring:
cloud:
stream: # stream.bindings 는 스트림의 메시지 브로커에 발행하려는 구성의 시작점
bindings:
output: # output 은 채널명, SimpleSourceBean.publishMemberChange() 의 source.output() 채널에 매핑됨
destination: mbChangeTopic # 메시지를 넣은 메시지 큐(토픽) 이름
content-type: application/json # 스트림에 송수신할 메시지 타입의 정보 (JSON 으로 직렬화)
kafka: # stream.kafka 는 해당 서비스를 카프카에 바인딩
binder:
zkNodes: localhost # zkNodes, brokers 는 스트림에게 카프카와 주키퍼의 네트워크 위치 전달
brokers: localhost