#24 simple sns: alarm with kafka

This commit is contained in:
haerong22
2022-12-08 03:27:04 +09:00
parent 8a8cb186e8
commit 9d8a3cd994
8 changed files with 119 additions and 20 deletions

View File

@@ -25,6 +25,7 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
implementation 'com.vladmihalcea:hibernate-types-52:2.17.3'
implementation 'org.springframework.kafka:spring-kafka'
compileOnly 'org.projectlombok:lombok'
runtimeOnly 'org.postgresql:postgresql'

View File

@@ -0,0 +1,24 @@
package com.example.sns.consumer;
import com.example.sns.model.event.AlarmEvent;
import com.example.sns.service.AlarmService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class AlarmConsumer {
private final AlarmService alarmService;
@KafkaListener(topics = "${spring.kafka.topic.alarm}")
public void consumeAlarm(AlarmEvent event, Acknowledgment ack) {
log.info("Consume the event {}", event);
alarmService.send(event.getAlarmType(), event.getArgs(), event.getReceiveUserId());
}
}

View File

@@ -2,9 +2,11 @@ package com.example.sns.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AlarmArgs {
private Integer fromUserId;

View File

@@ -0,0 +1,16 @@
package com.example.sns.model.event;
import com.example.sns.model.AlarmArgs;
import com.example.sns.model.AlarmType;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AlarmEvent {
private Integer receiveUserId;
private AlarmType alarmType;
private AlarmArgs args;
}

View File

@@ -0,0 +1,24 @@
package com.example.sns.producer;
import com.example.sns.model.event.AlarmEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class AlarmProducer {
private final KafkaTemplate<Integer, AlarmEvent> kafkaTemplate;
@Value("${spring.kafka.topic.alarm}")
private String topic;
public void send(AlarmEvent event) {
kafkaTemplate.send(topic, event.getReceiveUserId(), event);
log.info("Send to Kafka finished");
}
}

View File

@@ -2,7 +2,13 @@ package com.example.sns.service;
import com.example.sns.exception.ErrorCode;
import com.example.sns.exception.SnsApplicationException;
import com.example.sns.model.AlarmArgs;
import com.example.sns.model.AlarmType;
import com.example.sns.model.entity.AlarmEntity;
import com.example.sns.model.entity.UserEntity;
import com.example.sns.repository.AlarmEntityRepository;
import com.example.sns.repository.EmitterRepository;
import com.example.sns.repository.UserEntityRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -18,6 +24,8 @@ public class AlarmService {
private final static Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
private final static String ALARM_NAME = "alarm";
private final EmitterRepository emitterRepository;
private final AlarmEntityRepository alarmEntityRepository;
private final UserEntityRepository userEntityRepository;
public SseEmitter connectAlarm(Integer userId) {
SseEmitter sseEmitter = new SseEmitter(DEFAULT_TIMEOUT);
@@ -40,12 +48,18 @@ public class AlarmService {
return sseEmitter;
}
public void send(Integer alarmId, Integer userId) {
emitterRepository.get(userId).ifPresentOrElse(sseEmitter -> {
public void send(AlarmType type, AlarmArgs arg, Integer receiverUserId) {
UserEntity user = userEntityRepository.findById(receiverUserId)
.orElseThrow(() -> new SnsApplicationException(ErrorCode.USER_NOT_FOUND));
// alarm save
AlarmEntity alarmEntity = alarmEntityRepository.save(AlarmEntity.of(user,type,arg));
emitterRepository.get(receiverUserId).ifPresentOrElse(sseEmitter -> {
try {
sseEmitter.send(SseEmitter.event().id(alarmId.toString()).name(ALARM_NAME).data("new alarm"));
sseEmitter.send(SseEmitter.event().id(alarmEntity.getId().toString()).name(ALARM_NAME).data("new alarm"));
} catch (IOException e) {
emitterRepository.delete(userId);
emitterRepository.delete(receiverUserId);
throw new SnsApplicationException(ErrorCode.ALARM_CONNECT_ERROR);
}
}, () -> log.info("No emitter founded."));

View File

@@ -6,6 +6,8 @@ import com.example.sns.model.AlarmType;
import com.example.sns.model.Comment;
import com.example.sns.model.Post;
import com.example.sns.model.entity.*;
import com.example.sns.model.event.AlarmEvent;
import com.example.sns.producer.AlarmProducer;
import com.example.sns.repository.*;
import lombok.RequiredArgsConstructor;
import org.springframework.data.domain.Page;
@@ -25,6 +27,7 @@ public class PostService {
private final CommentEntityRepository commentEntityRepository;
private final AlarmEntityRepository alarmEntityRepository;
private final AlarmService alarmService;
private final AlarmProducer alarmProducer;
@Transactional
public void create(String title, String body, String username) {
@@ -95,14 +98,10 @@ public class PostService {
// save like
likeEntityRepository.save(LikeEntity.of(userEntity, postEntity));
AlarmEntity alarmEntity = alarmEntityRepository.save(AlarmEntity.of(
postEntity.getUser(),
AlarmType.NEW_LIKE_ON_POST,
new AlarmArgs(userEntity.getId(), postEntity.getId())
)
);
alarmService.send(alarmEntity.getId(), postEntity.getUser().getId());
alarmProducer.send(
new AlarmEvent(postEntity.getUser().getId(),
AlarmType.NEW_LIKE_ON_POST,
new AlarmArgs(userEntity.getId(), postEntity.getId())));
}
public long likeCount(Integer postId) {
@@ -119,14 +118,10 @@ public class PostService {
commentEntityRepository.save(CommentEntity.of(userEntity, postEntity, comment));
AlarmEntity alarmEntity = alarmEntityRepository.save(AlarmEntity.of(
postEntity.getUser(),
AlarmType.NEW_COMMENT_ON_POST,
new AlarmArgs(userEntity.getId(), postEntity.getId())
)
);
alarmService.send(alarmEntity.getId(), postEntity.getUser().getId());
alarmProducer.send(
new AlarmEvent(postEntity.getUser().getId(),
AlarmType.NEW_COMMENT_ON_POST,
new AlarmArgs(userEntity.getId(), postEntity.getId())));
}
public Page<Comment> getComments(Integer postId, Pageable pageable) {

View File

@@ -24,6 +24,29 @@ spring:
redis:
url: redis://default:czGScHDINUmn78dFTZn0Bs5qa55BegQm@redis-19917.c8.us-east-1-2.ec2.cloud.redislabs.com:19917
kafka:
properties:
security.protocol: SASL_SSL
sasl.mechanism: SCRAM-SHA-256
sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="2kfr2bqj" password="e2gm1woz7hG2mLoP8d-fImmSid_7vVN2";
consumer:
properties.spring.json.trusted.packages: "*"
bootstrap-servers: dory-01.srvs.cloudkafka.com:9094, dory-02.srvs.cloudkafka.com:9094, dory-03.srvs.cloudkafka.com:9094
group-id: alarm
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
listener:
ack-mode: manual
producer:
bootstrap-servers: dory-01.srvs.cloudkafka.com:9094, dory-02.srvs.cloudkafka.com:9094, dory-03.srvs.cloudkafka.com:9094
key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties.enable.idempotence: false
topic:
alarm: 2kfr2bqj-alarm
jwt:
secret-key: simple_sns_application_secret_key
token: