message queue ocp

This commit is contained in:
jinho jeong
2022-07-13 13:34:06 +09:00
parent c59421779d
commit ecb28d5ced
10 changed files with 181 additions and 38 deletions

View File

@@ -14,8 +14,9 @@ import com.example.oneul.domain.post.domain.Post;
import com.example.oneul.domain.user.domain.UserEntity; import com.example.oneul.domain.user.domain.UserEntity;
import com.example.oneul.global.error.exception.ExpiredSessionException; import com.example.oneul.global.error.exception.ExpiredSessionException;
import com.example.oneul.global.error.exception.NotFoundException; import com.example.oneul.global.error.exception.NotFoundException;
import com.example.oneul.infra.dto.PostMessage;
import com.example.oneul.infra.kafka.KafkaPublisher; import com.example.oneul.infra.kafka.KafkaPublisher;
import com.example.oneul.infra.kafka.Type;
import com.example.oneul.infra.kafka.service.MessageQueueFactory;
@Service @Service
@Transactional @Transactional
@@ -23,11 +24,13 @@ public class PostCommnadServiceImpl implements PostCommandService{
private final Logger log = LoggerFactory.getLogger(PostCommnadServiceImpl.class); private final Logger log = LoggerFactory.getLogger(PostCommnadServiceImpl.class);
private final PostCommandRepository postCommandRepository; private final PostCommandRepository postCommandRepository;
private final MessageQueueFactory messageQueueFactory;
private final KafkaPublisher kafkaPublisher; private final KafkaPublisher kafkaPublisher;
public PostCommnadServiceImpl(PostCommandRepository postCommandRepository, KafkaPublisher kafkaPublisher){ public PostCommnadServiceImpl(PostCommandRepository postCommandRepository, KafkaPublisher kafkaPublisher, MessageQueueFactory messageQueueFactory){
this.postCommandRepository = postCommandRepository; this.postCommandRepository = postCommandRepository;
this.kafkaPublisher = kafkaPublisher; this.kafkaPublisher = kafkaPublisher;
this.messageQueueFactory = messageQueueFactory;
} }
@Override @Override
@@ -47,16 +50,8 @@ public class PostCommnadServiceImpl implements PostCommandService{
.writer(userEntity) .writer(userEntity)
.build()); .build());
kafkaPublisher.sendMessage( Type type = Type.valueOf("INSERT");
"post", messageQueueFactory.getTye(type).apply(postEntity);
new PostMessage(
"INSERT",
postEntity.getId(),
postEntity.getCreatedAt(),
postEntity.getContent(),
postEntity.getWriter().getUsername()
)
);
log.info("user: " + userEntity.toString() + " create " + postEntity.toString()); log.info("user: " + userEntity.toString() + " create " + postEntity.toString());
return postEntity; return postEntity;
@@ -73,16 +68,8 @@ public class PostCommnadServiceImpl implements PostCommandService{
postEntity.setConent(post.getContent()); postEntity.setConent(post.getContent());
postEntity = postCommandRepository.save(postEntity); postEntity = postCommandRepository.save(postEntity);
kafkaPublisher.sendMessage( Type type = Type.valueOf("UPDATE");
"post", messageQueueFactory.getTye(type).apply(postEntity);
new PostMessage(
"UPDATE",
postEntity.getId(),
postEntity.getCreatedAt(),
postEntity.getContent(),
postEntity.getWriter().getUsername()
)
);
log.info(postEntity.toString() + " is updated"); log.info(postEntity.toString() + " is updated");
@@ -95,15 +82,11 @@ public class PostCommnadServiceImpl implements PostCommandService{
if(userEntity == null){ if(userEntity == null){
throw new ExpiredSessionException("만료된 세션"); throw new ExpiredSessionException("만료된 세션");
} }
Post postEntity = postCommandRepository.findByIdAndWriter(id, userEntity).orElseThrow(() -> new NotFoundException(id + " post not found"));
postCommandRepository.deleteByIdAndWriter(id, userEntity); postCommandRepository.deleteByIdAndWriter(id, userEntity);
kafkaPublisher.sendMessage( Type type = Type.valueOf("DELETE");
"post", messageQueueFactory.getTye(type).apply(postEntity);
PostMessage.builder()
.type("DELETE")
.id(id)
.build());
log.info("post " + id + " is deleted"); log.info("post " + id + " is deleted");
} }

