diff --git a/core-java-modules/core-java-streams-4/pom.xml b/core-java-modules/core-java-streams-4/pom.xml
index ed4603796d..46c0f3f7e1 100644
--- a/core-java-modules/core-java-streams-4/pom.xml
+++ b/core-java-modules/core-java-streams-4/pom.xml
@@ -59,6 +59,36 @@
3.12.0
test
+
+ io.reactivex.rxjava3
+ rxjava
+ ${rx.java3.version}
+
+
+ io.vavr
+ vavr
+ ${io.varv.version}
+
+
+ io.projectreactor
+ reactor-core
+ ${io.reactor3.version}
+
+
+ org.apache.commons
+ commons-collections4
+ ${apache.commons.collection4.version}
+
+
+ com.google.guava
+ guava
+ ${google.guava.version}
+
+
+ com.oath.cyclops
+ cyclops
+ ${cyclops.version}
+
@@ -90,6 +120,12 @@
12
1.2.5
2.2.2
+ 3.1.5
+ 1.0.0-alpha-4
+ 3.5.1
+ 4.4
+ 31.1-jre
+ 10.4.1
\ No newline at end of file
diff --git a/core-java-modules/core-java-streams-4/src/main/java/com/baeldung/streams/processing/CustomBatchIterator.java b/core-java-modules/core-java-streams-4/src/main/java/com/baeldung/streams/processing/CustomBatchIterator.java
new file mode 100644
index 0000000000..b5407b7283
--- /dev/null
+++ b/core-java-modules/core-java-streams-4/src/main/java/com/baeldung/streams/processing/CustomBatchIterator.java
@@ -0,0 +1,47 @@
+package com.baeldung.streams.processing;
+
+import static java.util.Spliterator.ORDERED;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Spliterators;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+public class CustomBatchIterator implements Iterator> {
+ private final int batchSize;
+ private List currentBatch;
+ private final Iterator iterator;
+
+ public CustomBatchIterator(Iterator sourceIterator, int batchSize) {
+ this.batchSize = batchSize;
+ this.iterator = sourceIterator;
+ }
+
+ @Override
+ public List next() {
+ return currentBatch;
+ }
+
+ @Override
+ public boolean hasNext() {
+ prepareNextBatch();
+ return currentBatch != null && !currentBatch.isEmpty();
+ }
+
+ public static Stream> batchStreamOf(Stream stream, int batchSize) {
+ return stream(new CustomBatchIterator<>(stream.iterator(), batchSize));
+ }
+
+ private static Stream stream(Iterator iterator) {
+ return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, ORDERED), false);
+ }
+
+ private void prepareNextBatch() {
+ currentBatch = new ArrayList<>(batchSize);
+ while (iterator.hasNext() && currentBatch.size() < batchSize) {
+ currentBatch.add(iterator.next());
+ }
+ }
+}
\ No newline at end of file
diff --git a/core-java-modules/core-java-streams-4/src/test/java/com/baeldung/streams/processing/StreamProcessingUnitTest.java b/core-java-modules/core-java-streams-4/src/test/java/com/baeldung/streams/processing/StreamProcessingUnitTest.java
new file mode 100644
index 0000000000..f8f88387d5
--- /dev/null
+++ b/core-java-modules/core-java-streams-4/src/test/java/com/baeldung/streams/processing/StreamProcessingUnitTest.java
@@ -0,0 +1,141 @@
+package com.baeldung.streams.processing;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import org.apache.commons.collections4.ListUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.Iterators;
+
+import cyclops.data.LazySeq;
+import cyclops.reactive.ReactiveSeq;
+import io.reactivex.rxjava3.core.Observable;
+import reactor.core.publisher.Flux;
+
+public class StreamProcessingUnitTest {
+ public final int BATCH_SIZE = 10;
+
+ private final List firstBatch = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ private final List secondBatch = List.of(10, 11, 12, 13, 14, 15, 16, 17, 18, 19);
+ private final List thirdBatch = List.of(20, 21, 22, 23, 24, 25, 26, 27, 28, 29);
+ private final List fourthBatch = List.of(30, 31, 32, 33);
+
+ public Stream data;
+
+ @BeforeEach
+ public void setUp() {
+ data = IntStream.range(0, 34)
+ .boxed();
+ }
+
+ @Test
+ public void givenAStreamOfData_whenIsProcessingInBatchUsingSpliterator_thenFourBatchesAreObtained() {
+ Collection> result = new ArrayList<>();
+ CustomBatchIterator.batchStreamOf(data, BATCH_SIZE)
+ .forEach(result::add);
+ assertTrue(result.contains(firstBatch));
+ assertTrue(result.contains(secondBatch));
+ assertTrue(result.contains(thirdBatch));
+ assertTrue(result.contains(fourthBatch));
+ }
+
+ @Test
+ public void givenAStreamOfData_whenIsProcessingInBatchUsingCollectionAPI_thenFourBatchesAreObtained() {
+ Collection> result = data.collect(Collectors.groupingBy(it -> it / BATCH_SIZE))
+ .values();
+ assertTrue(result.contains(firstBatch));
+ assertTrue(result.contains(secondBatch));
+ assertTrue(result.contains(thirdBatch));
+ assertTrue(result.contains(fourthBatch));
+ }
+
+ @Test
+ public void givenAStreamOfData_whenIsProcessingInBatchParallelUsingCollectionAPI_thenFourBatchesAreObtained() {
+ Collection> result = data.parallel()
+ .collect(Collectors.groupingBy(it -> it / BATCH_SIZE))
+ .values();
+ assertTrue(result.contains(firstBatch));
+ assertTrue(result.contains(secondBatch));
+ assertTrue(result.contains(thirdBatch));
+ assertTrue(result.contains(fourthBatch));
+ }
+
+ @Test
+ public void givenAStreamOfData_whenIsProcessingInBatchUsingRxJavaV3_thenFourBatchesAreObtained() {
+ // RxJava v3
+ Collection> result = new ArrayList<>();
+ Observable.fromStream(data)
+ .buffer(BATCH_SIZE)
+ .subscribe(result::add);
+ assertTrue(result.contains(firstBatch));
+ assertTrue(result.contains(secondBatch));
+ assertTrue(result.contains(thirdBatch));
+ assertTrue(result.contains(fourthBatch));
+ }
+
+ @Test
+ public void givenAStreamOfData_whenIsProcessingInBatchUsingReactor_thenFourBatchesAreObtained() {
+ Collection> result = new ArrayList<>();
+ Flux.fromStream(data)
+ .buffer(BATCH_SIZE)
+ .subscribe(result::add);
+ assertTrue(result.contains(firstBatch));
+ assertTrue(result.contains(secondBatch));
+ assertTrue(result.contains(thirdBatch));
+ assertTrue(result.contains(fourthBatch));
+ }
+
+ @Test
+ public void givenAStreamOfData_whenIsProcessingInBatchUsingApacheCommon_thenFourBatchesAreObtained() {
+ Collection> result = new ArrayList<>(ListUtils.partition(data.collect(Collectors.toList()), BATCH_SIZE));
+ assertTrue(result.contains(firstBatch));
+ assertTrue(result.contains(secondBatch));
+ assertTrue(result.contains(thirdBatch));
+ assertTrue(result.contains(fourthBatch));
+ }
+
+ @Test
+ public void givenAStreamOfData_whenIsProcessingInBatchUsingGuava_thenFourBatchesAreObtained() {
+ Collection> result = new ArrayList<>();
+ Iterators.partition(data.iterator(), BATCH_SIZE)
+ .forEachRemaining(result::add);
+ assertTrue(result.contains(firstBatch));
+ assertTrue(result.contains(secondBatch));
+ assertTrue(result.contains(thirdBatch));
+ assertTrue(result.contains(fourthBatch));
+ }
+
+ @Test
+ public void givenAStreamOfData_whenIsProcessingInBatchUsingCyclops_thenFourBatchesAreObtained() {
+ Collection> result = new ArrayList<>();
+ ReactiveSeq.fromStream(data)
+ .grouped(BATCH_SIZE)
+ .toList()
+ .forEach(value -> result.add(value.collect(Collectors.toList())));
+ assertTrue(result.contains(firstBatch));
+ assertTrue(result.contains(secondBatch));
+ assertTrue(result.contains(thirdBatch));
+ assertTrue(result.contains(fourthBatch));
+ }
+
+ @Test
+ public void givenAStreamOfData_whenIsProcessingInBatchUsingCyclopsLazy_thenFourBatchesAreObtained() {
+ Collection> result = new ArrayList<>();
+ LazySeq.fromStream(data)
+ .grouped(BATCH_SIZE)
+ .toList()
+ .forEach(value -> result.add(value.collect(Collectors.toList())));
+ assertTrue(result.contains(firstBatch));
+ assertTrue(result.contains(secondBatch));
+ assertTrue(result.contains(thirdBatch));
+ assertTrue(result.contains(fourthBatch));
+ }
+}
\ No newline at end of file
diff --git a/core-java-modules/core-java-streams-4/src/test/java/com/baeldung/streams/processing/vavr/StreamProcessingWithVavrUnitTest.java b/core-java-modules/core-java-streams-4/src/test/java/com/baeldung/streams/processing/vavr/StreamProcessingWithVavrUnitTest.java
new file mode 100644
index 0000000000..859b059889
--- /dev/null
+++ b/core-java-modules/core-java-streams-4/src/test/java/com/baeldung/streams/processing/vavr/StreamProcessingWithVavrUnitTest.java
@@ -0,0 +1,30 @@
+package com.baeldung.streams.processing.vavr;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+
+import com.baeldung.streams.processing.StreamProcessingUnitTest;
+
+import io.vavr.collection.List;
+import io.vavr.collection.Stream;
+
+public class StreamProcessingWithVavrUnitTest extends StreamProcessingUnitTest {
+
+ private final List firstBatch = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ private final List secondBatch = List.of(10, 11, 12, 13, 14, 15, 16, 17, 18, 19);
+ private final List thirdBatch = List.of(20, 21, 22, 23, 24, 25, 26, 27, 28, 29);
+ private final List fourthBatch = List.of(30, 31, 32, 33);
+
+ @Test
+ public void givenAStreamOfData_whenIsProcessingInBatchUsingVavr_thenFourBatchesAreObtained() {
+ List> result = Stream.ofAll(data)
+ .toList()
+ .grouped(BATCH_SIZE)
+ .toList();
+ assertTrue(result.contains(firstBatch));
+ assertTrue(result.contains(secondBatch));
+ assertTrue(result.contains(thirdBatch));
+ assertTrue(result.contains(fourthBatch));
+ }
+}