스프링 클라우드 스트림 설정

This commit is contained in:
assu10
2020-10-03 22:11:33 +09:00
parent b569b62569
commit 7bfdd48316
7 changed files with 82 additions and 35 deletions

View File

@@ -1,7 +1,6 @@
package com.assu.cloud.eventservice;
import com.assu.cloud.eventservice.config.CustomConfig;
import com.assu.cloud.eventservice.event.model.MemberChangeModel;
import com.assu.cloud.eventservice.utils.CustomContextInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +30,7 @@ import java.util.List;
@SpringBootApplication
@EnableFeignClients
@EnableResourceServer // 보호 자원으로 설정
@EnableBinding(Sink.class) // 이 애플리케이션을 메시지 브로커와 바인딩하도록 스프링 클라우드 스트림 설정
//@EnableBinding(Sink.class) // 이 애플리케이션을 메시지 브로커와 바인딩하도록 스프링 클라우드 스트림 설정
// Sink.class 로 지정 시 해당 서비스가 Sink 클래스에 정의된 채널들을 이용해 메시지 브로커와 통신
public class EventServiceApplication {
@@ -47,10 +46,11 @@ public class EventServiceApplication {
* 채널에서 받은 메시지를 MemberChangeModel 이라는 POJO 로 자동 역직렬화
* @param mbChange
*/
@StreamListener(Sink.INPUT) // 메시지가 입력 채널에서 수신될 때마다 이 메서드 실행
// CustomChannel 작업하면서 MemberChangeHandler 로 아래 메서드 옮김
/*@StreamListener(Sink.INPUT) // 메시지가 입력 채널에서 수신될 때마다 이 메서드 실행
public void loggerSink(MemberChangeModel mbChange) {
logger.info("======= Received an event for organization id {}", mbChange.getUserId());
}
}*/
/**
* 레디스 서버에 실제 DB 커넥션을 설정

View File

@@ -0,0 +1,13 @@
package com.assu.cloud.eventservice.event;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
* 사용자 정의 input 채널 (SINK.INPUT 같은...), Consumer
*/
public interface CustomChannels {
@Input("inboundMemberChanges") // @Input 은 채널 이름을 정의하는 메서드 레벨 애너테이션
SubscribableChannel members(); // @Input 애너테이션으로 노출된 채널은 모두 SubscribableChannel 클래스를 반환해야 함
}

View File

@@ -0,0 +1,51 @@
package com.assu.cloud.eventservice.event.handlers;
import com.assu.cloud.eventservice.event.CustomChannels;
import com.assu.cloud.eventservice.event.model.MemberChangeModel;
import com.assu.cloud.eventservice.repository.MemberRedisRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
/**
* 사용자 정의 채널을 사용하여 메시지 수신
* 이 애플리케이션을 메시지 브로커와 바인딩하도록 스프링 클라우드 스트림 설정
*/
@EnableBinding(CustomChannels.class) // CustomChannels.class 로 지정 시 해당 서비스가 CustomChannels 클래스에 정의된 채널들을 이용해 메시지 브로커와 통신
public class MemberChangeHandler {
private static final Logger logger = LoggerFactory.getLogger(MemberChangeHandler.class);
private final MemberRedisRepository memberRedisRepository;
public MemberChangeHandler(MemberRedisRepository memberRedisRepository) {
this.memberRedisRepository = memberRedisRepository;
}
/**
* 메시지가 입력 채널에서 수신될 때마다 이 메서드 실행
*/
@StreamListener("inboundMemberChanges") // Sink.INPUT 대신 사용자 정의 채널명인 inboundMemberChanges 전달
public void loggerSink(MemberChangeModel mbChange) {
logger.info("======= Received a message of type {}", mbChange.getType());
switch (mbChange.getAction()) {
case "GET":
logger.debug("Received a GET event from the member service for userId {}", mbChange.getUserId());
break;
case "SAVE":
logger.debug("Received a SAVE event from the member service for userId {}", mbChange.getUserId());
break;
case "UPDATE":
logger.debug("Received a UPDATE event from the member service for userId {}", mbChange.getUserId());
memberRedisRepository.deleteMember(mbChange.getUserId()); // 캐시 무효화
break;
case "DELETE":
logger.debug("Received a DELETE event from the member service for userId {}", mbChange.getUserId());
memberRedisRepository.deleteMember(mbChange.getUserId());
break;
default:
logger.debug("Received an UNKNOWN event from the member service for userId {}", mbChange.getType());
break;
}
}
}

View File

@@ -1,19 +1,2 @@
server:
port: 8070
# 스프링 클라우드 스트림 설정
spring:
cloud:
stream:
bindings:
input: # input 은 채널명, EventServiceApplication 의 Sink.INPUT 채널에 매핑되고, input 채널을 mgChangeTopic 큐에 매핑함
destination: mbChangeTopic # 메시지를 넣은 메시지 큐(토픽) 이름
content-type: application/json
group: eventGroup # 메시지를 소비할 소비자 그룹의 이름
kafka: # stream.kafka 는 해당 서비스를 카프카에 바인딩
binder:
zkNodes: localhost # zkNodes, brokers 는 스트림에게 카프카와 주키퍼의 네트워크 위치 전달
brokers: localhost
#redis
redis:
server: localhost
port: 6379

View File

@@ -70,7 +70,7 @@ public class MemberController {
*/
@GetMapping("{userId}")
public Member userInfoCache(@PathVariable("userId") String userId) {
logger.debug("====== 회원 서비스 호출!");
logger.debug("====== 회원 저장 서비스 호출!");
// DB 를 조회하여 회원 데이터 조회 (간편성을 위해 아래와 같이 리턴함)
Member member = new Member();
@@ -78,4 +78,15 @@ public class MemberController {
member.setName("rinda");
return member;
}
/**
* 이벤트 서비스에서 캐시 제거를 위한 메서드
*/
@DeleteMapping("userInfo/{userId}")
public void deleteUserInfoCache(@PathVariable("userId") String userId) {
logger.debug("====== 회원 삭제 후 DELETE 메시지 발생");
// DB 에 삭제 작업 (간편성을 위해 DB 작업은 생략)
simpleSourceBean.publishMemberChange("DELETE", userId);
}
}

View File

@@ -25,7 +25,8 @@ public class ResourceServerConfig extends ResourceServerConfigurerAdapter {
//http.authorizeRequests().anyRequest().authenticated();
http.authorizeRequests()
.antMatchers(HttpMethod.PUT, "/member/**") // 쉼표로 구분하여 엔드 포인트 목록 받음
//.antMatchers(HttpMethod.PUT, "/member/**") // 쉼표로 구분하여 엔드 포인트 목록 받음
.antMatchers(HttpMethod.DELETE, "/member/**") // 쉼표로 구분하여 엔드 포인트 목록 받음
.hasRole("ADMIN") // ADMIN 권한을 가진 사용자만 PUT 호출 가능
.anyRequest() // 서비스의 모든 엔드포인트도 인증된 사용자만 접근 가능하도록 설정
.authenticated();

View File

@@ -1,14 +1,2 @@
server:
port: 8090
# 스프링 클라우드 스트림 설정
spring:
cloud:
stream: # stream.bindings 는 스트림의 메시지 브로커에 발행하려는 구성의 시작점
bindings:
output: # output 은 채널명, SimpleSourceBean.publishMemberChange() 의 source.output() 채널에 매핑됨
destination: mbChangeTopic # 메시지를 넣은 메시지 큐(토픽) 이름
content-type: application/json # 스트림에 송수신할 메시지 타입의 정보 (JSON 으로 직렬화)
kafka: # stream.kafka 는 해당 서비스를 카프카에 바인딩
binder:
zkNodes: localhost # zkNodes, brokers 는 스트림에게 카프카와 주키퍼의 네트워크 위치 전달
brokers: localhost