Create ES projection based on even type names.

This commit is contained in:
Michael Schnell
2020-03-21 11:57:50 +01:00
parent b4114a97eb
commit 64e95e8dc8
2 changed files with 51 additions and 21 deletions

View File

@@ -1,8 +1,12 @@
package org.fuin.cqrs4j.example.spring.query.views.personlist;
import java.util.Set;
import javax.annotation.concurrent.NotThreadSafe;
import org.fuin.cqrs4j.ProjectionService;
import org.fuin.cqrs4j.example.shared.SharedUtils;
import org.fuin.ddd4j.ddd.EventType;
import org.fuin.esc.api.ProjectionStreamId;
import org.fuin.esc.api.StreamEventsSlice;
import org.slf4j.Logger;
@@ -30,7 +34,23 @@ public class PersonListEventChunkHandler {
@Autowired
private ProjectionService projectionService;
/**
private ProjectionStreamId streamId;
/**
* Returns the name of the event store projection that is used by this handler.
*
* @return Unique projection stream name.
*/
public ProjectionStreamId getProjectionStreamId() {
if (streamId == null) {
final Set<EventType> eventTypes = dispatcher.getAllTypes();
final String name = "spring-qry-person-" + SharedUtils.calculateChecksum(eventTypes);
streamId = new ProjectionStreamId(name);
}
return streamId;
}
/**
* Returns the next event position to read.
*
* @return Number of the next event to read.

View File

@@ -1,7 +1,6 @@
package org.fuin.cqrs4j.example.spring.query.views.personlist;
import static org.fuin.cqrs4j.Cqrs4JUtils.tryLocked;
import static org.fuin.cqrs4j.example.spring.query.views.personlist.PersonListEventChunkHandler.PROJECTION_STREAM_ID;
import java.util.ArrayList;
import java.util.List;
@@ -44,7 +43,7 @@ public class PersonListProjector {
// Above LOCK prevents multithreaded access
@Autowired
private IESHttpEventStore eventStore;
private IESHttpEventStore eventstore;
@Autowired
private PersonListEventChunkHandler chunkHandler;
@@ -85,26 +84,37 @@ public class PersonListProjector {
LOG.info("Application stopped");
}
private void readStreamEvents() {
private void readStreamEvents() {
// TODO Make sure a projection with the correct events exists
// We must update the projection if new events are defined or some are removed!
if (!eventStore.projectionExists(PROJECTION_STREAM_ID)) {
final Set<EventType> eventTypes = dispatcher.getAllTypes();
final List<TypeName> typeNames = new ArrayList<>();
for (final EventType eventType : eventTypes) {
typeNames.add(new TypeName(eventType.asBaseType()));
}
LOG.info("Create projection '{}' with events: {}", PROJECTION_STREAM_ID, typeNames);
eventStore.createProjection(PROJECTION_STREAM_ID, true, typeNames);
}
// Create a projection if it does not exist.
// Multiple projections (even in different microservice versions)
// might share the same projection based on the event name hash.
if (!eventstore.projectionExists(chunkHandler.getProjectionStreamId())) {
final List<TypeName> typeNames = getEventTypeNames();
LOG.info("Create projection '{}' with events: {}", chunkHandler.getProjectionStreamId(), typeNames);
eventstore.createProjection(chunkHandler.getProjectionStreamId(), true, typeNames);
}
// Read and dispatch events
final Long nextEventNumber = chunkHandler.readNextEventNumber();
eventStore.readAllEventsForward(PROJECTION_STREAM_ID, nextEventNumber, 100, (currentSlice) -> {
chunkHandler.handleChunk(currentSlice);
});
// Read and dispatch events
final Long nextEventNumber = chunkHandler.readNextEventNumber();
eventstore.readAllEventsForward(chunkHandler.getProjectionStreamId(), nextEventNumber, 100, (currentSlice) -> {
chunkHandler.handleChunk(currentSlice);
});
}
}
/**
* Returns a list of all event type names used for this projection.
*
* @return List of event names.
*/
public List<TypeName> getEventTypeNames() {
final List<TypeName> typeNames = new ArrayList<>();
final Set<EventType> eventTypes = dispatcher.getAllTypes();
for (final EventType eventType : eventTypes) {
typeNames.add(new TypeName(eventType.asBaseType()));
}
return typeNames;
}
}