View File

@@ -14,6 +14,7 @@ import org.hibernate.annotations.CreationTimestamp;
import org.springframework.security.crypto.password.PasswordEncoder; import org.springframework.security.crypto.password.PasswordEncoder;
import com.example.oneul.domain.user.exception.WrongUsernameAndPasswordException; import com.example.oneul.domain.user.exception.WrongUsernameAndPasswordException;
import com.fasterxml.jackson.annotation.JsonIgnore;
@Entity @Entity
@Table(name = "user") @Table(name = "user")
@@ -23,8 +24,10 @@ public class UserEntity implements Serializable {
private Long id; private Long id;
@Column(name = "username", nullable = false, unique = true) @Column(name = "username", nullable = false, unique = true)
private String username; private String username;
@JsonIgnore
@Column(name = "password", nullable = false) @Column(name = "password", nullable = false)
private String password; private String password;
@JsonIgnore
@CreationTimestamp @CreationTimestamp
@Column(name = "createAt", nullable = false, updatable = false) @Column(name = "createAt", nullable = false, updatable = false)
private LocalDateTime createdAt; private LocalDateTime createdAt;

View File

@@ -0,0 +1,13 @@
package com.example.oneul.infra.kafka;
public enum Type {
INSERT("INSERT"),
UPDATE("UPDATE"),
DELETE("DELETE");
final String type;
Type(String type){
this.type = type;
}
}

View File

@@ -0,0 +1,31 @@
package com.example.oneul.infra.kafka.service;
import org.springframework.stereotype.Component;
import com.example.oneul.domain.post.domain.Post;
import com.example.oneul.infra.dto.PostMessage;
import com.example.oneul.infra.kafka.KafkaPublisher;
import com.example.oneul.infra.kafka.Type;
@Component
public class DeleteMessageService implements MessageQueueService {
private final KafkaPublisher kafkaPublisher;
public DeleteMessageService(KafkaPublisher kafkaPublisher){
this.kafkaPublisher = kafkaPublisher;
}
@Override
public Type getMessageType(){
return Type.DELETE;
}
@Override
public PostMessage transaction(Post post){
PostMessage postMessage = PostMessage.builder()
.id(post.getId())
.build();
kafkaPublisher.sendMessage("post", postMessage);
return postMessage;
}
}

View File

@@ -0,0 +1,34 @@
package com.example.oneul.infra.kafka.service;
import org.springframework.stereotype.Component;
import com.example.oneul.domain.post.domain.Post;
import com.example.oneul.infra.dto.PostMessage;
import com.example.oneul.infra.kafka.KafkaPublisher;
import com.example.oneul.infra.kafka.Type;
@Component
public class InsertMessageService implements MessageQueueService {
private final KafkaPublisher kafkaPublisher;
public InsertMessageService(KafkaPublisher kafkaPublisher){
this.kafkaPublisher = kafkaPublisher;
}
@Override
public Type getMessageType() {
return Type.INSERT;
}
@Override
public PostMessage transaction(Post post){
PostMessage postMessage = new PostMessage(
Type.INSERT.toString(),
post.getId(),
post.getCreatedAt(),
post.getContent(),
post.getWriter().getUsername());
kafkaPublisher.sendMessage("post", postMessage);
return postMessage;
}
}

View File

@@ -0,0 +1,33 @@
package com.example.oneul.infra.kafka.service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import com.example.oneul.domain.post.domain.Post;
import com.example.oneul.infra.dto.PostMessage;
import com.example.oneul.infra.kafka.Type;
@Component
public class MessageQueueFactory {
private final Map<Type, Function<Post, PostMessage>> meessageQueueServiceMap = new HashMap<>();
public MessageQueueFactory(List<MessageQueueService> meessageQueueServices) {
if(CollectionUtils.isEmpty(meessageQueueServices)){
throw new IllegalArgumentException("메시지 큐 구현체가 존재하지 않습니다.");
}
for(MessageQueueService service : meessageQueueServices){
Function<Post, PostMessage> transaction = service::transaction;
this.meessageQueueServiceMap.put(service.getMessageType(), transaction);
}
}
public Function<Post, PostMessage> getTye(Type type){
return meessageQueueServiceMap.get(type);
}
}

View File

@@ -0,0 +1,10 @@
package com.example.oneul.infra.kafka.service;
import com.example.oneul.domain.post.domain.Post;
import com.example.oneul.infra.dto.PostMessage;
import com.example.oneul.infra.kafka.Type;
public interface MessageQueueService {
Type getMessageType();
PostMessage transaction(Post post);
}

View File

@@ -0,0 +1,34 @@
package com.example.oneul.infra.kafka.service;
import org.springframework.stereotype.Component;
import com.example.oneul.domain.post.domain.Post;
import com.example.oneul.infra.dto.PostMessage;
import com.example.oneul.infra.kafka.KafkaPublisher;
import com.example.oneul.infra.kafka.Type;
@Component
public class UpdateMessageService implements MessageQueueService {
private final KafkaPublisher kafkaPublisher;
public UpdateMessageService(KafkaPublisher kafkaPublisher){
this.kafkaPublisher = kafkaPublisher;
}
@Override
public Type getMessageType(){
return Type.UPDATE;
}
@Override
public PostMessage transaction(Post post) {
PostMessage postMessage = new PostMessage(
Type.UPDATE.toString(),
post.getId(),
post.getCreatedAt(),
post.getContent(),
post.getWriter().getUsername());
kafkaPublisher.sendMessage("post", postMessage);
return postMessage;
}
}

View File

@@ -26,6 +26,7 @@ import com.example.oneul.domain.post.service.command.PostCommnadServiceImpl;
import com.example.oneul.domain.user.domain.UserEntity; import com.example.oneul.domain.user.domain.UserEntity;
import com.example.oneul.infra.dto.PostMessage; import com.example.oneul.infra.dto.PostMessage;
import com.example.oneul.infra.kafka.KafkaPublisher; import com.example.oneul.infra.kafka.KafkaPublisher;
import com.example.oneul.infra.kafka.service.MessageQueueFactory;
@ActiveProfiles("test") @ActiveProfiles("test")
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
@@ -35,10 +36,11 @@ public class PostCommandSerivceTest {
@Mock private KafkaPublisher kafkaPublisher; @Mock private KafkaPublisher kafkaPublisher;
protected MockHttpSession httpSession; protected MockHttpSession httpSession;
@Mock private UserEntity mockWriter; @Mock private UserEntity mockWriter;
@Mock private MessageQueueFactory messageQueueFactory;
@BeforeEach @BeforeEach
public void setUp() throws Exception { public void setUp() throws Exception {
postCommandService = new PostCommnadServiceImpl(postCommandRepository, kafkaPublisher); postCommandService = new PostCommnadServiceImpl(postCommandRepository, kafkaPublisher, messageQueueFactory);
httpSession = new MockHttpSession(); httpSession = new MockHttpSession();
httpSession.setAttribute("user", mockWriter); httpSession.setAttribute("user", mockWriter);
} }

View File

@@ -63,17 +63,17 @@ public class UserServiceTest {
@Test @Test
public void loginTest() throws Exception { public void loginTest() throws Exception {
// given // given
LoginDTO loginDTO = createLoginDTO(); LoginDTO loginDTO = createLoginDTO();
UserEntity loginUser = loginDTO.toEntity(); UserEntity loginUser = loginDTO.toEntity();
// mocking // mocking
Long mockUserId = 1L; Long mockUserId = 1L;
UserEntity userEntity = mockUser(mockUserId); UserEntity userEntity = mockUser(mockUserId);
given(userRepository.findByUsername(loginUser.getUsername())).willReturn(Optional.ofNullable(userEntity)); given(userRepository.findByUsername(loginUser.getUsername())).willReturn(Optional.ofNullable(userEntity));
given(passwordEncoder.matches(any(), eq(userEntity.getPassword()))).willReturn(true); given(passwordEncoder.matches(any(), eq(userEntity.getPassword()))).willReturn(true);
// when // when
UserEntity user = userService.login(userEntity, httpSession); UserEntity user = userService.login(userEntity, httpSession);
// then // then
assertNotEquals(httpSession.getAttribute("user"), null); assertNotEquals(httpSession.getAttribute("user"), null);