diff --git a/common-components/src/main/java/com/mz/reactor/ddd/common/components/ApplicationMessageBus.java b/common-components/src/main/java/com/mz/reactor/ddd/common/components/ApplicationMessageBus.java index 4c6286c..e41a9ee 100644 --- a/common-components/src/main/java/com/mz/reactor/ddd/common/components/ApplicationMessageBus.java +++ b/common-components/src/main/java/com/mz/reactor/ddd/common/components/ApplicationMessageBus.java @@ -1,4 +1,20 @@ package com.mz.reactor.ddd.common.components; +import com.mz.reactor.ddd.common.api.Message; +import reactor.core.publisher.Flux; + public interface ApplicationMessageBus { + + /** + * Publish message into the message stream + * @param message {@link Message} it could be event, command ... + * @param generic type + */ + void publishMessage(M message); + + /** + * Messages stream where is possible to subscribe and consume some king of messages + * @return {@link Flux} + */ + Flux messagesStream(); } diff --git a/common-components/src/main/java/com/mz/reactor/ddd/common/components/impl/ApplicationMessageBusImpl.java b/common-components/src/main/java/com/mz/reactor/ddd/common/components/impl/ApplicationMessageBusImpl.java index a3c0b31..23c3cd6 100644 --- a/common-components/src/main/java/com/mz/reactor/ddd/common/components/impl/ApplicationMessageBusImpl.java +++ b/common-components/src/main/java/com/mz/reactor/ddd/common/components/impl/ApplicationMessageBusImpl.java @@ -1,8 +1,30 @@ package com.mz.reactor.ddd.common.components.impl; +import com.mz.reactor.ddd.common.api.Message; import com.mz.reactor.ddd.common.components.ApplicationMessageBus; import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.ReplayProcessor; +import reactor.core.scheduler.Schedulers; + +import java.util.Optional; @Component public class ApplicationMessageBusImpl implements ApplicationMessageBus { + + private final ReplayProcessor messages = ReplayProcessor.create(1); + + private final FluxSink messagesSink = messages.sink(); + + @Override + public void publishMessage(M message) { + Optional.ofNullable(message).ifPresent(messagesSink::next); + } + + @Override + public Flux messagesStream() { + return messages.publishOn(Schedulers.parallel()); + } + }