From 8b1a2196d39935fb121341c0f9cb11eb9aa24f37 Mon Sep 17 00:00:00 2001 From: Michal Zeman Date: Fri, 15 Nov 2019 17:53:18 +0100 Subject: [PATCH] small refactoring, moving domain events and commands into the separates models --- .../components/ApplicationMessageBus.java | 16 ++++++++++++++ .../impl/ApplicationMessageBusImpl.java | 22 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/common-components/src/main/java/com/mz/reactor/ddd/common/components/ApplicationMessageBus.java b/common-components/src/main/java/com/mz/reactor/ddd/common/components/ApplicationMessageBus.java index 4c6286c..e41a9ee 100644 --- a/common-components/src/main/java/com/mz/reactor/ddd/common/components/ApplicationMessageBus.java +++ b/common-components/src/main/java/com/mz/reactor/ddd/common/components/ApplicationMessageBus.java @@ -1,4 +1,20 @@ package com.mz.reactor.ddd.common.components; +import com.mz.reactor.ddd.common.api.Message; +import reactor.core.publisher.Flux; + public interface ApplicationMessageBus { + + /** + * Publish message into the message stream + * @param message {@link Message} it could be event, command ... + * @param generic type + */ + void publishMessage(M message); + + /** + * Messages stream where is possible to subscribe and consume some king of messages + * @return {@link Flux} + */ + Flux messagesStream(); } diff --git a/common-components/src/main/java/com/mz/reactor/ddd/common/components/impl/ApplicationMessageBusImpl.java b/common-components/src/main/java/com/mz/reactor/ddd/common/components/impl/ApplicationMessageBusImpl.java index a3c0b31..23c3cd6 100644 --- a/common-components/src/main/java/com/mz/reactor/ddd/common/components/impl/ApplicationMessageBusImpl.java +++ b/common-components/src/main/java/com/mz/reactor/ddd/common/components/impl/ApplicationMessageBusImpl.java @@ -1,8 +1,30 @@ package com.mz.reactor.ddd.common.components.impl; +import com.mz.reactor.ddd.common.api.Message; import com.mz.reactor.ddd.common.components.ApplicationMessageBus; import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.ReplayProcessor; +import reactor.core.scheduler.Schedulers; + +import java.util.Optional; @Component public class ApplicationMessageBusImpl implements ApplicationMessageBus { + + private final ReplayProcessor messages = ReplayProcessor.create(1); + + private final FluxSink messagesSink = messages.sink(); + + @Override + public void publishMessage(M message) { + Optional.ofNullable(message).ifPresent(messagesSink::next); + } + + @Override + public Flux messagesStream() { + return messages.publishOn(Schedulers.parallel()); + } + }