From f27090094a1224a762aac2d29807625eb32b1908 Mon Sep 17 00:00:00 2001 From: developer Date: Sun, 3 Nov 2019 12:16:20 +0100 Subject: [PATCH] Initial commit Spring query --- spring-boot/query/pom.xml | 121 +++++++++++ .../example/spring/query/ErrorResponse.java | 70 +++++++ .../spring/query/GlobalExceptionHandler.java | 47 +++++ .../cqrs4j/example/spring/query/Person.java | 100 +++++++++ .../spring/query/PersonController.java | 58 ++++++ .../query/PersonCreatedEventHandler.java | 39 ++++ .../spring/query/PersonEventChunkHandler.java | 55 +++++ .../spring/query/PersonEventDispatcher.java | 54 +++++ .../example/spring/query/PersonProjector.java | 92 +++++++++ .../spring/query/PersonRepository.java | 24 +++ .../example/spring/query/QryApplication.java | 72 +++++++ .../example/spring/query/QryConfig.java | 128 ++++++++++++ .../spring/query/QryProjectionPosition.java | 88 ++++++++ .../spring/query/QryProjectionService.java | 55 +++++ .../query/ResourceNotFoundException.java | 23 +++ .../src/main/resources/application.properties | 7 + ...s4jSpringQueryExampleApplicationTests.java | 13 ++ spring-boot/shared/pom.xml | 195 +++++++++--------- .../example/spring/shared/SharedUtils.java | 24 ++- 19 files changed, 1167 insertions(+), 98 deletions(-) create mode 100644 spring-boot/query/pom.xml create mode 100644 spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/ErrorResponse.java create mode 100644 spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/GlobalExceptionHandler.java create mode 100644 spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/Person.java create mode 100644 spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonController.java create mode 100644 spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonCreatedEventHandler.java create mode 100644 spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonEventChunkHandler.java create mode 100644 spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonEventDispatcher.java create mode 100644 spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonProjector.java create mode 100644 spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonRepository.java create mode 100644 spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/QryApplication.java create mode 100644 spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/QryConfig.java create mode 100644 spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/QryProjectionPosition.java create mode 100644 spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/QryProjectionService.java create mode 100644 spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/ResourceNotFoundException.java create mode 100644 spring-boot/query/src/main/resources/application.properties create mode 100644 spring-boot/query/src/test/java/org/fuin/cqrs4j/example/spring/query/Cqrs4jSpringQueryExampleApplicationTests.java diff --git a/spring-boot/query/pom.xml b/spring-boot/query/pom.xml new file mode 100644 index 0000000..adeacba --- /dev/null +++ b/spring-boot/query/pom.xml @@ -0,0 +1,121 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.2.0.RELEASE + + + + org.fuin.cqrs4j.example.spring + cqrs4j-spring-example-query + 0.1.0-SNAPSHOT + cqrs4j-spring-example-query + Spring Boot CQRS Query Demo Application + + + UTF-8 + 1.8 + 1.8 + 1.8 + 0.3.1-SNAPSHOT + + + + + + + + org.fuin.cqrs4j.example.spring + cqrs4j-spring-example-shared + 0.1.0-SNAPSHOT + + + + org.springframework.boot + spring-boot-starter-actuator + + + + org.springframework.boot + spring-boot-starter-data-jpa + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-data-rest + + + + org.fuin + cqrs-4-java + 0.2.1-SNAPSHOT + + + + org.fuin.esc + esc-spi + ${esc.version} + + + + org.fuin.esc + esc-esjc + ${esc.version} + + + + org.fuin.esc + esc-eshttp + ${esc.version} + + + + + + org.springframework.boot + spring-boot-devtools + runtime + true + + + + com.h2database + h2 + runtime + + + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.junit.vintage + junit-vintage-engine + + + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/ErrorResponse.java b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/ErrorResponse.java new file mode 100644 index 0000000..28171d0 --- /dev/null +++ b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/ErrorResponse.java @@ -0,0 +1,70 @@ +package org.fuin.cqrs4j.example.spring.query; + +import java.util.Date; + +/** + * Error message for HTTP response. + */ +public class ErrorResponse { + + private Date timestamp; + + private String status; + + private String message; + + private String details; + + /** + * Constructor with all data. + * + * @param timestamp Timestamp. + * @param status Status. + * @param message Message. + * @param details Details. + */ + public ErrorResponse(final Date timestamp, final String status, final String message, final String details) { + super(); + this.timestamp = timestamp; + this.status = status; + this.message = message; + this.details = details; + } + + /** + * Returns the timestamp. + * + * @return Timestamp + */ + public Date getTimestamp() { + return timestamp; + } + + /** + * Returns the status. + * + * @return Status. + */ + public String getStatus() { + return status; + } + + /** + * Returns the message. + * + * @return Message. + */ + public String getMessage() { + return message; + } + + /** + * Returns the details. + * + * @return Details. + */ + public String getDetails() { + return details; + } + +} diff --git a/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/GlobalExceptionHandler.java b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/GlobalExceptionHandler.java new file mode 100644 index 0000000..4b8da44 --- /dev/null +++ b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/GlobalExceptionHandler.java @@ -0,0 +1,47 @@ +package org.fuin.cqrs4j.example.spring.query; + +import java.util.Date; + +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.ControllerAdvice; +import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.context.request.WebRequest; + +/** + * Maps exceptions to a HTTP status code. + */ +@ControllerAdvice +public class GlobalExceptionHandler { + + /** + * Converts a {@link ResourceNotFoundException} to a HTTP response. + * + * @param ex Exception to convert. + * @param request Current request. + * + * @return Response. + */ + @ExceptionHandler(ResourceNotFoundException.class) + public ResponseEntity resourceNotFoundException(final ResourceNotFoundException ex, final WebRequest request) { + final ErrorResponse errorResponse = new ErrorResponse(new Date(), HttpStatus.NOT_FOUND.toString(), + ex.getMessage(), request.getDescription(false)); + return new ResponseEntity<>(errorResponse, HttpStatus.NOT_FOUND); + } + + /** + * Converts a general {@link Exception} to a HTTP response. + * + * @param ex Exception to convert. + * @param request Current request. + * + * @return Response. + */ + @ExceptionHandler(Exception.class) + public ResponseEntity globleExcpetionHandler(final Exception ex, final WebRequest request) { + final ErrorResponse errorResponse = new ErrorResponse(new Date(), HttpStatus.INTERNAL_SERVER_ERROR.toString(), + ex.getMessage(), request.getDescription(false)); + return new ResponseEntity<>(errorResponse, HttpStatus.INTERNAL_SERVER_ERROR); + } + +} diff --git a/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/Person.java b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/Person.java new file mode 100644 index 0000000..44fe06d --- /dev/null +++ b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/Person.java @@ -0,0 +1,100 @@ +/** + * Copyright (C) 2015 Michael Schnell. All rights reserved. http://www.fuin.org/ + * + * This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License along with this library. If not, see + * http://www.gnu.org/licenses/. + */ +package org.fuin.cqrs4j.example.spring.query; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.Table; +import javax.validation.constraints.NotNull; + +import org.fuin.cqrs4j.example.spring.shared.PersonId; +import org.fuin.cqrs4j.example.spring.shared.PersonName; +import org.fuin.objects4j.common.Contract; + +/** + * Represents a person that will be stored in the database. + */ +@Entity +@Table(name = "QRY_PERSON") +public class Person { + + @Id + @Column(name = "ID", nullable = false, length = 36, updatable = false) + @NotNull + private String id; + + @Column(name = "NAME", nullable = false, length = PersonName.MAX_LENGTH, updatable = true) + @NotNull + private String name; + + /** + * Deserialization constructor. + */ + protected Person() { + super(); + } + + /** + * Constructor with all data. + * + * @param id + * Unique aggregate identifier. + * @param name + * Name of the created person + */ + public Person(@NotNull final PersonId id, @NotNull final PersonName name) { + super(); + Contract.requireArgNotNull("id", id); + Contract.requireArgNotNull("name", name); + this.id = id.asString(); + this.name = name.asString(); + } + + /** + * Returns the unique person identifier. + * + * @return Aggregate ID. + */ + @NotNull + public PersonId getId() { + return PersonId.valueOf(id); + } + + /** + * Returns the name of the person to create. + * + * @return the Person name + */ + @NotNull + public PersonName getName() { + return new PersonName(name); + } + + /** + * Sets the name of the person. + * + * @param name + * Name to set. + */ + public void setName(@NotNull final PersonName name) { + Contract.requireArgNotNull("name", name); + this.name = name.asString(); + } + + @Override + public String toString() { + return "QryPerson [id=" + id + ", name=" + name + "]"; + } + +} diff --git a/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonController.java b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonController.java new file mode 100644 index 0000000..02ba47c --- /dev/null +++ b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonController.java @@ -0,0 +1,58 @@ +/** + * Copyright (C) 2015 Michael Schnell. All rights reserved. http://www.fuin.org/ + * + * This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License along with this library. If not, see + * http://www.gnu.org/licenses/. + */ +package org.fuin.cqrs4j.example.spring.query; + +import java.util.List; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/api/v1") +public class PersonController { + + @Autowired + private PersonRepository personRepository; + + /** + * Get all persons list. + * + * @return the list + */ + @GetMapping("/persons") + public List getAllQueryPersons() { + return personRepository.findAll(); + } + + /** + * Reads a person by it's universally unique aggregate UUID. + * + * @param personId Person UUID. + * + * @return Person from database. + * + * @throws ResourceNotFoundException A person with the given UUID is unknown. + */ + @GetMapping("/persons/{id}") + public ResponseEntity getQueryPersonsById(@PathVariable(value = "id") String personId) + throws ResourceNotFoundException { + Person person = personRepository.findById(personId).orElseThrow( + () -> new ResourceNotFoundException("A person with id '" + personId + "' was not found")); + return ResponseEntity.ok().body(person); + } + +} diff --git a/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonCreatedEventHandler.java b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonCreatedEventHandler.java new file mode 100644 index 0000000..a6e102d --- /dev/null +++ b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonCreatedEventHandler.java @@ -0,0 +1,39 @@ +package org.fuin.cqrs4j.example.spring.query; + +import org.fuin.cqrs4j.EventHandler; +import org.fuin.cqrs4j.example.spring.shared.PersonCreatedEvent; +import org.fuin.cqrs4j.example.spring.shared.PersonId; +import org.fuin.ddd4j.ddd.EventType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +/** + * Handles the {@link PersonCreatedEvent}. + */ +@Component +public class PersonCreatedEventHandler implements EventHandler { + + private static final Logger LOG = LoggerFactory.getLogger(PersonCreatedEventHandler.class); + + @Autowired + private PersonRepository repo; + + @Override + public EventType getEventType() { + return PersonCreatedEvent.TYPE; + } + + @Override + @Transactional + public void handle(final PersonCreatedEvent event) { + LOG.info("Handle " + event); + final PersonId personId = event.getEntityId(); + if (repo.findById(personId.asString()) == null) { + repo.save(new Person(personId, event.getName())); + } + } + +} \ No newline at end of file diff --git a/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonEventChunkHandler.java b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonEventChunkHandler.java new file mode 100644 index 0000000..eb01755 --- /dev/null +++ b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonEventChunkHandler.java @@ -0,0 +1,55 @@ +package org.fuin.cqrs4j.example.spring.query; + +import javax.annotation.concurrent.NotThreadSafe; + +import org.fuin.cqrs4j.ProjectionService; +import org.fuin.esc.api.ProjectionStreamId; +import org.fuin.esc.api.StreamEventsSlice; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +/** + * Dispatches the events to the event handlers that will update the database and + * stores the next event position in the database (All in the same transaction). + */ +@NotThreadSafe +@Component +public class PersonEventChunkHandler { + + private static final Logger LOG = LoggerFactory.getLogger(PersonEventChunkHandler.class); + + /** Unique name of the event store projection that is used. */ + public static final ProjectionStreamId PROJECTION_STREAM_ID = new ProjectionStreamId("qry-person-stream"); + + @Autowired + private PersonEventDispatcher dispatcher; + + @Autowired + private ProjectionService projectionService; + + /** + * Returns the next event position to read. + * + * @return Number of the next event to read. + */ + @Transactional(readOnly = true) + public Long readNextEventNumber() { + return projectionService.readProjectionPosition(PROJECTION_STREAM_ID); + } + + /** + * Handles the current slice as a single transaction. + * + * @param currentSlice Slice with events to dispatch. + */ + @Transactional + public void handleChunk(final StreamEventsSlice currentSlice) { + LOG.debug("Handle chunk: {}", currentSlice); + dispatcher.dispatchCommonEvents(currentSlice.getEvents()); + projectionService.updateProjectionPosition(PROJECTION_STREAM_ID, currentSlice.getNextEventNumber()); + } + +} \ No newline at end of file diff --git a/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonEventDispatcher.java b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonEventDispatcher.java new file mode 100644 index 0000000..4b9a135 --- /dev/null +++ b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonEventDispatcher.java @@ -0,0 +1,54 @@ +package org.fuin.cqrs4j.example.spring.query; + +import java.util.List; +import java.util.Set; + +import javax.annotation.concurrent.NotThreadSafe; +import javax.validation.constraints.NotNull; + +import org.fuin.cqrs4j.EventDispatcher; +import org.fuin.cqrs4j.SimpleEventDispatcher; +import org.fuin.ddd4j.ddd.Event; +import org.fuin.ddd4j.ddd.EventType; +import org.fuin.esc.api.CommonEvent; +import org.springframework.stereotype.Component; + +/** + * Dispatches events that relate to the {@link Person} entity to the appropriate + * event handers. + */ +@NotThreadSafe +@Component +public class PersonEventDispatcher implements EventDispatcher { + + private SimpleEventDispatcher delegate; + + /** + * Default constructor. + */ + public PersonEventDispatcher(final PersonCreatedEventHandler createdHandler) { + super(); + delegate = new SimpleEventDispatcher(createdHandler); + } + + @Override + public @NotNull Set getAllTypes() { + return delegate.getAllTypes(); + } + + @Override + public void dispatchCommonEvents(@NotNull List commonEvents) { + delegate.dispatchCommonEvents(commonEvents); + } + + @Override + public void dispatchEvents(@NotNull List events) { + delegate.dispatchEvents(events); + } + + @Override + public void dispatchEvent(@NotNull Event event) { + delegate.dispatchEvent(event); + } + +} diff --git a/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonProjector.java b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonProjector.java new file mode 100644 index 0000000..2e38710 --- /dev/null +++ b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonProjector.java @@ -0,0 +1,92 @@ +package org.fuin.cqrs4j.example.spring.query; + +import static org.fuin.cqrs4j.Cqrs4JUtils.tryLocked; +import static org.fuin.cqrs4j.example.spring.query.PersonEventChunkHandler.PROJECTION_STREAM_ID; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Semaphore; + +import javax.annotation.concurrent.ThreadSafe; + +import org.fuin.ddd4j.ddd.EventType; +import org.fuin.esc.api.EventStore; +import org.fuin.esc.api.ProjectionAdminEventStore; +import org.fuin.esc.api.TypeName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +/** + * Reads incoming events from an event store projection and dispatches them to + * the appropriate event handlers. The event store projection will be created by + * this class, if it does not yet exist. + */ +@ThreadSafe +@Component +public class PersonProjector { + + private static final Logger LOG = LoggerFactory.getLogger(PersonProjector.class); + + /** Prevents more than one projector thread running at a time. */ + private static final Semaphore LOCK = new Semaphore(1); + + // The following beans are NOT thread safe! + // Above LOCK prevents multithreaded access + + @Autowired + private ProjectionAdminEventStore eventstore; + + @Autowired + private EventStore eventStore; + + @Autowired + private PersonEventChunkHandler chunkHandler; + + @Autowired + private PersonEventDispatcher dispatcher; + + /** + * Runs triggered by the timer. If a second timer event occurs while the + * previous call is still being executed, the method execution will simply be + * skipped. + */ + @Scheduled(fixedRate = 100) + @Async("personProjectorExecutor") + public void execute() { + tryLocked(LOCK, () -> { + try { + readStreamEvents(); + } catch (final RuntimeException ex) { + LOG.error("Error reading events from stream", ex); + } + }); + } + + private void readStreamEvents() { + + // TODO Make sure a projection with the correct events exists + // We must update the projection if new events are defined or some are removed! + if (!eventstore.projectionExists(PROJECTION_STREAM_ID)) { + final Set eventTypes = dispatcher.getAllTypes(); + final List typeNames = new ArrayList<>(); + for (final EventType eventType : eventTypes) { + typeNames.add(new TypeName(eventType.asBaseType())); + } + LOG.info("Create projection '{}' with events: {}", PROJECTION_STREAM_ID, typeNames); + eventstore.createProjection(PROJECTION_STREAM_ID, true, typeNames); + } + + // Read and dispatch events + final Long nextEventNumber = chunkHandler.readNextEventNumber(); + eventStore.readAllEventsForward(PROJECTION_STREAM_ID, nextEventNumber, 100, (currentSlice) -> { + chunkHandler.handleChunk(currentSlice); + }); + + } + +} diff --git a/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonRepository.java b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonRepository.java new file mode 100644 index 0000000..ec469f0 --- /dev/null +++ b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/PersonRepository.java @@ -0,0 +1,24 @@ +/** + * Copyright (C) 2015 Michael Schnell. All rights reserved. http://www.fuin.org/ + * + * This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License along with this library. If not, see + * http://www.gnu.org/licenses/. + */ +package org.fuin.cqrs4j.example.spring.query; + +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + +/** + * Database service for persons. + */ +@Repository +public interface PersonRepository extends JpaRepository { + +} \ No newline at end of file diff --git a/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/QryApplication.java b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/QryApplication.java new file mode 100644 index 0000000..195f7d9 --- /dev/null +++ b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/QryApplication.java @@ -0,0 +1,72 @@ +package org.fuin.cqrs4j.example.spring.query; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.fuin.cqrs4j.example.spring.shared.SharedUtils; +import org.fuin.esc.eshttp.ESEnvelopeType; +import org.fuin.esc.eshttp.ESHttpEventStore; +import org.fuin.esc.eshttp.IESHttpEventStore; +import org.fuin.esc.spi.SerDeserializerRegistry; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +@SpringBootApplication(scanBasePackages = "org.fuin.cqrs4j.example.spring.query") +@EnableScheduling +@EnableAsync +public class QryApplication { + + /** + * Creates a HTTP based event store connection. + * + * @param config Configuration to use. + * + * @return New event store instance. + */ + @Bean(destroyMethod = "close") + public IESHttpEventStore getESHttpEventStore(final QryConfig config) { + final String url = config.getEventStoreProtocol() + "://" + config.getEventStoreHost() + ":" + + config.getEventStoreHttpPort(); + try { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + final UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(config.getEventStoreUser(), + config.getEventStorePassword()); + credentialsProvider.setCredentials(AuthScope.ANY, credentials); + final SerDeserializerRegistry registry = SharedUtils.createRegistry(); + final ESHttpEventStore es = new ESHttpEventStore(Executors.defaultThreadFactory(), new URL(url), + ESEnvelopeType.JSON, registry, registry, credentialsProvider); + es.open(); + return es; + } catch (final MalformedURLException ex) { + throw new RuntimeException("Failed to create URL: " + url, ex); + } + } + + + @Bean("personProjectorExecutor") + public Executor taskExecutor() { + final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(1); + executor.setMaxPoolSize(5); + executor.setQueueCapacity(500); + executor.setThreadNamePrefix("person-"); + executor.initialize(); + return executor; + } + + + public static void main(String[] args) { + SpringApplication.run(QryApplication.class, args); + } + +} diff --git a/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/QryConfig.java b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/QryConfig.java new file mode 100644 index 0000000..4317ae4 --- /dev/null +++ b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/QryConfig.java @@ -0,0 +1,128 @@ +package org.fuin.cqrs4j.example.spring.query; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +@Component +public class QryConfig { + + private static final String EVENT_STORE_PROTOCOL = "http"; + + private static final String EVENT_STORE_HOST = "localhost"; + + private static final int EVENT_STORE_HTTP_PORT = 2113; + + private static final int EVENT_STORE_TCP_PORT = 1113; + + private static final String EVENT_STORE_USER = "admin"; + + private static final String EVENT_STORE_PASSWORD = "changeit"; + + @Value("${EVENT_STORE_PROTOCOL:http}") + private String eventStoreProtocol; + + @Value("${EVENT_STORE_HOST:localhost}") + private String eventStoreHost; + + @Value("${EVENT_STORE_HTTP_PORT:2113}") + private int eventStoreHttpPort; + + @Value("${EVENT_STORE_TCP_PORT:1113}") + private int eventStoreTcpPort; + + @Value("${EVENT_STORE_USER:admin}") + private String eventStoreUser; + + @Value("${EVENT_STORE_PASSWORD:changeit}") + private String eventStorePassword; + + /** + * Constructor using default values internally. + */ + public QryConfig() { + super(); + this.eventStoreProtocol = EVENT_STORE_PROTOCOL; + this.eventStoreHost = EVENT_STORE_HOST; + this.eventStoreHttpPort = EVENT_STORE_HTTP_PORT; + this.eventStoreTcpPort = EVENT_STORE_TCP_PORT; + this.eventStoreUser = EVENT_STORE_USER; + this.eventStorePassword = EVENT_STORE_PASSWORD; + } + + /** + * Constructor with all data. + * + * @param eventStoreProtocol Protocol. + * @param eventStoreHost Host. + * @param eventStoreHttpPort HTTP port + * @param eventStoreTcpPort TCP port. + * @param eventStoreUser User. + * @param eventStorePassword Password. + */ + public QryConfig(final String eventStoreProtocol, final String eventStoreHost, final int eventStoreHttpPort, final int eventStoreTcpPort, + final String eventStoreUser, final String eventStorePassword) { + super(); + this.eventStoreProtocol = eventStoreProtocol; + this.eventStoreHost = eventStoreHost; + this.eventStoreHttpPort = eventStoreHttpPort; + this.eventStoreTcpPort = eventStoreTcpPort; + this.eventStoreUser = eventStoreUser; + this.eventStorePassword = eventStorePassword; + } + + /** + * Returns the protocol of the event store. + * + * @return Either http or https. + */ + public String getEventStoreProtocol() { + return eventStoreProtocol; + } + + /** + * Returns the host name of the event store. + * + * @return Name. + */ + public String getEventStoreHost() { + return eventStoreHost; + } + + /** + * Returns the HTTP port of the event store. + * + * @return Port. + */ + public int getEventStoreHttpPort() { + return eventStoreHttpPort; + } + + /** + * Returns the TCP port of the event store. + * + * @return Port. + */ + public int getEventStoreTcpPort() { + return eventStoreTcpPort; + } + + /** + * Returns the username of the event store. + * + * @return Username. + */ + public String getEventStoreUser() { + return eventStoreUser; + } + + /** + * Returns the password of the event store. + * + * @return Password. + */ + public String getEventStorePassword() { + return eventStorePassword; + } + +} + diff --git a/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/QryProjectionPosition.java b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/QryProjectionPosition.java new file mode 100644 index 0000000..4a28b95 --- /dev/null +++ b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/QryProjectionPosition.java @@ -0,0 +1,88 @@ +package org.fuin.cqrs4j.example.spring.query; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.Table; +import javax.validation.constraints.NotNull; + +import org.fuin.esc.api.SimpleStreamId; +import org.fuin.esc.api.StreamId; +import org.fuin.objects4j.common.Contract; + +/** + * Stores the next position to read from a projection in the event store. + */ +@Entity +@Table(name = "QRY_PROJECTION_POS") +public class QryProjectionPosition { + + @Id + @Column(name = "STREAM_ID", nullable = false, length = 250, updatable = false) + @NotNull + private String streamId; + + @Column(name = "NEXT_POS", nullable = false, updatable = true) + @NotNull + private Long nextPos; + + /** + * JPA constructor. + */ + protected QryProjectionPosition() { + super(); + } + + /** + * Constructor with mandatory data. + * + * @param streamId + * Unique stream identifier. + * @param nextPos + * Next position from the stream to read. + */ + public QryProjectionPosition(@NotNull final StreamId streamId, @NotNull final Long nextPos) { + super(); + Contract.requireArgNotNull("streamId", streamId); + Contract.requireArgNotNull("nextPos", nextPos); + this.streamId = streamId.asString(); + this.nextPos = nextPos; + } + + /** + * Returns the unique stream identifier. + * + * @return Stream ID. + */ + @NotNull + public StreamId getStreamId() { + return new SimpleStreamId(streamId); + } + + /** + * Returns the next position read from the stream. + * + * @return Position to read next time. + */ + @NotNull + public Long getNextPos() { + return nextPos; + } + + /** + * Sets the next position read from the stream. + * + * @param nextPos + * New position to set. + */ + public void setNextPosition(@NotNull final Long nextPos) { + Contract.requireArgNotNull("nextPos", nextPos); + this.nextPos = nextPos; + } + + @Override + public String toString() { + return "QryProjectionPosition [streamId=" + streamId + ", nextPos=" + nextPos + "]"; + } + +} diff --git a/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/QryProjectionService.java b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/QryProjectionService.java new file mode 100644 index 0000000..797a517 --- /dev/null +++ b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/QryProjectionService.java @@ -0,0 +1,55 @@ +package org.fuin.cqrs4j.example.spring.query; + +import javax.persistence.EntityManager; +import javax.persistence.PersistenceContext; +import javax.validation.constraints.NotNull; + +import org.fuin.cqrs4j.ProjectionService; +import org.fuin.esc.api.StreamId; +import org.fuin.objects4j.common.Contract; +import org.springframework.stereotype.Repository; +import org.springframework.transaction.annotation.Transactional; + +/** + * Service to read and persist the next position of a stream to read. + */ +@Repository +public class QryProjectionService implements ProjectionService { + + @PersistenceContext + private EntityManager em; + + @Override + public void resetProjectionPosition(@NotNull final StreamId streamId) { + Contract.requireArgNotNull("streamId", streamId); + final QryProjectionPosition pos = em.find(QryProjectionPosition.class, streamId.asString()); + if (pos != null) { + pos.setNextPosition(0L); + } + } + + @Override + @Transactional(readOnly = true) + public Long readProjectionPosition(@NotNull StreamId streamId) { + Contract.requireArgNotNull("streamId", streamId); + final QryProjectionPosition pos = em.find(QryProjectionPosition.class, streamId.asString()); + if (pos == null) { + return 0L; + } + return pos.getNextPos(); + } + + @Override + public void updateProjectionPosition(@NotNull StreamId streamId, @NotNull Long nextEventNumber) { + Contract.requireArgNotNull("streamId", streamId); + Contract.requireArgNotNull("nextEventNumber", nextEventNumber); + QryProjectionPosition pos = em.find(QryProjectionPosition.class, streamId.asString()); + if (pos == null) { + pos = new QryProjectionPosition(streamId, nextEventNumber); + em.persist(pos); + } else { + pos.setNextPosition(nextEventNumber); + } + } + +} diff --git a/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/ResourceNotFoundException.java b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/ResourceNotFoundException.java new file mode 100644 index 0000000..0c8cabd --- /dev/null +++ b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/ResourceNotFoundException.java @@ -0,0 +1,23 @@ +package org.fuin.cqrs4j.example.spring.query; + +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.ResponseStatus; + +/** + * A resource was not found. + */ +@ResponseStatus(value = HttpStatus.NOT_FOUND) +public class ResourceNotFoundException extends Exception { + + private static final long serialVersionUID = 1L; + + /** + * Constructor with message. + * + * @param message Exception message. + */ + public ResourceNotFoundException(final String message) { + super(message); + } + +} \ No newline at end of file diff --git a/spring-boot/query/src/main/resources/application.properties b/spring-boot/query/src/main/resources/application.properties new file mode 100644 index 0000000..0abe86d --- /dev/null +++ b/spring-boot/query/src/main/resources/application.properties @@ -0,0 +1,7 @@ + +spring.datasource.url=jdbc:h2:mem:testdb +spring.datasource.driverClassName=org.h2.Driver +spring.datasource.username=sa +spring.datasource.password=verySecret +spring.jpa.database-platform=org.hibernate.dialect.H2Dialect + diff --git a/spring-boot/query/src/test/java/org/fuin/cqrs4j/example/spring/query/Cqrs4jSpringQueryExampleApplicationTests.java b/spring-boot/query/src/test/java/org/fuin/cqrs4j/example/spring/query/Cqrs4jSpringQueryExampleApplicationTests.java new file mode 100644 index 0000000..3bc0d68 --- /dev/null +++ b/spring-boot/query/src/test/java/org/fuin/cqrs4j/example/spring/query/Cqrs4jSpringQueryExampleApplicationTests.java @@ -0,0 +1,13 @@ +package org.fuin.cqrs4j.example.spring.query; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class Cqrs4jSpringQueryExampleApplicationTests { + + @Test + void contextLoads() { + } + +} diff --git a/spring-boot/shared/pom.xml b/spring-boot/shared/pom.xml index 3554d00..1a06a25 100644 --- a/spring-boot/shared/pom.xml +++ b/spring-boot/shared/pom.xml @@ -1,5 +1,6 @@ - 4.0.0 @@ -9,119 +10,119 @@ cqrs4j-spring-example-shared Spring Boot CQRS Shared Code for Demo Application - - UTF-8 - 1.8 - 1.8 + + UTF-8 + 1.8 + 1.8 1.8 - 0.3.1-SNAPSHOT - + 0.3.1-SNAPSHOT + - + - + - - org.fuin - ddd-4-java - 0.2.1-SNAPSHOT - + + org.fuin + ddd-4-java + 0.2.1-SNAPSHOT + - - org.fuin.esc - esc-spi - ${esc.version} - + + org.fuin + objects4j + 0.6.9-SNAPSHOT + - - org.fuin.esc - esc-esjc - ${esc.version} - + + org.fuin.esc + esc-spi + ${esc.version} + - - org.slf4j - slf4j-api - 1.7.25 - + + org.fuin.esc + esc-esjc + ${esc.version} + - - org.hibernate.validator - hibernate-validator - 6.0.10.Final - - - jaxb-impl - com.sun.xml.bind - - - jaxb-api - javax.xml.bind - - - + + org.slf4j + slf4j-api + 1.7.25 + - - javax.json - javax.json-api - 1.1.4 - + + org.hibernate.validator + hibernate-validator + 6.0.10.Final + + + jaxb-impl + com.sun.xml.bind + + + jaxb-api + javax.xml.bind + + + - - org.glassfish - javax.json - 1.1.4 - + + org.glassfish + javax.json + 1.1.4 + - - org.eclipse - yasson - 1.0.3 - + + org.eclipse + yasson + 1.0.3 + - - org.hibernate.javax.persistence - hibernate-jpa-2.1-api - 1.0.2 - - - + + jakarta.persistence + jakarta.persistence-api + 2.2.3 + - - junit - junit - 4.12 - test - + - - org.assertj - assertj-core - 3.10.0 - test - + + junit + junit + 4.12 + test + - - org.fuin - units4j - 0.8.3 - test - + + org.assertj + assertj-core + 3.10.0 + test + - - nl.jqno.equalsverifier - equalsverifier - 2.4.6 - test - + + org.fuin + units4j + 0.8.3 + test + - - commons-io - commons-io - 2.6 - test - + + nl.jqno.equalsverifier + equalsverifier + 2.4.6 + test + - + + commons-io + commons-io + 2.6 + test + + + diff --git a/spring-boot/shared/src/main/java/org/fuin/cqrs4j/example/spring/shared/SharedUtils.java b/spring-boot/shared/src/main/java/org/fuin/cqrs4j/example/spring/shared/SharedUtils.java index bb556bf..f40b0e9 100644 --- a/spring-boot/shared/src/main/java/org/fuin/cqrs4j/example/spring/shared/SharedUtils.java +++ b/spring-boot/shared/src/main/java/org/fuin/cqrs4j/example/spring/shared/SharedUtils.java @@ -109,7 +109,29 @@ public final class SharedUtils { return registry; } - /** + /** + * Creates a registry that connects the type with the appropriate serializer and + * de-serializer. + * + * @return New instance. + */ + public static SerDeserializerRegistry createRegistry() { + + // Knows about all types for usage with JSON-B + final SerializedDataTypeRegistry typeRegistry = SharedUtils.createTypeRegistry(); + + // Does the actual marshalling/unmarshalling + final JsonbDeSerializer jsonbDeSer = SharedUtils.createJsonbDeSerializer(); + + // Registry connects the type with the appropriate serializer and de-serializer + final SerDeserializerRegistry serDeserRegistry = SharedUtils.createSerDeserializerRegistry(typeRegistry, + jsonbDeSer); + + return serDeserRegistry; + + } + + /** * Creates an instance of the JSON-B serializer/deserializer. * * @return New instance that is fully initialized with al necessary settings.