diff --git a/reactor-core/src/main/java/com/baeldung/reactor/ItemProducerCreate.java b/reactor-core/src/main/java/com/baeldung/reactor/ItemProducerCreate.java new file mode 100644 index 0000000000..6078f8ef0b --- /dev/null +++ b/reactor-core/src/main/java/com/baeldung/reactor/ItemProducerCreate.java @@ -0,0 +1,44 @@ +package com.baeldung.reactor; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; + +public class ItemProducerCreate { + + Logger logger = LoggerFactory.getLogger(NetworTrafficProducerPush.class); + + Consumer> listener; + + public void create() { + Flux articlesFlux = Flux.create((sink) -> { + ItemProducerCreate.this.listener = (items) -> { + items.stream() + .forEach(article -> sink.next(article)); + }; + }); + articlesFlux.subscribe(ItemProducerCreate.this.logger::info); + } + + public static void main(String[] args) { + ItemProducerCreate producer = new ItemProducerCreate(); + producer.create(); + + new Thread(new Runnable() { + + @Override + public void run() { + List items = new ArrayList<>(); + items.add("Item 1"); + items.add("Item 2"); + items.add("Item 3"); + producer.listener.accept(items); + } + }).start(); + } +} diff --git a/reactor-core/src/main/java/com/baeldung/reactor/NetworTrafficProducerPush.java b/reactor-core/src/main/java/com/baeldung/reactor/NetworTrafficProducerPush.java new file mode 100644 index 0000000000..807ceae84d --- /dev/null +++ b/reactor-core/src/main/java/com/baeldung/reactor/NetworTrafficProducerPush.java @@ -0,0 +1,34 @@ +package com.baeldung.reactor; + +import java.util.function.Consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink.OverflowStrategy; + +public class NetworTrafficProducerPush { + + Logger logger = LoggerFactory.getLogger(NetworTrafficProducerPush.class); + + Consumer listener; + + public void subscribe(Consumer consumer) { + Flux flux = Flux.push(sink -> { + NetworTrafficProducerPush.this.listener = (t) -> sink.next(t); + }, OverflowStrategy.DROP); + flux.subscribe(consumer); + } + + public void onPacket(String packet) { + listener.accept(packet); + } + + public static void main(String[] args) { + NetworTrafficProducerPush trafficProducer = new NetworTrafficProducerPush(); + trafficProducer.subscribe(trafficProducer.logger::info); + trafficProducer.onPacket("Packet[A18]"); + } + +} diff --git a/reactor-core/src/main/java/com/baeldung/reactor/ProgramaticSequences.java b/reactor-core/src/main/java/com/baeldung/reactor/ProgramaticSequences.java new file mode 100644 index 0000000000..b52def377d --- /dev/null +++ b/reactor-core/src/main/java/com/baeldung/reactor/ProgramaticSequences.java @@ -0,0 +1,58 @@ +package com.baeldung.reactor; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; + +public class ProgramaticSequences { + + Logger logger = LoggerFactory.getLogger(ProgramaticSequences.class); + + public void statefullImutableGenerate() { + Flux flux = Flux.generate(() -> 1, (state, sink) -> { + sink.next("2 + " + state + " = " + 2 + state); + if (state == 101) + sink.complete(); + return state + 1; + }); + + flux.subscribe(logger::info); + } + + public void statefullMutableGenerate() { + Flux flux = Flux.generate(AtomicInteger::new, (state, sink) -> { + int i = state.getAndIncrement(); + sink.next("2 + " + state + " = " + 2 + state); + if (i == 101) + sink.complete(); + return state; + }); + + flux.subscribe(logger::info); + } + + public void handle() { + Flux elephants = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + .handle((i, sink) -> { + String animal = "Elephant nr " + i; + if (i % 2 == 0) { + sink.next(animal); + } + }); + + elephants.subscribe(logger::info); + } + + public static void main(String[] args) { + ProgramaticSequences ps = new ProgramaticSequences(); + + ps.statefullImutableGenerate(); + ps.statefullMutableGenerate(); + ps.handle(); + + } + +} diff --git a/reactor-core/src/main/java/com/baeldung/reactor/StatelessGenerate.java b/reactor-core/src/main/java/com/baeldung/reactor/StatelessGenerate.java new file mode 100644 index 0000000000..c82f8e160b --- /dev/null +++ b/reactor-core/src/main/java/com/baeldung/reactor/StatelessGenerate.java @@ -0,0 +1,24 @@ +package com.baeldung.reactor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; + +public class StatelessGenerate { + + Logger logger = LoggerFactory.getLogger(StatelessGenerate.class); + + public void statelessGenerate() { + Flux flux = Flux.generate((sink) -> { + sink.next("hallo"); + }); + flux.subscribe(logger::info); + } + + public static void main(String[] args) { + StatelessGenerate ps = new StatelessGenerate(); + ps.statelessGenerate(); + } + +}