diff --git a/src/main/java/com/example/oneul/domain/post/service/command/PostCommnadServiceImpl.java b/src/main/java/com/example/oneul/domain/post/service/command/PostCommnadServiceImpl.java index 5b17801..9242a3e 100644 --- a/src/main/java/com/example/oneul/domain/post/service/command/PostCommnadServiceImpl.java +++ b/src/main/java/com/example/oneul/domain/post/service/command/PostCommnadServiceImpl.java @@ -12,9 +12,9 @@ import org.springframework.transaction.annotation.Transactional; import com.example.oneul.domain.post.dao.command.PostCommandRepository; import com.example.oneul.domain.post.dao.query.PostQueryRepository; import com.example.oneul.domain.post.domain.Post; -import com.example.oneul.domain.post.domain.PostDocument; import com.example.oneul.domain.user.domain.UserEntity; import com.example.oneul.global.error.exception.NotFoundException; +import com.example.oneul.infra.dto.PostMessage; import com.example.oneul.infra.kafka.KafkaPublisher; @Service @@ -45,12 +45,16 @@ public class PostCommnadServiceImpl implements PostCommandService{ .writer(userEntity) .build()); - PostDocument postDocument = new PostDocument( - postEntity.getId(), - postEntity.getCreatedAt(), - postEntity.getContent(), - postEntity.getWriter().getUsername()); - kafkaPublisher.sendMessage("post", postDocument); + kafkaPublisher.sendMessage( + "post", + new PostMessage( + "INSERT", + postEntity.getId(), + postEntity.getCreatedAt(), + postEntity.getContent(), + postEntity.getWriter().getUsername() + ) + ); log.info("user: " + userEntity.toString() + " create " + postEntity.toString()); return postEntity; @@ -59,13 +63,22 @@ public class PostCommnadServiceImpl implements PostCommandService{ @Override public Post updatePost(Long id, Post post, HttpSession httpSession){ UserEntity userEntity = (UserEntity) httpSession.getAttribute("user"); - // TODO: 적절한 방법인지 확인하기 Post postEntity = postCommandRepository.findByIdAndWriter(id, userEntity).orElseThrow(() -> new NotFoundException(id + " post not found")); + postEntity.setConent(post.getContent()); postEntity = postCommandRepository.save(postEntity); - PostDocument postDocument = postQueryRepository.findById(postEntity.getId()).orElseThrow(() -> new NotFoundException("query repository doesn't have " + id)); - postDocument.setContent(postEntity.getContent()); - postQueryRepository.save(postDocument); + + kafkaPublisher.sendMessage( + "post", + new PostMessage( + "UPDATE", + postEntity.getId(), + postEntity.getCreatedAt(), + postEntity.getContent(), + postEntity.getWriter().getUsername() + ) + ); + log.info(postEntity.toString() + " is updated"); return postEntity; @@ -75,8 +88,16 @@ public class PostCommnadServiceImpl implements PostCommandService{ public void deletePost(Long id, HttpSession httpSession){ // TODO: 이 때 세션이 만기되면 어떡함 UserEntity userEntity = (UserEntity)httpSession.getAttribute("user"); + postCommandRepository.deleteByIdAndWriter(id, userEntity); - postQueryRepository.deleteById(id); + + kafkaPublisher.sendMessage( + "post", + PostMessage.builder() + .type("DELETE") + .id(id) + .build()); + log.info("post " + id + " is deleted"); } } \ No newline at end of file diff --git a/src/main/java/com/example/oneul/infra/config/KafkaConsumerConfig.java b/src/main/java/com/example/oneul/infra/config/KafkaConsumerConfig.java index 327e0d2..1f09094 100644 --- a/src/main/java/com/example/oneul/infra/config/KafkaConsumerConfig.java +++ b/src/main/java/com/example/oneul/infra/config/KafkaConsumerConfig.java @@ -13,7 +13,7 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.support.serializer.JsonDeserializer; -import com.example.oneul.domain.post.domain.PostDocument; +import com.example.oneul.infra.dto.PostMessage; @Configuration public class KafkaConsumerConfig { @@ -24,19 +24,19 @@ public class KafkaConsumerConfig { private String groupId; @Bean - public ConsumerFactory consumerFactory() { + public ConsumerFactory consumerFactory() { Map configs = new HashMap<>(); configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); return new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), - new JsonDeserializer<>(PostDocument.class)); + new JsonDeserializer<>(PostMessage.class)); } @Bean - public ConcurrentKafkaListenerContainerFactory postListener() { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + public ConcurrentKafkaListenerContainerFactory postListener() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } diff --git a/src/main/java/com/example/oneul/infra/config/KafkaProducerConfig.java b/src/main/java/com/example/oneul/infra/config/KafkaProducerConfig.java index 13cdf5a..0046cc3 100644 --- a/src/main/java/com/example/oneul/infra/config/KafkaProducerConfig.java +++ b/src/main/java/com/example/oneul/infra/config/KafkaProducerConfig.java @@ -13,7 +13,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.serializer.JsonSerializer; -import com.example.oneul.domain.post.domain.PostDocument; +import com.example.oneul.infra.dto.PostMessage; @Configuration public class KafkaProducerConfig { @@ -21,7 +21,7 @@ public class KafkaProducerConfig { private String bootstrapServers; @Bean - public ProducerFactory producerFactory(){ + public ProducerFactory createProducerFactory(){ Map configs = new HashMap<>(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); @@ -30,7 +30,7 @@ public class KafkaProducerConfig { } @Bean - public KafkaTemplate kafkaTemplate() { - return new KafkaTemplate<>(producerFactory()); + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(createProducerFactory()); } } diff --git a/src/main/java/com/example/oneul/infra/dto/PostMessage.java b/src/main/java/com/example/oneul/infra/dto/PostMessage.java new file mode 100644 index 0000000..5fb2d2e --- /dev/null +++ b/src/main/java/com/example/oneul/infra/dto/PostMessage.java @@ -0,0 +1,118 @@ +package com.example.oneul.infra.dto; + +import java.time.LocalDateTime; + +public class PostMessage { + private String type; + private Long id; + private LocalDateTime createdAt; + private String content; + private String wirter; + + public PostMessage() {} + + public PostMessage(String type, Long id, LocalDateTime createdAt, String content, String writer){ + this.type = type; + this.id = id; + this.createdAt = createdAt; + this.content = content; + this.wirter = writer; + } + + public String getType() { + return this.type; + } + + public void setType(String type) { + this.type = type; + } + + public Long getId(){ + return this.id; + } + + public void setId(Long id){ + this.id = id; + } + + public LocalDateTime getCreatedAt(){ + return this.createdAt; + } + + public void setCreatedAt(LocalDateTime createdAt){ + this.createdAt = createdAt; + } + + public String getContent(){ + return this.content; + } + + public void setConent(String content){ + this.content = content; + } + + public String getWriter(){ + return this.wirter; + } + + public void setWriter(String writer){ + this.wirter = writer; + } + + @Override + public String toString(){ + return "PostMessage[" + + "type: " + this.type + + ", id: " + this.id + + ", createdAt: " + this.createdAt + + ", content: " + this.content + + ", writer: " + this.wirter + + "]"; + } + + public static Builder builder(){ + return new Builder(); + } + + public static class Builder { + private String type; + private Long id; + private String content; + private LocalDateTime createdAt; + private String writer; + + public PostMessage build(){ + return new PostMessage( + type, + id, + createdAt, + content, + writer); + } + + public Builder type(String type){ + this.type = type; + return this; + } + + public Builder id(Long id){ + this.id = id; + return this; + } + + public Builder createdAt(LocalDateTime createdAt){ + this.createdAt = createdAt; + return this; + } + + public Builder content(String content){ + this.content = content; + return this; + } + + public Builder writer(String writer){ + this.writer = writer; + return this; + } + } +} diff --git a/src/main/java/com/example/oneul/infra/kafka/KafkaPublisher.java b/src/main/java/com/example/oneul/infra/kafka/KafkaPublisher.java index 214b9b5..1d386ee 100644 --- a/src/main/java/com/example/oneul/infra/kafka/KafkaPublisher.java +++ b/src/main/java/com/example/oneul/infra/kafka/KafkaPublisher.java @@ -5,18 +5,18 @@ import org.slf4j.LoggerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; -import com.example.oneul.domain.post.domain.PostDocument; +import com.example.oneul.infra.dto.PostMessage; @Component public class KafkaPublisher { private final Logger log = LoggerFactory.getLogger(KafkaPublisher.class); - private final KafkaTemplate kafkaTemplate; + private final KafkaTemplate kafkaTemplate; - public KafkaPublisher(KafkaTemplate kafkaTemplate){ + public KafkaPublisher(KafkaTemplate kafkaTemplate){ this.kafkaTemplate = kafkaTemplate; } - public void sendMessage(String topic, PostDocument payload){ + public void sendMessage(String topic, PostMessage payload){ log.info("publish message: topic: " + topic + ", payload: " + payload.toString()); kafkaTemplate.send(topic, payload); } diff --git a/src/main/java/com/example/oneul/infra/kafka/KafkaSubscriber.java b/src/main/java/com/example/oneul/infra/kafka/KafkaSubscriber.java index 364ab0e..8435e96 100644 --- a/src/main/java/com/example/oneul/infra/kafka/KafkaSubscriber.java +++ b/src/main/java/com/example/oneul/infra/kafka/KafkaSubscriber.java @@ -7,6 +7,8 @@ import org.springframework.stereotype.Component; import com.example.oneul.domain.post.dao.query.PostQueryRepository; import com.example.oneul.domain.post.domain.PostDocument; +import com.example.oneul.global.error.exception.NotFoundException; +import com.example.oneul.infra.dto.PostMessage; @Component public class KafkaSubscriber { @@ -18,8 +20,22 @@ public class KafkaSubscriber { } @KafkaListener(topics = "post", groupId = "post", containerFactory = "postListener") - public void listen(PostDocument post){ - log.info("message listen: " + post.toString()); - postQueryRepository.save(post); + public void listen(PostMessage postMessage){ + log.info("message listen: " + postMessage.toString()); + + if(postMessage.getType().equals("INSERT")){ + postQueryRepository.save(new PostDocument( + postMessage.getId(), + postMessage.getCreatedAt(), + postMessage.getContent(), + postMessage.getWriter())); + } else if(postMessage.getType().equals("UPDATE")){ + PostDocument postDocument = postQueryRepository.findById(postMessage.getId()) + .orElseThrow(() ->new NotFoundException("query repository doesn't have " + postMessage.getId())); + postDocument.setContent(postMessage.getContent()); + postQueryRepository.save(postDocument); + } else if(postMessage.getType().equals("DELETE")){ + postQueryRepository.deleteById(postMessage.getId()); + } } }