small refactoring, moving domain events and commands into the separates models

This commit is contained in:
Michal Zeman
2019-11-15 17:53:18 +01:00
parent f368532393
commit 8b1a2196d3
2 changed files with 38 additions and 0 deletions

View File

@@ -1,4 +1,20 @@
package com.mz.reactor.ddd.common.components;
import com.mz.reactor.ddd.common.api.Message;
import reactor.core.publisher.Flux;
public interface ApplicationMessageBus {
/**
* Publish message into the message stream
* @param message {@link Message} it could be event, command ...
* @param <M> generic type
*/
<M extends Message> void publishMessage(M message);
/**
* Messages stream where is possible to subscribe and consume some king of messages
* @return {@link Flux<Message>}
*/
Flux<Message> messagesStream();
}

View File

@@ -1,8 +1,30 @@
package com.mz.reactor.ddd.common.components.impl;
import com.mz.reactor.ddd.common.api.Message;
import com.mz.reactor.ddd.common.components.ApplicationMessageBus;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.scheduler.Schedulers;
import java.util.Optional;
@Component
public class ApplicationMessageBusImpl implements ApplicationMessageBus {
private final ReplayProcessor<Message> messages = ReplayProcessor.create(1);
private final FluxSink<Message> messagesSink = messages.sink();
@Override
public <M extends Message> void publishMessage(M message) {
Optional.ofNullable(message).ifPresent(messagesSink::next);
}
@Override
public Flux<Message> messagesStream() {
return messages.publishOn(Schedulers.parallel());
}
}