kafka crud
This commit is contained in:
@@ -12,9 +12,9 @@ import org.springframework.transaction.annotation.Transactional;
|
||||
import com.example.oneul.domain.post.dao.command.PostCommandRepository;
|
||||
import com.example.oneul.domain.post.dao.query.PostQueryRepository;
|
||||
import com.example.oneul.domain.post.domain.Post;
|
||||
import com.example.oneul.domain.post.domain.PostDocument;
|
||||
import com.example.oneul.domain.user.domain.UserEntity;
|
||||
import com.example.oneul.global.error.exception.NotFoundException;
|
||||
import com.example.oneul.infra.dto.PostMessage;
|
||||
import com.example.oneul.infra.kafka.KafkaPublisher;
|
||||
|
||||
@Service
|
||||
@@ -45,12 +45,16 @@ public class PostCommnadServiceImpl implements PostCommandService{
|
||||
.writer(userEntity)
|
||||
.build());
|
||||
|
||||
PostDocument postDocument = new PostDocument(
|
||||
kafkaPublisher.sendMessage(
|
||||
"post",
|
||||
new PostMessage(
|
||||
"INSERT",
|
||||
postEntity.getId(),
|
||||
postEntity.getCreatedAt(),
|
||||
postEntity.getContent(),
|
||||
postEntity.getWriter().getUsername());
|
||||
kafkaPublisher.sendMessage("post", postDocument);
|
||||
postEntity.getWriter().getUsername()
|
||||
)
|
||||
);
|
||||
|
||||
log.info("user: " + userEntity.toString() + " create " + postEntity.toString());
|
||||
return postEntity;
|
||||
@@ -59,13 +63,22 @@ public class PostCommnadServiceImpl implements PostCommandService{
|
||||
@Override
|
||||
public Post updatePost(Long id, Post post, HttpSession httpSession){
|
||||
UserEntity userEntity = (UserEntity) httpSession.getAttribute("user");
|
||||
// TODO: 적절한 방법인지 확인하기
|
||||
Post postEntity = postCommandRepository.findByIdAndWriter(id, userEntity).orElseThrow(() -> new NotFoundException(id + " post not found"));
|
||||
|
||||
postEntity.setConent(post.getContent());
|
||||
postEntity = postCommandRepository.save(postEntity);
|
||||
PostDocument postDocument = postQueryRepository.findById(postEntity.getId()).orElseThrow(() -> new NotFoundException("query repository doesn't have " + id));
|
||||
postDocument.setContent(postEntity.getContent());
|
||||
postQueryRepository.save(postDocument);
|
||||
|
||||
kafkaPublisher.sendMessage(
|
||||
"post",
|
||||
new PostMessage(
|
||||
"UPDATE",
|
||||
postEntity.getId(),
|
||||
postEntity.getCreatedAt(),
|
||||
postEntity.getContent(),
|
||||
postEntity.getWriter().getUsername()
|
||||
)
|
||||
);
|
||||
|
||||
log.info(postEntity.toString() + " is updated");
|
||||
|
||||
return postEntity;
|
||||
@@ -75,8 +88,16 @@ public class PostCommnadServiceImpl implements PostCommandService{
|
||||
public void deletePost(Long id, HttpSession httpSession){
|
||||
// TODO: 이 때 세션이 만기되면 어떡함
|
||||
UserEntity userEntity = (UserEntity)httpSession.getAttribute("user");
|
||||
|
||||
postCommandRepository.deleteByIdAndWriter(id, userEntity);
|
||||
postQueryRepository.deleteById(id);
|
||||
|
||||
kafkaPublisher.sendMessage(
|
||||
"post",
|
||||
PostMessage.builder()
|
||||
.type("DELETE")
|
||||
.id(id)
|
||||
.build());
|
||||
|
||||
log.info("post " + id + " is deleted");
|
||||
}
|
||||
}
|
||||
@@ -13,7 +13,7 @@ import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.support.serializer.JsonDeserializer;
|
||||
|
||||
import com.example.oneul.domain.post.domain.PostDocument;
|
||||
import com.example.oneul.infra.dto.PostMessage;
|
||||
|
||||
@Configuration
|
||||
public class KafkaConsumerConfig {
|
||||
@@ -24,19 +24,19 @@ public class KafkaConsumerConfig {
|
||||
private String groupId;
|
||||
|
||||
@Bean
|
||||
public ConsumerFactory<String, PostDocument> consumerFactory() {
|
||||
public ConsumerFactory<String, PostMessage> consumerFactory() {
|
||||
Map<String, Object> configs = new HashMap<>();
|
||||
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||
configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
|
||||
|
||||
return new DefaultKafkaConsumerFactory<>(configs,
|
||||
new StringDeserializer(),
|
||||
new JsonDeserializer<>(PostDocument.class));
|
||||
new JsonDeserializer<>(PostMessage.class));
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ConcurrentKafkaListenerContainerFactory<String, PostDocument> postListener() {
|
||||
ConcurrentKafkaListenerContainerFactory<String, PostDocument> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
||||
public ConcurrentKafkaListenerContainerFactory<String, PostMessage> postListener() {
|
||||
ConcurrentKafkaListenerContainerFactory<String, PostMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
||||
factory.setConsumerFactory(consumerFactory());
|
||||
return factory;
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
import org.springframework.kafka.support.serializer.JsonSerializer;
|
||||
|
||||
import com.example.oneul.domain.post.domain.PostDocument;
|
||||
import com.example.oneul.infra.dto.PostMessage;
|
||||
|
||||
@Configuration
|
||||
public class KafkaProducerConfig {
|
||||
@@ -21,7 +21,7 @@ public class KafkaProducerConfig {
|
||||
private String bootstrapServers;
|
||||
|
||||
@Bean
|
||||
public ProducerFactory<String, PostDocument> producerFactory(){
|
||||
public ProducerFactory<String, PostMessage> createProducerFactory(){
|
||||
Map<String, Object> configs = new HashMap<>();
|
||||
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
@@ -30,7 +30,7 @@ public class KafkaProducerConfig {
|
||||
}
|
||||
|
||||
@Bean
|
||||
public KafkaTemplate<String, PostDocument> kafkaTemplate() {
|
||||
return new KafkaTemplate<>(producerFactory());
|
||||
public KafkaTemplate<String, PostMessage> kafkaTemplate() {
|
||||
return new KafkaTemplate<>(createProducerFactory());
|
||||
}
|
||||
}
|
||||
|
||||
118
src/main/java/com/example/oneul/infra/dto/PostMessage.java
Normal file
118
src/main/java/com/example/oneul/infra/dto/PostMessage.java
Normal file
@@ -0,0 +1,118 @@
|
||||
package com.example.oneul.infra.dto;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
public class PostMessage {
|
||||
private String type;
|
||||
private Long id;
|
||||
private LocalDateTime createdAt;
|
||||
private String content;
|
||||
private String wirter;
|
||||
|
||||
public PostMessage() {}
|
||||
|
||||
public PostMessage(String type, Long id, LocalDateTime createdAt, String content, String writer){
|
||||
this.type = type;
|
||||
this.id = id;
|
||||
this.createdAt = createdAt;
|
||||
this.content = content;
|
||||
this.wirter = writer;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return this.type;
|
||||
}
|
||||
|
||||
public void setType(String type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public Long getId(){
|
||||
return this.id;
|
||||
}
|
||||
|
||||
public void setId(Long id){
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public LocalDateTime getCreatedAt(){
|
||||
return this.createdAt;
|
||||
}
|
||||
|
||||
public void setCreatedAt(LocalDateTime createdAt){
|
||||
this.createdAt = createdAt;
|
||||
}
|
||||
|
||||
public String getContent(){
|
||||
return this.content;
|
||||
}
|
||||
|
||||
public void setConent(String content){
|
||||
this.content = content;
|
||||
}
|
||||
|
||||
public String getWriter(){
|
||||
return this.wirter;
|
||||
}
|
||||
|
||||
public void setWriter(String writer){
|
||||
this.wirter = writer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString(){
|
||||
return "PostMessage["
|
||||
+ "type: " + this.type
|
||||
+ ", id: " + this.id
|
||||
+ ", createdAt: " + this.createdAt
|
||||
+ ", content: " + this.content
|
||||
+ ", writer: " + this.wirter
|
||||
+ "]";
|
||||
}
|
||||
|
||||
public static Builder builder(){
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
private String type;
|
||||
private Long id;
|
||||
private String content;
|
||||
private LocalDateTime createdAt;
|
||||
private String writer;
|
||||
|
||||
public PostMessage build(){
|
||||
return new PostMessage(
|
||||
type,
|
||||
id,
|
||||
createdAt,
|
||||
content,
|
||||
writer);
|
||||
}
|
||||
|
||||
public Builder type(String type){
|
||||
this.type = type;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder id(Long id){
|
||||
this.id = id;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder createdAt(LocalDateTime createdAt){
|
||||
this.createdAt = createdAt;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder content(String content){
|
||||
this.content = content;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder writer(String writer){
|
||||
this.writer = writer;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5,18 +5,18 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.example.oneul.domain.post.domain.PostDocument;
|
||||
import com.example.oneul.infra.dto.PostMessage;
|
||||
|
||||
@Component
|
||||
public class KafkaPublisher {
|
||||
private final Logger log = LoggerFactory.getLogger(KafkaPublisher.class);
|
||||
private final KafkaTemplate<String, PostDocument> kafkaTemplate;
|
||||
private final KafkaTemplate<String, PostMessage> kafkaTemplate;
|
||||
|
||||
public KafkaPublisher(KafkaTemplate<String,PostDocument> kafkaTemplate){
|
||||
public KafkaPublisher(KafkaTemplate<String,PostMessage> kafkaTemplate){
|
||||
this.kafkaTemplate = kafkaTemplate;
|
||||
}
|
||||
|
||||
public void sendMessage(String topic, PostDocument payload){
|
||||
public void sendMessage(String topic, PostMessage payload){
|
||||
log.info("publish message: topic: " + topic + ", payload: " + payload.toString());
|
||||
kafkaTemplate.send(topic, payload);
|
||||
}
|
||||
|
||||
@@ -7,6 +7,8 @@ import org.springframework.stereotype.Component;
|
||||
|
||||
import com.example.oneul.domain.post.dao.query.PostQueryRepository;
|
||||
import com.example.oneul.domain.post.domain.PostDocument;
|
||||
import com.example.oneul.global.error.exception.NotFoundException;
|
||||
import com.example.oneul.infra.dto.PostMessage;
|
||||
|
||||
@Component
|
||||
public class KafkaSubscriber {
|
||||
@@ -18,8 +20,22 @@ public class KafkaSubscriber {
|
||||
}
|
||||
|
||||
@KafkaListener(topics = "post", groupId = "post", containerFactory = "postListener")
|
||||
public void listen(PostDocument post){
|
||||
log.info("message listen: " + post.toString());
|
||||
postQueryRepository.save(post);
|
||||
public void listen(PostMessage postMessage){
|
||||
log.info("message listen: " + postMessage.toString());
|
||||
|
||||
if(postMessage.getType().equals("INSERT")){
|
||||
postQueryRepository.save(new PostDocument(
|
||||
postMessage.getId(),
|
||||
postMessage.getCreatedAt(),
|
||||
postMessage.getContent(),
|
||||
postMessage.getWriter()));
|
||||
} else if(postMessage.getType().equals("UPDATE")){
|
||||
PostDocument postDocument = postQueryRepository.findById(postMessage.getId())
|
||||
.orElseThrow(() ->new NotFoundException("query repository doesn't have " + postMessage.getId()));
|
||||
postDocument.setContent(postMessage.getContent());
|
||||
postQueryRepository.save(postDocument);
|
||||
} else if(postMessage.getType().equals("DELETE")){
|
||||
postQueryRepository.deleteById(postMessage.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user