BAEL-5924 Java 8 Stream with Batch Processing Support (#13203)
This commit is contained in:
@@ -0,0 +1,47 @@
|
||||
package com.baeldung.streams.processing;
|
||||
|
||||
import static java.util.Spliterator.ORDERED;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Spliterators;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
/**
 * Iterator adapter that groups the elements of a source iterator into consecutive
 * lists ("batches") holding at most {@code batchSize} elements each. Only the last
 * batch may be shorter than {@code batchSize}; an empty batch is never returned.
 *
 * <p>The iterator follows the {@link Iterator} contract: {@link #hasNext()} may be
 * called any number of times without consuming a batch, and {@link #next()} may be
 * called without a preceding {@code hasNext()}.
 *
 * @param <T> type of the elements being batched
 */
public class CustomBatchIterator<T> implements Iterator<List<T>> {

    private final int batchSize;

    /** Batch prepared by {@link #hasNext()} and not yet handed out; {@code null} when none is pending. */
    private List<T> currentBatch;

    private final Iterator<T> iterator;

    /**
     * Creates a batching view over {@code sourceIterator}.
     *
     * @param sourceIterator iterator supplying the individual elements
     * @param batchSize maximum number of elements per batch; must be positive
     * @throws NullPointerException if {@code sourceIterator} is {@code null}
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    public CustomBatchIterator(Iterator<T> sourceIterator, int batchSize) {
        if (sourceIterator == null) {
            throw new NullPointerException("sourceIterator");
        }
        if (batchSize <= 0) {
            // A zero size would otherwise produce no batches at all and silently drop every element.
            throw new IllegalArgumentException("batchSize must be positive but was " + batchSize);
        }
        this.batchSize = batchSize;
        this.iterator = sourceIterator;
    }

    /**
     * Returns the next batch.
     *
     * @return a non-empty list with up to {@code batchSize} elements
     * @throws java.util.NoSuchElementException if the source iterator has no elements left
     */
    @Override
    public List<T> next() {
        if (!hasNext()) {
            throw new java.util.NoSuchElementException();
        }
        List<T> batch = currentBatch;
        // Mark the batch as consumed so that the next hasNext() call reads a fresh one.
        currentBatch = null;
        return batch;
    }

    /**
     * Reports whether another batch is available. A batch is read from the source
     * only when no unconsumed batch is pending, so repeated calls do not skip elements.
     *
     * @return {@code true} if {@link #next()} would return a batch
     */
    @Override
    public boolean hasNext() {
        if (currentBatch == null) {
            prepareNextBatch();
        }
        return !currentBatch.isEmpty();
    }

    /**
     * Turns a stream of elements into a sequential, ordered stream of batches.
     * The source stream is consumed through its iterator; closing the returned
     * stream also closes the source stream.
     *
     * @param stream source of the elements
     * @param batchSize maximum number of elements per batch; must be positive
     * @param <T> type of the elements being batched
     * @return a stream whose elements are the batches, in encounter order
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    public static <T> Stream<List<T>> batchStreamOf(Stream<T> stream, int batchSize) {
        return stream(new CustomBatchIterator<>(stream.iterator(), batchSize)).onClose(stream::close);
    }

    /** Wraps an iterator of unknown size in a sequential stream reporting the ORDERED characteristic. */
    private static <T> Stream<T> stream(Iterator<T> iterator) {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, ORDERED), false);
    }

    /** Reads up to {@code batchSize} elements from the source into a new {@code currentBatch}. */
    private void prepareNextBatch() {
        currentBatch = new ArrayList<>(batchSize);
        while (iterator.hasNext() && currentBatch.size() < batchSize) {
            currentBatch.add(iterator.next());
        }
    }
}
|
||||
@@ -0,0 +1,141 @@
|
||||
package com.baeldung.streams.processing;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.commons.collections4.ListUtils;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import com.google.common.collect.Iterators;
|
||||
|
||||
import cyclops.data.LazySeq;
|
||||
import cyclops.reactive.ReactiveSeq;
|
||||
import io.reactivex.rxjava3.core.Observable;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
public class StreamProcessingUnitTest {
|
||||
public final int BATCH_SIZE = 10;
|
||||
|
||||
private final List<Integer> firstBatch = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
|
||||
private final List<Integer> secondBatch = List.of(10, 11, 12, 13, 14, 15, 16, 17, 18, 19);
|
||||
private final List<Integer> thirdBatch = List.of(20, 21, 22, 23, 24, 25, 26, 27, 28, 29);
|
||||
private final List<Integer> fourthBatch = List.of(30, 31, 32, 33);
|
||||
|
||||
public Stream<Integer> data;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() {
|
||||
data = IntStream.range(0, 34)
|
||||
.boxed();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAStreamOfData_whenIsProcessingInBatchUsingSpliterator_thenFourBatchesAreObtained() {
|
||||
Collection<List<Integer>> result = new ArrayList<>();
|
||||
CustomBatchIterator.batchStreamOf(data, BATCH_SIZE)
|
||||
.forEach(result::add);
|
||||
assertTrue(result.contains(firstBatch));
|
||||
assertTrue(result.contains(secondBatch));
|
||||
assertTrue(result.contains(thirdBatch));
|
||||
assertTrue(result.contains(fourthBatch));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAStreamOfData_whenIsProcessingInBatchUsingCollectionAPI_thenFourBatchesAreObtained() {
|
||||
Collection<List<Integer>> result = data.collect(Collectors.groupingBy(it -> it / BATCH_SIZE))
|
||||
.values();
|
||||
assertTrue(result.contains(firstBatch));
|
||||
assertTrue(result.contains(secondBatch));
|
||||
assertTrue(result.contains(thirdBatch));
|
||||
assertTrue(result.contains(fourthBatch));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAStreamOfData_whenIsProcessingInBatchParallelUsingCollectionAPI_thenFourBatchesAreObtained() {
|
||||
Collection<List<Integer>> result = data.parallel()
|
||||
.collect(Collectors.groupingBy(it -> it / BATCH_SIZE))
|
||||
.values();
|
||||
assertTrue(result.contains(firstBatch));
|
||||
assertTrue(result.contains(secondBatch));
|
||||
assertTrue(result.contains(thirdBatch));
|
||||
assertTrue(result.contains(fourthBatch));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAStreamOfData_whenIsProcessingInBatchUsingRxJavaV3_thenFourBatchesAreObtained() {
|
||||
// RxJava v3
|
||||
Collection<List<Integer>> result = new ArrayList<>();
|
||||
Observable.fromStream(data)
|
||||
.buffer(BATCH_SIZE)
|
||||
.subscribe(result::add);
|
||||
assertTrue(result.contains(firstBatch));
|
||||
assertTrue(result.contains(secondBatch));
|
||||
assertTrue(result.contains(thirdBatch));
|
||||
assertTrue(result.contains(fourthBatch));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAStreamOfData_whenIsProcessingInBatchUsingReactor_thenFourBatchesAreObtained() {
|
||||
Collection<List<Integer>> result = new ArrayList<>();
|
||||
Flux.fromStream(data)
|
||||
.buffer(BATCH_SIZE)
|
||||
.subscribe(result::add);
|
||||
assertTrue(result.contains(firstBatch));
|
||||
assertTrue(result.contains(secondBatch));
|
||||
assertTrue(result.contains(thirdBatch));
|
||||
assertTrue(result.contains(fourthBatch));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAStreamOfData_whenIsProcessingInBatchUsingApacheCommon_thenFourBatchesAreObtained() {
|
||||
Collection<List<Integer>> result = new ArrayList<>(ListUtils.partition(data.collect(Collectors.toList()), BATCH_SIZE));
|
||||
assertTrue(result.contains(firstBatch));
|
||||
assertTrue(result.contains(secondBatch));
|
||||
assertTrue(result.contains(thirdBatch));
|
||||
assertTrue(result.contains(fourthBatch));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAStreamOfData_whenIsProcessingInBatchUsingGuava_thenFourBatchesAreObtained() {
|
||||
Collection<List<Integer>> result = new ArrayList<>();
|
||||
Iterators.partition(data.iterator(), BATCH_SIZE)
|
||||
.forEachRemaining(result::add);
|
||||
assertTrue(result.contains(firstBatch));
|
||||
assertTrue(result.contains(secondBatch));
|
||||
assertTrue(result.contains(thirdBatch));
|
||||
assertTrue(result.contains(fourthBatch));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAStreamOfData_whenIsProcessingInBatchUsingCyclops_thenFourBatchesAreObtained() {
|
||||
Collection<List<Integer>> result = new ArrayList<>();
|
||||
ReactiveSeq.fromStream(data)
|
||||
.grouped(BATCH_SIZE)
|
||||
.toList()
|
||||
.forEach(value -> result.add(value.collect(Collectors.toList())));
|
||||
assertTrue(result.contains(firstBatch));
|
||||
assertTrue(result.contains(secondBatch));
|
||||
assertTrue(result.contains(thirdBatch));
|
||||
assertTrue(result.contains(fourthBatch));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenAStreamOfData_whenIsProcessingInBatchUsingCyclopsLazy_thenFourBatchesAreObtained() {
|
||||
Collection<List<Integer>> result = new ArrayList<>();
|
||||
LazySeq.fromStream(data)
|
||||
.grouped(BATCH_SIZE)
|
||||
.toList()
|
||||
.forEach(value -> result.add(value.collect(Collectors.toList())));
|
||||
assertTrue(result.contains(firstBatch));
|
||||
assertTrue(result.contains(secondBatch));
|
||||
assertTrue(result.contains(thirdBatch));
|
||||
assertTrue(result.contains(fourthBatch));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
package com.baeldung.streams.processing.vavr;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import com.baeldung.streams.processing.StreamProcessingUnitTest;
|
||||
|
||||
import io.vavr.collection.List;
|
||||
import io.vavr.collection.Stream;
|
||||
|
||||
public class StreamProcessingWithVavrUnitTest extends StreamProcessingUnitTest {
|
||||
|
||||
private final List<Integer> firstBatch = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
|
||||
private final List<Integer> secondBatch = List.of(10, 11, 12, 13, 14, 15, 16, 17, 18, 19);
|
||||
private final List<Integer> thirdBatch = List.of(20, 21, 22, 23, 24, 25, 26, 27, 28, 29);
|
||||
private final List<Integer> fourthBatch = List.of(30, 31, 32, 33);
|
||||
|
||||
@Test
|
||||
public void givenAStreamOfData_whenIsProcessingInBatchUsingVavr_thenFourBatchesAreObtained() {
|
||||
List<List<Integer>> result = Stream.ofAll(data)
|
||||
.toList()
|
||||
.grouped(BATCH_SIZE)
|
||||
.toList();
|
||||
assertTrue(result.contains(firstBatch));
|
||||
assertTrue(result.contains(secondBatch));
|
||||
assertTrue(result.contains(thirdBatch));
|
||||
assertTrue(result.contains(fourthBatch));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user