Kafka
This commit is contained in:
@@ -34,6 +34,8 @@ dependencies {
|
|||||||
|
|
||||||
implementation 'org.springframework.session:spring-session-data-redis'
|
implementation 'org.springframework.session:spring-session-data-redis'
|
||||||
|
|
||||||
|
implementation 'org.springframework.kafka:spring-kafka'
|
||||||
|
|
||||||
runtimeOnly 'mysql:mysql-connector-java'
|
runtimeOnly 'mysql:mysql-connector-java'
|
||||||
|
|
||||||
compileOnly 'org.projectlombok:lombok'
|
compileOnly 'org.projectlombok:lombok'
|
||||||
|
|||||||
@@ -7,8 +7,6 @@ import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
|
|||||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||||
import org.springframework.session.data.redis.config.annotation.web.http.EnableRedisHttpSession;
|
import org.springframework.session.data.redis.config.annotation.web.http.EnableRedisHttpSession;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@EnableJpaAuditing
|
@EnableJpaAuditing
|
||||||
@EnableScheduling
|
@EnableScheduling
|
||||||
@EnableBatchProcessing
|
@EnableBatchProcessing
|
||||||
@@ -18,5 +16,4 @@ public class OneulApplication {
|
|||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
SpringApplication.run(OneulApplication.class, args);
|
SpringApplication.run(OneulApplication.class, args);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ import com.example.oneul.domain.post.domain.Post;
|
|||||||
import com.example.oneul.domain.post.domain.PostDocument;
|
import com.example.oneul.domain.post.domain.PostDocument;
|
||||||
import com.example.oneul.domain.user.domain.UserEntity;
|
import com.example.oneul.domain.user.domain.UserEntity;
|
||||||
import com.example.oneul.global.error.exception.NotFoundException;
|
import com.example.oneul.global.error.exception.NotFoundException;
|
||||||
|
import com.example.oneul.infra.kafka.KafkaPublisher;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@Transactional
|
@Transactional
|
||||||
@@ -23,10 +24,12 @@ public class PostCommnadServiceImpl implements PostCommandService{
|
|||||||
|
|
||||||
private final PostCommandRepository postCommandRepository;
|
private final PostCommandRepository postCommandRepository;
|
||||||
private final PostQueryRepository postQueryRepository;
|
private final PostQueryRepository postQueryRepository;
|
||||||
|
private final KafkaPublisher kafkaPublisher;
|
||||||
|
|
||||||
public PostCommnadServiceImpl(PostCommandRepository postCommandRepository, PostQueryRepository postQueryRepository){
|
public PostCommnadServiceImpl(PostCommandRepository postCommandRepository, PostQueryRepository postQueryRepository, KafkaPublisher kafkaPublisher){
|
||||||
this.postCommandRepository = postCommandRepository;
|
this.postCommandRepository = postCommandRepository;
|
||||||
this.postQueryRepository = postQueryRepository;
|
this.postQueryRepository = postQueryRepository;
|
||||||
|
this.kafkaPublisher = kafkaPublisher;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -42,14 +45,13 @@ public class PostCommnadServiceImpl implements PostCommandService{
|
|||||||
.writer(userEntity)
|
.writer(userEntity)
|
||||||
.build());
|
.build());
|
||||||
|
|
||||||
// TODO: 메시지 큐잉으로 전환
|
PostDocument postDocument = new PostDocument(
|
||||||
postQueryRepository.save(
|
postEntity.getId(),
|
||||||
new PostDocument(
|
postEntity.getCreatedAt(),
|
||||||
postEntity.getId(),
|
postEntity.getContent(),
|
||||||
postEntity.getCreatedAt(),
|
postEntity.getWriter().getUsername());
|
||||||
postEntity.getContent(),
|
kafkaPublisher.sendMessage("post", postDocument);
|
||||||
postEntity.getWriter().getUsername()));
|
|
||||||
|
|
||||||
log.info("user: " + userEntity.toString() + " create " + postEntity.toString());
|
log.info("user: " + userEntity.toString() + " create " + postEntity.toString());
|
||||||
return postEntity;
|
return postEntity;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,11 +2,6 @@ package com.example.oneul.domain.user.api;
|
|||||||
|
|
||||||
import javax.servlet.http.HttpSession;
|
import javax.servlet.http.HttpSession;
|
||||||
|
|
||||||
import com.example.oneul.domain.user.domain.UserEntity;
|
|
||||||
import com.example.oneul.domain.user.dto.LoginDTO;
|
|
||||||
import com.example.oneul.domain.user.dto.SignUpDTO;
|
|
||||||
import com.example.oneul.domain.user.service.UserService;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.web.bind.annotation.RequestBody;
|
import org.springframework.web.bind.annotation.RequestBody;
|
||||||
@@ -14,6 +9,11 @@ import org.springframework.web.bind.annotation.RequestMapping;
|
|||||||
import org.springframework.web.bind.annotation.RequestMethod;
|
import org.springframework.web.bind.annotation.RequestMethod;
|
||||||
import org.springframework.web.bind.annotation.RestController;
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
|
import com.example.oneul.domain.user.domain.UserEntity;
|
||||||
|
import com.example.oneul.domain.user.dto.LoginDTO;
|
||||||
|
import com.example.oneul.domain.user.dto.SignUpDTO;
|
||||||
|
import com.example.oneul.domain.user.service.UserService;
|
||||||
|
|
||||||
@RestController
|
@RestController
|
||||||
@RequestMapping(value = "/user")
|
@RequestMapping(value = "/user")
|
||||||
public class UserApi {
|
public class UserApi {
|
||||||
@@ -27,7 +27,7 @@ public class UserApi {
|
|||||||
|
|
||||||
@RequestMapping(value="/signup/", method=RequestMethod.POST)
|
@RequestMapping(value="/signup/", method=RequestMethod.POST)
|
||||||
public UserEntity signUp(@RequestBody SignUpDTO signUpDTO) {
|
public UserEntity signUp(@RequestBody SignUpDTO signUpDTO) {
|
||||||
// TODO: password1, password2 같은지 검사
|
// TODO: password1, password2 같은지 validator로 검사
|
||||||
UserEntity user = userService.signUp(signUpDTO.toEntity());
|
UserEntity user = userService.signUp(signUpDTO.toEntity());
|
||||||
log.info("signUp: " + user.toString());
|
log.info("signUp: " + user.toString());
|
||||||
return user;
|
return user;
|
||||||
|
|||||||
@@ -2,13 +2,13 @@ package com.example.oneul.domain.user.service;
|
|||||||
|
|
||||||
import javax.servlet.http.HttpSession;
|
import javax.servlet.http.HttpSession;
|
||||||
|
|
||||||
import com.example.oneul.domain.user.domain.UserEntity;
|
|
||||||
|
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import com.example.oneul.domain.user.domain.UserEntity;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
public interface UserService {
|
public interface UserService {
|
||||||
UserEntity signUp(UserEntity userEntity);
|
UserEntity signUp(UserEntity userEntity);
|
||||||
UserEntity login(UserEntity userEntity , HttpSession httpSession);
|
UserEntity login(UserEntity userEntity, HttpSession httpSession);
|
||||||
void logout(HttpSession httpSession);
|
void logout(HttpSession httpSession);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,43 @@
|
|||||||
|
package com.example.oneul.infra.config;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||||
|
import org.springframework.kafka.core.ConsumerFactory;
|
||||||
|
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||||
|
import org.springframework.kafka.support.serializer.JsonDeserializer;
|
||||||
|
|
||||||
|
import com.example.oneul.domain.post.domain.PostDocument;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
public class KafkaConsumerConfig {
|
||||||
|
@Value("${spring.kafka.bootstrap-servers}")
|
||||||
|
private String bootstrapServers;
|
||||||
|
|
||||||
|
@Value("${spring.kafka.consumer.group-id}")
|
||||||
|
private String groupId;
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConsumerFactory<String, PostDocument> consumerFactory() {
|
||||||
|
Map<String, Object> configs = new HashMap<>();
|
||||||
|
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||||
|
configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
|
||||||
|
|
||||||
|
return new DefaultKafkaConsumerFactory<>(configs,
|
||||||
|
new StringDeserializer(),
|
||||||
|
new JsonDeserializer<>(PostDocument.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConcurrentKafkaListenerContainerFactory<String, PostDocument> postListener() {
|
||||||
|
ConcurrentKafkaListenerContainerFactory<String, PostDocument> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
||||||
|
factory.setConsumerFactory(consumerFactory());
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,36 @@
|
|||||||
|
package com.example.oneul.infra.config;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
|
import org.apache.kafka.common.serialization.StringSerializer;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||||
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
|
import org.springframework.kafka.core.ProducerFactory;
|
||||||
|
import org.springframework.kafka.support.serializer.JsonSerializer;
|
||||||
|
|
||||||
|
import com.example.oneul.domain.post.domain.PostDocument;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
public class KafkaProducerConfig {
|
||||||
|
@Value("${spring.kafka.bootstrap-servers}")
|
||||||
|
private String bootstrapServers;
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ProducerFactory<String, PostDocument> producerFactory(){
|
||||||
|
Map<String, Object> configs = new HashMap<>();
|
||||||
|
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||||
|
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||||
|
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
|
||||||
|
return new DefaultKafkaProducerFactory<>(configs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public KafkaTemplate<String, PostDocument> kafkaTemplate() {
|
||||||
|
return new KafkaTemplate<>(producerFactory());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,23 @@
|
|||||||
|
package com.example.oneul.infra.kafka;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import com.example.oneul.domain.post.domain.PostDocument;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class KafkaPublisher {
|
||||||
|
private final Logger log = LoggerFactory.getLogger(KafkaPublisher.class);
|
||||||
|
private final KafkaTemplate<String, PostDocument> kafkaTemplate;
|
||||||
|
|
||||||
|
public KafkaPublisher(KafkaTemplate<String,PostDocument> kafkaTemplate){
|
||||||
|
this.kafkaTemplate = kafkaTemplate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void sendMessage(String topic, PostDocument payload){
|
||||||
|
log.info("publish message: topic: " + topic + ", payload: " + payload.toString());
|
||||||
|
kafkaTemplate.send(topic, payload);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,25 @@
|
|||||||
|
package com.example.oneul.infra.kafka;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.kafka.annotation.KafkaListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import com.example.oneul.domain.post.dao.query.PostQueryRepository;
|
||||||
|
import com.example.oneul.domain.post.domain.PostDocument;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class KafkaSubscriber {
|
||||||
|
private final Logger log = LoggerFactory.getLogger(KafkaSubscriber.class);
|
||||||
|
private final PostQueryRepository postQueryRepository;
|
||||||
|
|
||||||
|
public KafkaSubscriber(PostQueryRepository postQueryRepository){
|
||||||
|
this.postQueryRepository = postQueryRepository;
|
||||||
|
}
|
||||||
|
|
||||||
|
@KafkaListener(topics = "post", groupId = "post", containerFactory = "postListener")
|
||||||
|
public void listen(PostDocument post){
|
||||||
|
log.info("message listen: " + post.toString());
|
||||||
|
postQueryRepository.save(post);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -22,7 +22,14 @@ spring:
|
|||||||
host: localhost
|
host: localhost
|
||||||
port: 27017
|
port: 27017
|
||||||
database: oneul
|
database: oneul
|
||||||
|
kafka:
|
||||||
|
bootstrap-servers: localhost:9092
|
||||||
|
consumer:
|
||||||
|
group-id: post
|
||||||
|
auto-offset-reset: earliest
|
||||||
|
|
||||||
server:
|
server:
|
||||||
|
port: 5555
|
||||||
servlet:
|
servlet:
|
||||||
session:
|
session:
|
||||||
timeout: 60
|
timeout: 60
|
||||||
|
|||||||
Reference in New Issue
Block a user