Changes related Eventually Consistent CQRS + E/S MS with Debezium

This commit is contained in:
Shazin Sadakath
2022-07-18 22:50:08 +05:30
parent 229944df7d
commit fdb61dab00
13 changed files with 199 additions and 188 deletions

17
README.md Normal file
View File

@@ -0,0 +1,17 @@
Developing Microservices with CQRS and Event Sourcing Patterns using GraphQL + Spring Boot + Kafka
================================
https://shazinsadakath.medium.com/developing-microservices-with-cqrs-and-event-sourcing-patterns-using-graphql-spring-boot-kafka-19f259a7aaa5
# Running Demonstration
Run Commands
------
`docker compose up -d`
`curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "user-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "db", "database.port": "3306", "database.user": "root", "database.password": "root", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "userrdb", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.user" } }'`
`curl -H "Accept:application/json" localhost:8083/connectors/user-connector`

View File

@@ -37,19 +37,14 @@
<artifactId>cqrs-ms-user-domain</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.hsqldb/hsqldb -->
<dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<version>2.6.1</version>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.29</version>
</dependency>
</dependencies>

View File

@@ -1,10 +1,8 @@
package com.github.shazin.cqrs.ms.user.command.controller;
import com.github.shazin.cqrs.ms.user.command.dto.UserInput;
import com.github.shazin.cqrs.ms.user.command.handler.UserEventHandler;
import com.github.shazin.cqrs.ms.user.dto.User;
import com.github.shazin.cqrs.ms.user.dto.UserCreateEvent;
import com.github.shazin.cqrs.ms.user.dto.UserDeleteEvent;
import com.github.shazin.cqrs.ms.user.command.entity.UserEntity;
import com.github.shazin.cqrs.ms.user.command.service.UserService;
import org.springframework.graphql.data.method.annotation.Argument;
import org.springframework.graphql.data.method.annotation.MutationMapping;
import org.springframework.stereotype.Controller;
@@ -14,24 +12,22 @@ import java.util.UUID;
@Controller
public class UserCommandController {
private final UserEventHandler userEventHandler;
private final UserService userService;
public UserCommandController(UserEventHandler userService) {
this.userEventHandler = userService;
public UserCommandController(UserService userService) {
this.userService = userService;
}
@MutationMapping
public User createUser(@Argument UserInput user) {
User userCreated = new User(UUID.randomUUID().toString(), user.firstName(), user.lastName(), user.dateOfBirth(), user.identityNumber());
UserCreateEvent userCreateEvent = new UserCreateEvent(userCreated);
userEventHandler.publishUserEvent(userCreateEvent);
public UserEntity createUser(@Argument UserInput user) {
UserEntity userCreated = new UserEntity(UUID.randomUUID().toString(), user.firstName(), user.lastName(), user.dateOfBirth(), user.identityNumber());
userService.createUser(userCreated);
return userCreated;
}
@MutationMapping
public String deleteUser(@Argument String id) {
UserDeleteEvent userDeleteEvent = new UserDeleteEvent(id);
userEventHandler.publishUserEvent(userDeleteEvent);
userService.deleteUserById(id);
return id;
}

View File

@@ -1,82 +0,0 @@
package com.github.shazin.cqrs.ms.user.command.entity;
import javax.persistence.*;
import java.util.Date;
import java.util.Objects;
import java.util.UUID;
@Entity
@Table(name = "cqrs_user_command")
public class UserCommand {
@Id
private String id = UUID.randomUUID().toString();
private String payload;
private Date createdDate;
private String createdBy;
private String type;
public UserCommand() {
}
public UserCommand(String id, String payload, Date createdDate, String createdBy, String type) {
this.id = id;
this.payload = payload;
this.createdDate = createdDate;
this.createdBy = createdBy;
this.type = type;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getPayload() {
return payload;
}
public void setPayload(String payload) {
this.payload = payload;
}
public Date getCreatedDate() {
return createdDate;
}
public void setCreatedDate(Date createdDate) {
this.createdDate = createdDate;
}
public String getCreatedBy() {
return createdBy;
}
public void setCreatedBy(String createdBy) {
this.createdBy = createdBy;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
UserCommand that = (UserCommand) o;
return Objects.equals(id, that.id) && Objects.equals(payload, that.payload) && Objects.equals(createdDate, that.createdDate) && Objects.equals(createdBy, that.createdBy) && Objects.equals(type, that.type);
}
@Override
public int hashCode() {
return Objects.hash(id, payload, createdDate, createdBy, type);
}
}

View File

@@ -0,0 +1,83 @@
package com.github.shazin.cqrs.ms.user.command.entity;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import java.time.LocalDate;
import java.util.Objects;
import java.util.UUID;
@Entity
@Table(name = "cqrs_user")
public class UserEntity {
@Id
private String id = UUID.randomUUID().toString();
private String firstName;
private String lastName;
private LocalDate dateOfBirth;
private String identityNumber;
public UserEntity() {
}
public UserEntity(String id, String firstName, String lastName, LocalDate dateOfBirth, String identityNumber) {
this.id = id;
this.firstName = firstName;
this.lastName = lastName;
this.dateOfBirth = dateOfBirth;
this.identityNumber = identityNumber;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
public LocalDate getDateOfBirth() {
return dateOfBirth;
}
public void setDateOfBirth(LocalDate dateOfBirth) {
this.dateOfBirth = dateOfBirth;
}
public String getIdentityNumber() {
return identityNumber;
}
public void setIdentityNumber(String identityNumber) {
this.identityNumber = identityNumber;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
UserEntity that = (UserEntity) o;
return Objects.equals(id, that.id) && Objects.equals(firstName, that.firstName) && Objects.equals(lastName, that.lastName) && Objects.equals(dateOfBirth, that.dateOfBirth) && Objects.equals(identityNumber, that.identityNumber);
}
@Override
public int hashCode() {
return Objects.hash(id, firstName, lastName, dateOfBirth, identityNumber);
}
}

View File

@@ -1,7 +0,0 @@
package com.github.shazin.cqrs.ms.user.command.handler;
import com.github.shazin.cqrs.ms.user.dto.UserEvent;
public interface UserEventHandler {
public boolean publishUserEvent(UserEvent event);
}

View File

@@ -1,47 +0,0 @@
package com.github.shazin.cqrs.ms.user.command.handler.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.shazin.cqrs.ms.user.command.entity.UserCommand;
import com.github.shazin.cqrs.ms.user.command.repo.UserCommandRepository;
import com.github.shazin.cqrs.ms.user.command.handler.UserEventHandler;
import com.github.shazin.cqrs.ms.user.dto.UserEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class DefaultUserEventHandler implements UserEventHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultUserEventHandler.class);
private final KafkaTemplate<String, String> kafkaTemplate;
private final String kafkaTopicName;
private final ObjectMapper objectMapper;
private final UserCommandRepository userCommandRepository;
public DefaultUserEventHandler(KafkaTemplate<String, String> kafkaTemplate, @Value("${kafka.topic.name}") String kafkaTopicName, ObjectMapper objectMapper, UserCommandRepository userCommandRepository) {
this.kafkaTemplate = kafkaTemplate;
this.kafkaTopicName = kafkaTopicName;
this.objectMapper = objectMapper;
this.userCommandRepository = userCommandRepository;
}
public boolean publishUserEvent(UserEvent event) {
try {
String payload = objectMapper.writeValueAsString(event);
this.kafkaTemplate.send(kafkaTopicName, payload);
this.userCommandRepository.save(new UserCommand(UUID.randomUUID().toString(), payload, event.createdDate(), event.createdBy(), event.getType()));
return true;
} catch (Exception e) {
LOGGER.error("Error while publishing user", e);
return false;
}
}
}

View File

@@ -1,8 +0,0 @@
package com.github.shazin.cqrs.ms.user.command.repo;
import com.github.shazin.cqrs.ms.user.command.entity.UserCommand;
import org.springframework.data.jpa.repository.JpaRepository;
public interface UserCommandRepository extends JpaRepository<UserCommand, String> {
}

View File

@@ -0,0 +1,7 @@
package com.github.shazin.cqrs.ms.user.command.repo;
import com.github.shazin.cqrs.ms.user.command.entity.UserEntity;
import org.springframework.data.jpa.repository.JpaRepository;
public interface UserRepository extends JpaRepository<UserEntity, String> {
}

View File

@@ -0,0 +1,11 @@
package com.github.shazin.cqrs.ms.user.command.service;
import com.github.shazin.cqrs.ms.user.command.entity.UserEntity;
public interface UserService {
public UserEntity createUser(UserEntity user);
public boolean deleteUserById(String id);
}

View File

@@ -0,0 +1,27 @@
package com.github.shazin.cqrs.ms.user.command.service.impl;
import com.github.shazin.cqrs.ms.user.command.entity.UserEntity;
import com.github.shazin.cqrs.ms.user.command.repo.UserRepository;
import com.github.shazin.cqrs.ms.user.command.service.UserService;
import org.springframework.stereotype.Service;
@Service
public class DefaultUserService implements UserService {
private final UserRepository userRepository;
public DefaultUserService(UserRepository userRepository) {
this.userRepository = userRepository;
}
@Override
public UserEntity createUser(UserEntity user) {
return userRepository.save(user);
}
@Override
public boolean deleteUserById(String id) {
userRepository.deleteById(id);
return true;
}
}

View File

@@ -1,18 +1,16 @@
package com.github.shazin.cqrs.ms.user.query.listener;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.shazin.cqrs.ms.user.dto.User;
import com.github.shazin.cqrs.ms.user.dto.UserCreateEvent;
import com.github.shazin.cqrs.ms.user.dto.UserDeleteEvent;
import com.github.shazin.cqrs.ms.user.query.entity.UserEntity;
import com.github.shazin.cqrs.ms.user.query.repo.UserRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.Map;
import java.util.UUID;
@Component
public class UserEventListener {
@@ -28,19 +26,23 @@ public class UserEventListener {
}
@KafkaListener(topics = "${kafka.topic.name}")
public void consume(String message) {
try {
Map<String, String> userEvent = objectMapper.readValue(message, Map.class);
if (userEvent.get("type").equals("CREATE")) {
UserCreateEvent userCreateEvent = objectMapper.readValue(message, UserCreateEvent.class);
User user = userCreateEvent.user();
userRepository.save(new UserEntity(UUID.randomUUID().toString(), user.firstName(), user.lastName(), user.dateOfBirth(), user.identityNumber()));
} else if (userEvent.get("type").equals("DELETE")) {
UserDeleteEvent userDeleteEvent = objectMapper.readValue(message, UserDeleteEvent.class);
userRepository.deleteById(userDeleteEvent.id());
public void consume(@Payload(required = false) String message) {
if (StringUtils.hasText(message)) {
try {
Map<String, Object> event = objectMapper.readValue(message, Map.class);
Map<String, Object> payload = (Map<String, Object>) event.get("payload");
if (payload.get("before") == null && payload.get("after") != null) {
LOGGER.info("Create/Update Event");
UserEntity userToBeCreated = objectMapper.convertValue(payload.get("after"), UserEntity.class);
userRepository.save(userToBeCreated);
} else if (payload.get("before") != null && payload.get("after") == null) {
LOGGER.info("Delete Event");
UserEntity userToBeDeleted = objectMapper.convertValue(payload.get("before"), UserEntity.class);
userRepository.delete(userToBeDeleted);
}
} catch (Exception e) {
LOGGER.error("Error while handling message", e);
}
} catch (Exception e) {
LOGGER.error("Error while handling message", e);
}
}
}

View File

@@ -42,14 +42,16 @@ services:
- "8081:8081"
environment:
- SERVER_PORT=8081
- KAFKA_TOPIC_NAME=com.github.shazin.cqrs.ms.users.json
- SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.hsqldb.jdbc.JDBCDriver
- SPRING_DATASOURCE_URL=jdbc:hsqldb:mem:testdb;DB_CLOSE_DELAY=-1
- SPRING_DATASOURCE_USERNAME=sa
- SPRING_DATASOURCE_PASSWORD=
- SPRING_DATASOURCE_DRIVER_CLASS_NAME=com.mysql.cj.jdbc.Driver
- SPRING_DATASOURCE_URL=jdbc:mysql://db/userrdb?autoReconnect=true&useSSL=false&createDatabaseIfNotExist=true
- SPRING_DATASOURCE_USERNAME=root
- SPRING_DATASOURCE_PASSWORD=root
- SPRING_JPA_HIBERNATE_DDL_AUTO=create
- HIBERNATE_DIALECT=org.hibernate.dialect.MySQL8Dialect
depends_on:
- kafka
- db
- debezium
networks:
- cqrs-network
cqrs-ms-user-query-service:
@@ -58,9 +60,9 @@ services:
ports:
- '8080:8080'
environment:
- KAFKA_TOPIC_NAME=com.github.shazin.cqrs.ms.users.json
- KAFKA_TOPIC_NAME=dbserver1.userrdb.cqrs_user
- SPRING_DATASOURCE_DRIVER_CLASS_NAME=com.mysql.cj.jdbc.Driver
- SPRING_DATASOURCE_URL=jdbc:mysql://db/userdb?autoReconnect=true&useSSL=false&createDatabaseIfNotExist=true
- SPRING_DATASOURCE_URL=jdbc:mysql://db/userwdb?autoReconnect=true&useSSL=false&createDatabaseIfNotExist=true
- SPRING_DATASOURCE_USERNAME=root
- SPRING_DATASOURCE_PASSWORD=root
- SPRING_JPA_HIBERNATE_DDL_AUTO=create
@@ -68,5 +70,20 @@ services:
depends_on:
- kafka
- db
- debezium
networks:
- cqrs-network
debezium:
image: debezium/connect:0.10
ports:
- '8083:8083'
environment:
- GROUP_ID=debz_1
- BOOTSTRAP_SERVERS=kafka:9092
- CONFIG_STORAGE_TOPIC=cqrs_connect_configs
- OFFSET_STORAGE_TOPIC=cqrs_connect_offsets
- STATUS_STORAGE_TOPIC=cqrs_connect_statuses
depends_on:
- kafka
networks:
- cqrs-network