diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java index 0bacda78a..0bb0b1fdd 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java @@ -42,12 +42,14 @@ class BinaryStreamAdapters { * * @param inputStream must not be {@literal null}. * @param dataBufferFactory must not be {@literal null}. + * @param bufferSize read {@code n} bytes per iteration. * @return {@link Flux} emitting {@link DataBuffer}s. * @see DataBufferFactory#allocateBuffer() */ - static Flux toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) { + static Flux toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory, + int bufferSize) { - return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory) // + return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory, bufferSize) // .filter(it -> { if (it.readableByteCount() == 0) { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java index 4946d6d0d..d67aaa3f1 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java @@ -20,6 +20,7 @@ import reactor.core.publisher.Flux; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.util.function.IntFunction; import org.reactivestreams.Publisher; @@ -41,20 +42,19 @@ public class ReactiveGridFsResource extends AbstractResource { private final @Nullable GridFSFile file; private final String filename; - private final Flux content; + private final IntFunction> contentFunction; /** * Creates a new, absent {@link ReactiveGridFsResource}. * * @param filename filename of the absent resource. * @param content - * @since 2.1 */ private ReactiveGridFsResource(String filename, Publisher content) { this.file = null; this.filename = filename; - this.content = Flux.from(content); + this.contentFunction = any -> Flux.from(content); } /** @@ -64,10 +64,21 @@ public class ReactiveGridFsResource extends AbstractResource { * @param content */ public ReactiveGridFsResource(GridFSFile file, Publisher content) { + this(file, (IntFunction>) any -> Flux.from(content)); + } + + /** + * Creates a new {@link ReactiveGridFsResource} from the given {@link GridFSFile}. + * + * @param file must not be {@literal null}. + * @param contentFunction + * @since 2.2.1 + */ + ReactiveGridFsResource(GridFSFile file, IntFunction> contentFunction) { this.file = file; this.filename = file.getFilename(); - this.content = Flux.from(content); + this.contentFunction = contentFunction; } /** @@ -165,16 +176,28 @@ public class ReactiveGridFsResource extends AbstractResource { } /** - * Retrieve the download stream. + * Retrieve the download stream using the default chunk size of 256kb. * * @return */ public Flux getDownloadStream() { + return getDownloadStream(256 * 1024); // 256kb buffers + } + + /** + * Retrieve the download stream. + * + * @param chunkSize chunk size in bytes to use. + * @return + * @since 2.2.1 + */ + public Flux getDownloadStream(int chunkSize) { if (!exists()) { return Flux.error(new FileNotFoundException(String.format("%s does not exist.", getDescription()))); } - return this.content; + + return contentFunction.apply(chunkSize); } private void verifyExists() throws FileNotFoundException { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java index fcdf77df9..cec693763 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java @@ -223,9 +223,11 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R return Mono.fromSupplier(() -> { - GridFSDownloadStream stream = getGridFs().openDownloadStream(file.getId()); + return new ReactiveGridFsResource(file, chunkSize -> { - return new ReactiveGridFsResource(file, BinaryStreamAdapters.toPublisher(stream, dataBufferFactory)); + GridFSDownloadStream stream = getGridFs().openDownloadStream(file.getId()); + return BinaryStreamAdapters.toPublisher(stream, dataBufferFactory, chunkSize); + }); }); } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java index 9c17d778f..7d46d4c46 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.junit.Test; + import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; @@ -49,7 +50,7 @@ public class BinaryStreamAdaptersUnitTests { byte[] content = StreamUtils.copyToByteArray(resource.getInputStream()); AsyncInputStream inputStream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); - Flux dataBuffers = BinaryStreamAdapters.toPublisher(inputStream, new DefaultDataBufferFactory()); + Flux dataBuffers = BinaryStreamAdapters.toPublisher(inputStream, new DefaultDataBufferFactory(), 256); DataBufferUtils.join(dataBuffers) // .as(StepVerifier::create) // diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapterUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapterUnitTests.java index e89d96676..26eaebc6c 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapterUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapterUnitTests.java @@ -44,7 +44,7 @@ public class DataBufferPublisherAdapterUnitTests { when(asyncInput.read(any())).thenReturn(Mono.just(1), Mono.error(new IllegalStateException())); when(asyncInput.close()).thenReturn(Mono.empty()); - Flux binaryStream = DataBufferPublisherAdapter.createBinaryStream(asyncInput, factory); + Flux binaryStream = DataBufferPublisherAdapter.createBinaryStream(asyncInput, factory, 256); StepVerifier.create(binaryStream, 0) // .thenRequest(1) // diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java index cc808339f..a95976b7d 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java @@ -24,6 +24,7 @@ import reactor.core.publisher.Flux; import reactor.test.StepVerifier; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.UUID; import org.bson.BsonObjectId; @@ -40,7 +41,9 @@ import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.DefaultDataBuffer; import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.dao.IncorrectResultSizeDataAccessException; +import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory; import org.springframework.data.mongodb.core.SimpleMongoDbFactory; +import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.query.Query; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; @@ -48,6 +51,7 @@ import org.springframework.util.StreamUtils; import com.mongodb.gridfs.GridFS; import com.mongodb.gridfs.GridFSInputFile; +import com.mongodb.internal.HexUtils; import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper; @@ -66,6 +70,8 @@ public class ReactiveGridFsTemplateTests { @Autowired ReactiveGridFsOperations operations; @Autowired SimpleMongoDbFactory mongoClient; + @Autowired ReactiveMongoDatabaseFactory dbFactory; + @Autowired MongoConverter mongoConverter; @Before public void setUp() { @@ -92,6 +98,48 @@ public class ReactiveGridFsTemplateTests { .verifyComplete(); } + @Test // DATAMONGO-1855 + public void storesAndLoadsLargeFileCorrectly() { + + ByteBuffer buffer = ByteBuffer.allocate(1000 * 1000 * 1); // 1 mb + + int i = 0; + while (buffer.remaining() != 0) { + byte b = (byte) (i++ % 16); + String string = HexUtils.toHex(new byte[] { b }); + buffer.put(string.getBytes()); + } + buffer.flip(); + + DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); + + ObjectId reference = operations.store(Flux.just(factory.wrap(buffer)), "large.txt").block(); + + buffer.clear(); + + // default chunk size + operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource) + .flatMapMany(ReactiveGridFsResource::getDownloadStream) // + .transform(DataBufferUtils::join) // + .as(StepVerifier::create) // + .consumeNextWith(dataBuffer -> { + + assertThat(dataBuffer.readableByteCount()).isEqualTo(buffer.remaining()); + assertThat(dataBuffer.asByteBuffer()).isEqualTo(buffer); + }).verifyComplete(); + + // small chunk size + operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource) + .flatMapMany(reactiveGridFsResource -> reactiveGridFsResource.getDownloadStream(256)) // + .transform(DataBufferUtils::join) // + .as(StepVerifier::create) // + .consumeNextWith(dataBuffer -> { + + assertThat(dataBuffer.readableByteCount()).isEqualTo(buffer.remaining()); + assertThat(dataBuffer.asByteBuffer()).isEqualTo(buffer); + }).verifyComplete(); + } + @Test // DATAMONGO-2392 public void storesAndFindsByUUID() throws IOException {