From 64e95e8dc8e7f3e19227dcebd2f44aa05096a575 Mon Sep 17 00:00:00 2001 From: Michael Schnell Date: Sat, 21 Mar 2020 11:57:50 +0100 Subject: [PATCH] Create ES projection based on even type names. --- .../PersonListEventChunkHandler.java | 22 +++++++- .../views/personlist/PersonListProjector.java | 50 +++++++++++-------- 2 files changed, 51 insertions(+), 21 deletions(-) diff --git a/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/views/personlist/PersonListEventChunkHandler.java b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/views/personlist/PersonListEventChunkHandler.java index 60a8877..1652a5b 100644 --- a/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/views/personlist/PersonListEventChunkHandler.java +++ b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/views/personlist/PersonListEventChunkHandler.java @@ -1,8 +1,12 @@ package org.fuin.cqrs4j.example.spring.query.views.personlist; +import java.util.Set; + import javax.annotation.concurrent.NotThreadSafe; import org.fuin.cqrs4j.ProjectionService; +import org.fuin.cqrs4j.example.shared.SharedUtils; +import org.fuin.ddd4j.ddd.EventType; import org.fuin.esc.api.ProjectionStreamId; import org.fuin.esc.api.StreamEventsSlice; import org.slf4j.Logger; @@ -30,7 +34,23 @@ public class PersonListEventChunkHandler { @Autowired private ProjectionService projectionService; - /** + private ProjectionStreamId streamId; + + /** + * Returns the name of the event store projection that is used by this handler. + * + * @return Unique projection stream name. + */ + public ProjectionStreamId getProjectionStreamId() { + if (streamId == null) { + final Set eventTypes = dispatcher.getAllTypes(); + final String name = "spring-qry-person-" + SharedUtils.calculateChecksum(eventTypes); + streamId = new ProjectionStreamId(name); + } + return streamId; + } + + /** * Returns the next event position to read. * * @return Number of the next event to read. diff --git a/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/views/personlist/PersonListProjector.java b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/views/personlist/PersonListProjector.java index 2acfb31..967e008 100644 --- a/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/views/personlist/PersonListProjector.java +++ b/spring-boot/query/src/main/java/org/fuin/cqrs4j/example/spring/query/views/personlist/PersonListProjector.java @@ -1,7 +1,6 @@ package org.fuin.cqrs4j.example.spring.query.views.personlist; import static org.fuin.cqrs4j.Cqrs4JUtils.tryLocked; -import static org.fuin.cqrs4j.example.spring.query.views.personlist.PersonListEventChunkHandler.PROJECTION_STREAM_ID; import java.util.ArrayList; import java.util.List; @@ -44,7 +43,7 @@ public class PersonListProjector { // Above LOCK prevents multithreaded access @Autowired - private IESHttpEventStore eventStore; + private IESHttpEventStore eventstore; @Autowired private PersonListEventChunkHandler chunkHandler; @@ -85,26 +84,37 @@ public class PersonListProjector { LOG.info("Application stopped"); } - private void readStreamEvents() { + private void readStreamEvents() { - // TODO Make sure a projection with the correct events exists - // We must update the projection if new events are defined or some are removed! - if (!eventStore.projectionExists(PROJECTION_STREAM_ID)) { - final Set eventTypes = dispatcher.getAllTypes(); - final List typeNames = new ArrayList<>(); - for (final EventType eventType : eventTypes) { - typeNames.add(new TypeName(eventType.asBaseType())); - } - LOG.info("Create projection '{}' with events: {}", PROJECTION_STREAM_ID, typeNames); - eventStore.createProjection(PROJECTION_STREAM_ID, true, typeNames); - } + // Create a projection if it does not exist. + // Multiple projections (even in different microservice versions) + // might share the same projection based on the event name hash. + if (!eventstore.projectionExists(chunkHandler.getProjectionStreamId())) { + final List typeNames = getEventTypeNames(); + LOG.info("Create projection '{}' with events: {}", chunkHandler.getProjectionStreamId(), typeNames); + eventstore.createProjection(chunkHandler.getProjectionStreamId(), true, typeNames); + } - // Read and dispatch events - final Long nextEventNumber = chunkHandler.readNextEventNumber(); - eventStore.readAllEventsForward(PROJECTION_STREAM_ID, nextEventNumber, 100, (currentSlice) -> { - chunkHandler.handleChunk(currentSlice); - }); + // Read and dispatch events + final Long nextEventNumber = chunkHandler.readNextEventNumber(); + eventstore.readAllEventsForward(chunkHandler.getProjectionStreamId(), nextEventNumber, 100, (currentSlice) -> { + chunkHandler.handleChunk(currentSlice); + }); - } + } + + /** + * Returns a list of all event type names used for this projection. + * + * @return List of event names. + */ + public List getEventTypeNames() { + final List typeNames = new ArrayList<>(); + final Set eventTypes = dispatcher.getAllTypes(); + for (final EventType eventType : eventTypes) { + typeNames.add(new TypeName(eventType.asBaseType())); + } + return typeNames; + } }