DATAMONGO-2393 - Support configurable chunk size.

We now allow consuming GridFS files using a configurable chunk size. The default chunk size is now 256kb.

Original Pull Request: #799
This commit is contained in:
Mark Paluch
2019-10-21 09:10:09 +02:00
committed by Christoph Strobl
parent ec3ccc004e
commit f2134fb2f8
6 changed files with 88 additions and 12 deletions

View File

@@ -42,12 +42,14 @@ class BinaryStreamAdapters {
*
* @param inputStream must not be {@literal null}.
* @param dataBufferFactory must not be {@literal null}.
* @param bufferSize read {@code n} bytes per iteration.
* @return {@link Flux} emitting {@link DataBuffer}s.
* @see DataBufferFactory#allocateBuffer()
*/
static Flux<DataBuffer> toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) {
static Flux<DataBuffer> toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory,
int bufferSize) {
return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory) //
return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory, bufferSize) //
.filter(it -> {
if (it.readableByteCount() == 0) {

View File

@@ -20,6 +20,7 @@ import reactor.core.publisher.Flux;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.IntFunction;
import org.reactivestreams.Publisher;
@@ -41,20 +42,19 @@ public class ReactiveGridFsResource extends AbstractResource {
private final @Nullable GridFSFile file;
private final String filename;
private final Flux<DataBuffer> content;
private final IntFunction<Flux<DataBuffer>> contentFunction;
/**
 * Creates a new, absent {@link ReactiveGridFsResource}.
 *
 * @param filename filename of the absent resource.
 * @param content the content publisher for this resource.
 * @since 2.1
 */
private ReactiveGridFsResource(String filename, Publisher<DataBuffer> content) {

	this.file = null;
	this.filename = filename;
	// The requested chunk size is irrelevant for a fixed publisher, so it is ignored here.
	this.contentFunction = any -> Flux.from(content);
}
/**
@@ -64,10 +64,21 @@ public class ReactiveGridFsResource extends AbstractResource {
* @param content
*/
/**
 * Creates a new {@link ReactiveGridFsResource} from the given {@link GridFSFile}, adapting the fixed
 * {@link Publisher} to the chunk-size-aware content function (the requested chunk size is ignored,
 * since the publisher is already materialized).
 *
 * @param file must not be {@literal null}.
 * @param content the content publisher for this resource.
 */
public ReactiveGridFsResource(GridFSFile file, Publisher<DataBuffer> content) {
this(file, (IntFunction<Flux<DataBuffer>>) any -> Flux.from(content));
}
/**
 * Creates a new {@link ReactiveGridFsResource} from the given {@link GridFSFile}.
 *
 * @param file must not be {@literal null}.
 * @param contentFunction function creating the content stream for a requested chunk size (in bytes).
 * @since 2.2.1
 */
ReactiveGridFsResource(GridFSFile file, IntFunction<Flux<DataBuffer>> contentFunction) {

	this.file = file;
	this.filename = file.getFilename();
	this.contentFunction = contentFunction;
}
/**
@@ -165,16 +176,28 @@ public class ReactiveGridFsResource extends AbstractResource {
}
/**
 * Retrieve the download stream using the default chunk size of 256 kB.
 *
 * @return a {@link Flux} emitting the file content, or a {@link FileNotFoundException} error signal if the
 *         resource does not {@link #exists() exist}.
 */
public Flux<DataBuffer> getDownloadStream() {
	return getDownloadStream(256 * 1024); // 256 kB buffers
}
/**
 * Retrieve the download stream.
 *
 * @param chunkSize chunk size in bytes to use.
 * @return a {@link Flux} emitting the file content, or a {@link FileNotFoundException} error signal if the
 *         resource does not {@link #exists() exist}.
 * @since 2.2.1
 */
public Flux<DataBuffer> getDownloadStream(int chunkSize) {

	if (!exists()) {
		return Flux.error(new FileNotFoundException(String.format("%s does not exist.", getDescription())));
	}

	return contentFunction.apply(chunkSize);
}
private void verifyExists() throws FileNotFoundException {

View File

@@ -223,9 +223,11 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
return Mono.fromSupplier(() -> {

	// Defer opening the download stream into the content function so that it is only
	// opened when content is requested, using the chunk size supplied by the caller.
	return new ReactiveGridFsResource(file, chunkSize -> {
		GridFSDownloadStream stream = getGridFs().openDownloadStream(file.getId());
		return BinaryStreamAdapters.toPublisher(stream, dataBufferFactory, chunkSize);
	});
});
}

View File

@@ -25,6 +25,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.junit.Test;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
@@ -49,7 +50,7 @@ public class BinaryStreamAdaptersUnitTests {
byte[] content = StreamUtils.copyToByteArray(resource.getInputStream());
AsyncInputStream inputStream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
Flux<DataBuffer> dataBuffers = BinaryStreamAdapters.toPublisher(inputStream, new DefaultDataBufferFactory());
Flux<DataBuffer> dataBuffers = BinaryStreamAdapters.toPublisher(inputStream, new DefaultDataBufferFactory(), 256);
DataBufferUtils.join(dataBuffers) //
.as(StepVerifier::create) //

View File

@@ -44,7 +44,7 @@ public class DataBufferPublisherAdapterUnitTests {
when(asyncInput.read(any())).thenReturn(Mono.just(1), Mono.error(new IllegalStateException()));
when(asyncInput.close()).thenReturn(Mono.empty());
Flux<DataBuffer> binaryStream = DataBufferPublisherAdapter.createBinaryStream(asyncInput, factory);
Flux<DataBuffer> binaryStream = DataBufferPublisherAdapter.createBinaryStream(asyncInput, factory, 256);
StepVerifier.create(binaryStream, 0) //
.thenRequest(1) //

View File

@@ -24,6 +24,7 @@ import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.UUID;
import org.bson.BsonObjectId;
@@ -40,7 +41,9 @@ import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.dao.IncorrectResultSizeDataAccessException;
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.core.SimpleMongoDbFactory;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
@@ -48,6 +51,7 @@ import org.springframework.util.StreamUtils;
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSInputFile;
import com.mongodb.internal.HexUtils;
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper;
@@ -66,6 +70,8 @@ public class ReactiveGridFsTemplateTests {
@Autowired ReactiveGridFsOperations operations;
@Autowired SimpleMongoDbFactory mongoClient;
@Autowired ReactiveMongoDatabaseFactory dbFactory;
@Autowired MongoConverter mongoConverter;
@Before
public void setUp() {
@@ -92,6 +98,48 @@ public class ReactiveGridFsTemplateTests {
.verifyComplete();
}
@Test // DATAMONGO-1855
public void storesAndLoadsLargeFileCorrectly() {

	// Fill a 1 MB buffer with a repeating hex pattern (two ASCII chars per source nibble value).
	ByteBuffer expected = ByteBuffer.allocate(1000 * 1000 * 1); // 1 mb
	for (int counter = 0; expected.hasRemaining(); counter++) {
		String hex = HexUtils.toHex(new byte[] { (byte) (counter % 16) });
		expected.put(hex.getBytes());
	}
	expected.flip();

	DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory();
	ObjectId reference = operations.store(Flux.just(bufferFactory.wrap(expected)), "large.txt").block();
	expected.clear();

	// default chunk size
	operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource)
			.flatMapMany(ReactiveGridFsResource::getDownloadStream) //
			.transform(DataBufferUtils::join) //
			.as(StepVerifier::create) //
			.consumeNextWith(dataBuffer -> {

				assertThat(dataBuffer.readableByteCount()).isEqualTo(expected.remaining());
				assertThat(dataBuffer.asByteBuffer()).isEqualTo(expected);
			}).verifyComplete();

	// small chunk size
	operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource)
			.flatMapMany(resource -> resource.getDownloadStream(256)) //
			.transform(DataBufferUtils::join) //
			.as(StepVerifier::create) //
			.consumeNextWith(dataBuffer -> {

				assertThat(dataBuffer.readableByteCount()).isEqualTo(expected.remaining());
				assertThat(dataBuffer.asByteBuffer()).isEqualTo(expected);
			}).verifyComplete();
}
@Test // DATAMONGO-2392
public void storesAndFindsByUUID() throws IOException {