diff --git a/netflix-modules/mantis/pom.xml b/netflix-modules/mantis/pom.xml
index 48151c142f..5d9611ccdf 100644
--- a/netflix-modules/mantis/pom.xml
+++ b/netflix-modules/mantis/pom.xml
@@ -52,6 +52,20 @@
1.18.12
+
+ org.springframework
+ spring-webflux
+ 5.0.9.RELEASE
+ test
+
+
+
+ io.projectreactor.netty
+ reactor-netty
+ 0.9.12.RELEASE
+ test
+
+
diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogAggregationJob.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogAggregationJob.java
index 229d11d39d..7fc514deef 100644
--- a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogAggregationJob.java
+++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogAggregationJob.java
@@ -9,10 +9,17 @@ import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.MantisJob;
import io.mantisrx.runtime.MantisJobProvider;
import io.mantisrx.runtime.Metadata;
+import io.mantisrx.runtime.sink.Sink;
import io.mantisrx.runtime.sink.Sinks;
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+@NoArgsConstructor
+@AllArgsConstructor
public class LogAggregationJob extends MantisJobProvider {
+ private Sink sink = Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString));
+
@Override
public Job getJobInstance() {
@@ -21,7 +28,7 @@ public class LogAggregationJob extends MantisJobProvider {
.stage(new TransformLogStage(), TransformLogStage.stageConfig())
.stage(new GroupLogStage(), GroupLogStage.config())
.stage(new CountLogStage(), CountLogStage.config())
- .sink(Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString)))
+ .sink(sink)
.metadata(new Metadata.Builder().build())
.create();
diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java
index 492f30c43a..34ccf8355a 100644
--- a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java
+++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/job/LogCollectingJob.java
@@ -9,18 +9,23 @@ import io.mantisrx.runtime.MantisJob;
import io.mantisrx.runtime.MantisJobProvider;
import io.mantisrx.runtime.Metadata;
import io.mantisrx.runtime.ScalarToScalar;
-import lombok.extern.slf4j.Slf4j;
+import io.mantisrx.runtime.sink.Sink;
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
-@Slf4j
+@NoArgsConstructor
+@AllArgsConstructor
public class LogCollectingJob extends MantisJobProvider {
+ private Sink sink = new LogSink();
+
@Override
public Job getJobInstance() {
return MantisJob
.source(new RandomLogSource())
.stage(new TransformLogStage(), new ScalarToScalar.Config<>())
- .sink(new LogSink())
+ .sink(sink)
.metadata(new Metadata.Builder().build())
.create();
diff --git a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java
index 0eeb7ea086..e0e3c4f9fa 100644
--- a/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java
+++ b/netflix-modules/mantis/src/main/java/com/baeldung/netflix/mantis/model/LogAggregate.java
@@ -5,15 +5,17 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.mantisrx.runtime.codec.JsonType;
import lombok.AllArgsConstructor;
import lombok.Getter;
+import lombok.NoArgsConstructor;
@Getter
+@NoArgsConstructor
@AllArgsConstructor
public class LogAggregate implements JsonType {
private static final ObjectMapper mapper = new ObjectMapper();
- private final Integer count;
- private final String level;
+ private Integer count;
+ private String level;
public String toJsonString() {
try {
diff --git a/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogAggregationJobIntegrationTest.java b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogAggregationJobIntegrationTest.java
new file mode 100644
index 0000000000..b9b16e2146
--- /dev/null
+++ b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogAggregationJobIntegrationTest.java
@@ -0,0 +1,56 @@
+package com.baeldung.netflix.mantis.job;
+
+import com.baeldung.netflix.mantis.model.LogAggregate;
+import io.mantisrx.runtime.PortRequest;
+import io.mantisrx.runtime.sink.Sinks;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static java.util.Arrays.asList;
+import static java.util.Optional.of;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class LogAggregationJobIntegrationTest extends MantisJobTestBase {
+
+ private final static int PORT = 7382;
+ private final static String SINK_URL = "http://localhost:" + PORT;
+
+ @BeforeAll
+ static void beforeAll() {
+ start(new LogAggregationJob((context, portRequest, logAggregateObservable) -> {
+ logAggregateObservable.subscribe();
+ Sinks.sse(LogAggregate::toJsonString).call(context, new PortRequest(PORT), logAggregateObservable);
+ }));
+ }
+
+ @Override
+ public String getSinkUrl() {
+ return SINK_URL;
+ }
+
+ @Override
+ public Class getEventType() {
+ return LogAggregate.class;
+ }
+
+ @Test
+ void whenReadingFromSink_thenShouldRetrieveCorrectNumberOfLogAggregates() {
+ assertEquals(of(5L), sinkStream.take(5).count().blockOptional());
+ }
+
+ @Test
+ void whenReadingFromSink_thenShouldRetrieveLogAggregate() {
+ assertNotNull(sinkStream.take(1).blockFirst());
+ }
+
+ @Test
+ void whenReadingFromSink_thenShouldRetrieveValidLogAggregate() {
+ LogAggregate logAggregate = sinkStream.take(1).blockFirst();
+
+ assertTrue(asList("ERROR", "WARN", "INFO").contains(logAggregate.getLevel()));
+ assertTrue(logAggregate.getCount() > 0);
+ }
+
+}
\ No newline at end of file
diff --git a/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogCollectingJobIntegrationTest.java b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogCollectingJobIntegrationTest.java
new file mode 100644
index 0000000000..87e0c194b5
--- /dev/null
+++ b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/LogCollectingJobIntegrationTest.java
@@ -0,0 +1,73 @@
+package com.baeldung.netflix.mantis.job;
+
+import com.baeldung.netflix.mantis.model.LogEvent;
+import com.baeldung.netflix.mantis.sink.LogSink;
+import io.mantisrx.runtime.Context;
+import io.mantisrx.runtime.PortRequest;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import rx.Observable;
+
+import static java.util.Arrays.asList;
+import static java.util.Optional.of;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class LogCollectingJobIntegrationTest extends MantisJobTestBase {
+
+ private final static int PORT = 7381;
+ private final static String SINK_URL = "http://localhost:" + PORT;
+
+ @BeforeAll
+ static void beforeAll() {
+
+ start(new LogCollectingJob(new LogSink() {
+
+ @Override
+ public void call(Context context, PortRequest portRequest, Observable observable) {
+ super.call(context, new PortRequest(PORT), observable);
+ }
+
+ }));
+
+ }
+
+ @Override
+ public String getSinkUrl() {
+ return SINK_URL;
+ }
+
+ @Override
+ public Class getEventType() {
+ return LogEvent.class;
+ }
+
+ @Test
+ void whenReadingFromSink_thenShouldRetrieveCorrectNumberOfLogEvents() {
+ assertEquals(of(5L), sinkStream.take(5).count().blockOptional());
+ }
+
+ @Test
+ void whenReadingFromSink_thenShouldRetrieveLogEvent() {
+ assertNotNull(sinkStream.take(1).blockFirst());
+ }
+
+ @Test
+ void whenReadingFromSink_thenShouldRetrieveValidLogEvent() {
+ LogEvent logEvent = sinkStream.take(1).blockFirst();
+
+ assertTrue(asList("ERROR", "WARN", "INFO").contains(logEvent.getLevel()));
+ assertTrue(asList("login attempt", "user created").contains(logEvent.getMessage()));
+ }
+
+ @Test
+ void whenReadingFromSink_thenShouldRetrieveFilteredLogEvents() {
+ getSinkStream(SINK_URL + "?filter=login")
+ .take(7)
+ .toStream().forEach(
+ logEvent -> assertEquals("login attempt", logEvent.getMessage())
+ );
+ }
+
+}
\ No newline at end of file
diff --git a/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/MantisJobTestBase.java b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/MantisJobTestBase.java
new file mode 100644
index 0000000000..89425299a4
--- /dev/null
+++ b/netflix-modules/mantis/src/test/java/com/baeldung/netflix/mantis/job/MantisJobTestBase.java
@@ -0,0 +1,49 @@
+package com.baeldung.netflix.mantis.job;
+
+import io.mantisrx.runtime.Job;
+import io.mantisrx.runtime.MantisJobProvider;
+import io.mantisrx.runtime.executor.LocalJobExecutorNetworked;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Flux;
+import reactor.util.retry.Retry;
+
+import java.time.Duration;
+
+public abstract class MantisJobTestBase {
+
+ private static Job jobInstance;
+ Flux sinkStream;
+
+ public abstract String getSinkUrl();
+ public abstract Class getEventType();
+
+ @BeforeEach
+ void setUp() {
+ sinkStream = getSinkStream(getSinkUrl());
+ }
+
+ @AfterAll
+ static void afterAll() {
+ stopJob();
+ }
+
+ protected Flux getSinkStream(String sinkUrl) {
+ return WebClient.builder().build().get()
+ .uri(sinkUrl)
+ .retrieve()
+ .bodyToFlux(getEventType())
+ .retryWhen(Retry.fixedDelay(10, Duration.ofMillis(2000)));
+ }
+
+ static void start(MantisJobProvider job) {
+ jobInstance = job.getJobInstance();
+ new Thread(() -> LocalJobExecutorNetworked.execute(jobInstance)).start();
+ }
+
+ static void stopJob() {
+ jobInstance.getLifecycle().shutdown();
+ }
+
+}