diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java new file mode 100644 index 000000000..e29f344ce --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java @@ -0,0 +1,248 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import lombok.RequiredArgsConstructor; +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Operators; +import reactor.util.concurrent.Queues; +import reactor.util.context.Context; + +import java.nio.ByteBuffer; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.function.BiConsumer; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; + +import com.mongodb.reactivestreams.client.Success; +import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; + +/** + * Adapter accepting a binary stream {@link Publisher} and emitting its through {@link AsyncInputStream}. + *

+ * This adapter subscribes to the binary {@link Publisher} as soon as the first chunk gets {@link #read(ByteBuffer) + * requested}. Requests are queued and binary chunks are requested from the {@link Publisher}. As soon as the + * {@link Publisher} emits items, chunks are provided to the read request which completes the number-of-written-bytes + * {@link Publisher}. + *

+ * {@link AsyncInputStream} is supposed to work as sequential callback API that is called until reaching EOF. + * {@link #close()} is propagated as cancellation signal to the binary {@link Publisher}. + * + * @author Mark Paluch + * @since 2.2 + */ +@RequiredArgsConstructor +class AsyncInputStreamAdapter implements AsyncInputStream { + + private static final AtomicLongFieldUpdater DEMAND = AtomicLongFieldUpdater + .newUpdater(AsyncInputStreamAdapter.class, "demand"); + + private static final AtomicIntegerFieldUpdater SUBSCRIBED = AtomicIntegerFieldUpdater + .newUpdater(AsyncInputStreamAdapter.class, "subscribed"); + + private static final int SUBSCRIPTION_NOT_SUBSCRIBED = 0; + private static final int SUBSCRIPTION_SUBSCRIBED = 1; + + private final Publisher buffers; + private final Context subscriberContext; + private final DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); + + private volatile Subscription subscription; + private volatile boolean cancelled; + private volatile boolean complete; + private volatile Throwable error; + private final Queue> readRequests = Queues.> small() + .get(); + + // see DEMAND + private volatile long demand; + + // see SUBSCRIBED + private volatile int subscribed = SUBSCRIPTION_NOT_SUBSCRIBED; + + /* + * (non-Javadoc) + * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#read(java.nio.ByteBuffer) + */ + @Override + public Publisher read(ByteBuffer dst) { + + return Mono.create(sink -> { + + readRequests.offer((db, bytecount) -> { + + try { + + if (error != null) { + sink.error(error); + return; + } + + if (bytecount == -1) { + sink.success(-1); + return; + } + + ByteBuffer byteBuffer = db.asByteBuffer(); + int toWrite = byteBuffer.remaining(); + dst.put(byteBuffer); + + sink.success(toWrite); + } catch (Exception e) { + sink.error(e); + } finally { + DataBufferUtils.release(db); + } + }); + + request(1); + }); + } + + /* + * (non-Javadoc) + * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#close() + */ + @Override + public Publisher close() { + + return Mono.create(sink -> { + cancelled = true; + + if (error != null) { + sink.error(error); + return; + } + + sink.success(Success.SUCCESS); + }); + } + + protected void request(int n) { + + if (complete) { + terminatePendingReads(); + return; + } + + Operators.addCap(DEMAND, this, n); + + if (SUBSCRIBED.get(this) == SUBSCRIPTION_NOT_SUBSCRIBED) { + + if (SUBSCRIBED.compareAndSet(this, SUBSCRIPTION_NOT_SUBSCRIBED, SUBSCRIPTION_SUBSCRIBED)) { + + buffers.subscribe(new CoreSubscriber() { + + @Override + public Context currentContext() { + return subscriberContext; + } + + @Override + public void onSubscribe(Subscription s) { + subscription = s; + + Operators.addCap(DEMAND, AsyncInputStreamAdapter.this, -1); + s.request(1); + } + + @Override + public void onNext(DataBuffer dataBuffer) { + + if (cancelled || complete) { + DataBufferUtils.release(dataBuffer); + Operators.onNextDropped(dataBuffer, subscriberContext); + return; + } + + BiConsumer poll = readRequests.poll(); + + if (poll == null) { + + DataBufferUtils.release(dataBuffer); + Operators.onNextDropped(dataBuffer, subscriberContext); + subscription.cancel(); + return; + } + + poll.accept(dataBuffer, dataBuffer.readableByteCount()); + + requestFromSubscription(subscription); + } + + @Override + public void onError(Throwable t) { + + if (cancelled || complete) { + Operators.onErrorDropped(t, subscriberContext); + return; + } + + error = t; + complete = true; + terminatePendingReads(); + } + + @Override + public void onComplete() { + + complete = true; + terminatePendingReads(); + } + }); + } + } else { + + Subscription subscription = this.subscription; + + if (subscription != null) { + requestFromSubscription(subscription); + } + } + } + + void requestFromSubscription(Subscription subscription) { + + long demand = DEMAND.get(AsyncInputStreamAdapter.this); + + if (cancelled) { + subscription.cancel(); + } + + if (demand > 0 && DEMAND.compareAndSet(AsyncInputStreamAdapter.this, demand, demand - 1)) { + subscription.request(1); + } + } + + /** + * Terminates pending reads with empty buffers. + */ + void terminatePendingReads() { + + BiConsumer readers; + + while ((readers = readRequests.poll()) != null) { + readers.accept(factory.wrap(new byte[0]), -1); + } + } +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java new file mode 100644 index 000000000..ae30151bd --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java @@ -0,0 +1,78 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.reactivestreams.Publisher; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; + +import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; + +/** + * Utility methods to create adapters from between {@link Publisher} of {@link DataBuffer} and {@link AsyncInputStream}. + * + * @author Mark Paluch + * @since 2.2 + */ +class BinaryStreamAdapters { + + /** + * Creates a {@link Flux} emitting {@link DataBuffer} by reading binary chunks from {@link AsyncInputStream}. + * Publisher termination (completion, error, cancellation) closes the {@link AsyncInputStream}. + *

+ * The resulting {@link org.reactivestreams.Publisher} filters empty binary chunks and uses {@link DataBufferFactory} + * settings to determine the chunk size. + * + * @param inputStream must not be {@literal null}. + * @param dataBufferFactory must not be {@literal null}. + * @return {@link Flux} emitting {@link DataBuffer}s. + * @see DataBufferFactory#allocateBuffer() + */ + static Flux toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) { + + return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory) // + .filter(it -> { + + if (it.readableByteCount() == 0) { + DataBufferUtils.release(it); + return false; + } + return true; + }); + } + + /** + * Creates a {@link Mono} emitting a {@link AsyncInputStream} to consume a {@link Publisher} emitting + * {@link DataBuffer} and exposing the binary stream through {@link AsyncInputStream}. {@link DataBuffer}s are + * released by the adapter during consumption. + *

+ * This method returns a {@link Mono} to retain the {@link reactor.util.context.Context subscriber context}. + * + * @param dataBuffers must not be {@literal null}. + * @return {@link Mono} emitting {@link AsyncInputStream}. + * @see DataBufferUtils#release(DataBuffer) + */ + static Mono toAsyncInputStream(Publisher dataBuffers) { + + return Mono.create(sink -> { + sink.success(new AsyncInputStreamAdapter(dataBuffers, sink.currentContext())); + }); + } +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java new file mode 100644 index 000000000..d55489bc7 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java @@ -0,0 +1,205 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import lombok.RequiredArgsConstructor; +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Operators; +import reactor.util.context.Context; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; + +import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; + +/** + * Utility to adapt a {@link AsyncInputStream} to a {@link Publisher} emitting {@link DataBuffer}. + * + * @author Mark Paluch + * @since 2.2 + */ +class DataBufferPublisherAdapter { + + /** + * Creates a {@link Publisher} emitting {@link DataBuffer}s by reading binary chunks from {@link AsyncInputStream}. + * Closes the {@link AsyncInputStream} once the {@link Publisher} terminates. + * + * @param inputStream must not be {@literal null}. + * @param dataBufferFactory must not be {@literal null}. + * @return the resulting {@link Publisher}. + */ + public static Flux createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) { + + State state = new State(inputStream, dataBufferFactory); + + return Flux.usingWhen(Mono.just(inputStream), it -> { + + return Flux. create((sink) -> { + + sink.onDispose(state::close); + sink.onCancel(state::close); + + sink.onRequest(n -> { + state.request(sink, n); + }); + }); + }, AsyncInputStream::close, AsyncInputStream::close, AsyncInputStream::close) // + .concatMap(Flux::just, 1); + } + + @RequiredArgsConstructor + static class State { + + static final AtomicLongFieldUpdater DEMAND = AtomicLongFieldUpdater.newUpdater(State.class, "demand"); + + static final AtomicIntegerFieldUpdater STATE = AtomicIntegerFieldUpdater.newUpdater(State.class, "state"); + + static final AtomicIntegerFieldUpdater READ = AtomicIntegerFieldUpdater.newUpdater(State.class, "read"); + + static final int STATE_OPEN = 0; + static final int STATE_CLOSED = 1; + + static final int READ_NONE = 0; + static final int READ_IN_PROGRESS = 1; + + final AsyncInputStream inputStream; + final DataBufferFactory dataBufferFactory; + + // see DEMAND + volatile long demand; + + // see STATE + volatile int state = STATE_OPEN; + + // see READ_IN_PROGRESS + volatile int read = READ_NONE; + + void request(FluxSink sink, long n) { + + Operators.addCap(DEMAND, this, n); + + if (onShouldRead()) { + emitNext(sink); + } + } + + boolean onShouldRead() { + return !isClosed() && getDemand() > 0 && onWantRead(); + } + + boolean onWantRead() { + return READ.compareAndSet(this, READ_NONE, READ_IN_PROGRESS); + } + + boolean onReadDone() { + return READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE); + } + + long getDemand() { + return DEMAND.get(this); + } + + void close() { + STATE.compareAndSet(this, STATE_OPEN, STATE_CLOSED); + } + + boolean isClosed() { + return STATE.get(this) == STATE_CLOSED; + } + + /** + * Emit the next {@link DataBuffer}. + * + * @param sink + */ + void emitNext(FluxSink sink) { + + DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(); + ByteBuffer intermediate = ByteBuffer.allocate(dataBuffer.capacity()); + + Mono.from(inputStream.read(intermediate)).subscribe(new CoreSubscriber() { + + @Override + public Context currentContext() { + return sink.currentContext(); + } + + @Override + public void onSubscribe(Subscription s) { + s.request(1); + } + + @Override + public void onNext(Integer bytes) { + + if (isClosed()) { + + onReadDone(); + DataBufferUtils.release(dataBuffer); + Operators.onNextDropped(dataBuffer, sink.currentContext()); + return; + } + + intermediate.flip(); + dataBuffer.write(intermediate); + + sink.next(dataBuffer); + + try { + if (bytes == -1) { + sink.complete(); + } + } finally { + onReadDone(); + } + } + + @Override + public void onError(Throwable t) { + + if (isClosed()) { + + Operators.onErrorDropped(t, sink.currentContext()); + return; + } + + onReadDone(); + DataBufferUtils.release(dataBuffer); + Operators.onNextDropped(dataBuffer, sink.currentContext()); + sink.error(t); + } + + @Override + public void onComplete() { + + if (onShouldRead()) { + emitNext(sink); + } + } + }); + } + } +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java new file mode 100644 index 000000000..c4fc184e6 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java @@ -0,0 +1,204 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.bson.Document; +import org.bson.types.ObjectId; +import org.reactivestreams.Publisher; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.support.ResourcePatternResolver; +import org.springframework.data.domain.Sort; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.lang.Nullable; + +import com.mongodb.client.gridfs.GridFSFindIterable; +import com.mongodb.client.gridfs.model.GridFSFile; +import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; + +/** + * Collection of operations to store and read files from MongoDB GridFS using reactive infrastructure. + * + * @author Mark Paluch + * @since 2.2 + */ +public interface ReactiveGridFsOperations { + + /** + * Stores the given content into a file with the given name. + * + * @param content must not be {@literal null}. + * @param filename must not be {@literal null} or empty. + * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + */ + default Mono store(Publisher content, String filename) { + return store(content, filename, (Object) null); + } + + /** + * Stores the given content into a file with the given name. + * + * @param content must not be {@literal null}. + * @param metadata can be {@literal null}. + * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + */ + default Mono store(Publisher content, @Nullable Object metadata) { + return store(content, null, metadata); + } + + /** + * Stores the given content into a file with the given name. + * + * @param content must not be {@literal null}. + * @param metadata can be {@literal null}. + * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + */ + default Mono store(Publisher content, @Nullable Document metadata) { + return store(content, null, metadata); + } + + /** + * Stores the given content into a file with the given name and content type. + * + * @param content must not be {@literal null}. + * @param filename must not be {@literal null} or empty. + * @param contentType can be {@literal null}. + * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + */ + default Mono store(Publisher content, @Nullable String filename, @Nullable String contentType) { + return store(content, filename, contentType, (Object) null); + } + + /** + * Stores the given content into a file with the given name using the given metadata. The metadata object will be + * marshalled before writing. + * + * @param content must not be {@literal null}. + * @param filename can be {@literal null} or empty. + * @param metadata can be {@literal null}. + * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + */ + default Mono store(Publisher content, @Nullable String filename, @Nullable Object metadata) { + return store(content, filename, null, metadata); + } + + /** + * Stores the given content into a file with the given name and content type using the given metadata. The metadata + * object will be marshalled before writing. + * + * @param content must not be {@literal null}. + * @param filename must not be {@literal null} or empty. + * @param contentType can be {@literal null}. + * @param metadata can be {@literal null} + * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + */ + Mono store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType, + @Nullable Object metadata); + + /** + * Stores the given content into a file with the given name and content type using the given metadata. The metadata + * object will be marshalled before writing. + * + * @param content must not be {@literal null}. + * @param filename must not be {@literal null} or empty. + * @param contentType can be {@literal null}. + * @param metadata can be {@literal null} + * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + */ + Mono store(Publisher content, @Nullable String filename, @Nullable String contentType, + @Nullable Object metadata); + + /** + * Stores the given content into a file with the given name using the given metadata. + * + * @param content must not be {@literal null}. + * @param filename must not be {@literal null} or empty. + * @param metadata can be {@literal null}. + * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + */ + default Mono store(Publisher content, @Nullable String filename, @Nullable Document metadata) { + return store(content, filename, null, metadata); + } + + /** + * Stores the given content into a file with the given name and content type using the given metadata. + * + * @param content must not be {@literal null}. + * @param filename must not be {@literal null} or empty. + * @param contentType can be {@literal null}. + * @param metadata can be {@literal null}. + * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + */ + Mono store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType, + @Nullable Document metadata); + + Mono store(Publisher content, String filename, String contentType, Document metadata); + + /** + * Returns all files matching the given query. Note, that currently {@link Sort} criterias defined at the + * {@link Query} will not be regarded as MongoDB does not support ordering for GridFS file access. + * + * @see MongoDB Jira: JAVA-431 + * @param query must not be {@literal null}. + * @return {@link GridFSFindIterable} to obtain results from. Eg. by calling + * {@link GridFSFindIterable#into(java.util.Collection)}. + */ + Flux find(Query query); + + /** + * Returns a single {@link com.mongodb.client.gridfs.model.GridFSFile} matching the given query or {@literal null} in + * case no file matches. + * + * @param query must not be {@literal null}. + * @return + */ + Mono findOne(Query query); + + /** + * Deletes all files matching the given {@link Query}. + * + * @param query must not be {@literal null}. + */ + Mono delete(Query query); + + /** + * Returns the {@link GridFsResource} with the given file name. + * + * @param filename must not be {@literal null}. + * @return the resource. Use {@link org.springframework.core.io.Resource#exists()} to check if the returned + * {@link GridFsResource} is actually present. + * @see ResourcePatternResolver#getResource(String) + */ + Mono getResource(String filename); + + /** + * Returns the {@link GridFsResource} for a {@link GridFSFile}. + * + * @param file must not be {@literal null}. + * @return the resource for the file. + */ + Mono getResource(GridFSFile file); + + /** + * Returns all {@link GridFsResource}s matching the given file name pattern. + * + * @param filenamePattern must not be {@literal null}. + * @return + */ + Flux getResources(String filenamePattern); +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java new file mode 100644 index 000000000..eb787cbd5 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java @@ -0,0 +1,180 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import reactor.core.publisher.Flux; + +import java.io.ByteArrayInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; + +import org.reactivestreams.Publisher; +import org.springframework.core.io.AbstractResource; +import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +import com.mongodb.client.gridfs.model.GridFSFile; + +/** + * Reactive {@link GridFSFile} based {@link Resource} implementation. + * + * @author Mark Paluch + * @since 2.2 + */ +public class ReactiveGridFsResource extends AbstractResource { + + static final String CONTENT_TYPE_FIELD = "_contentType"; + private static final ByteArrayInputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]); + + private final @Nullable GridFSFile file; + private final String filename; + private final Flux content; + + /** + * Creates a new, absent {@link ReactiveGridFsResource}. + * + * @param filename filename of the absent resource. + * @param content + * @since 2.1 + */ + private ReactiveGridFsResource(String filename, Publisher content) { + + this.file = null; + this.filename = filename; + this.content = Flux.from(content); + } + + /** + * Creates a new {@link ReactiveGridFsResource} from the given {@link GridFSFile}. + * + * @param file must not be {@literal null}. + * @param content + */ + public ReactiveGridFsResource(GridFSFile file, Publisher content) { + + this.file = file; + this.filename = file.getFilename(); + this.content = Flux.from(content); + } + + /** + * Obtain an absent {@link ReactiveGridFsResource}. + * + * @param filename filename of the absent resource, must not be {@literal null}. + * @return never {@literal null}. + * @since 2.1 + */ + public static ReactiveGridFsResource absent(String filename) { + + Assert.notNull(filename, "Filename must not be null"); + + return new ReactiveGridFsResource(filename, Flux.empty()); + } + + /* + * (non-Javadoc) + * @see org.springframework.core.io.InputStreamResource#getInputStream() + */ + @Override + public InputStream getInputStream() throws IllegalStateException { + throw new UnsupportedOperationException(); + } + + /* + * (non-Javadoc) + * @see org.springframework.core.io.AbstractResource#contentLength() + */ + @Override + public long contentLength() throws IOException { + + verifyExists(); + return file.getLength(); + } + + /* + * (non-Javadoc) + * @see org.springframework.core.io.AbstractResource#getFilename() + */ + @Override + public String getFilename() throws IllegalStateException { + return filename; + } + + /* + * (non-Javadoc) + * @see org.springframework.core.io.AbstractResource#exists() + */ + @Override + public boolean exists() { + return file != null; + } + + /* + * (non-Javadoc) + * @see org.springframework.core.io.AbstractResource#lastModified() + */ + @Override + public long lastModified() throws IOException { + + verifyExists(); + return file.getUploadDate().getTime(); + } + + /* + * (non-Javadoc) + * @see org.springframework.core.io.AbstractResource#getDescription() + */ + @Override + public String getDescription() { + return String.format("GridFs resource [%s]", this.getFilename()); + } + + /** + * Returns the {@link Resource}'s id. + * + * @return never {@literal null}. + * @throws IllegalStateException if the file does not {@link #exists()}. + */ + public Object getId() { + + Assert.state(exists(), () -> String.format("%s does not exist.", getDescription())); + + return file.getId(); + } + + /** + * Retrieve the download stream. + * + * @return + */ + public Flux getDownloadStream() { + + if (!exists()) { + return Flux.error(new FileNotFoundException(String.format("%s does not exist.", getDescription()))); + } + return content; + } + + private void verifyExists() throws FileNotFoundException { + + if (!exists()) { + throw new FileNotFoundException(String.format("%s does not exist.", getDescription())); + } + } +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java new file mode 100644 index 000000000..4f13ded27 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java @@ -0,0 +1,297 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import static org.springframework.data.mongodb.core.query.Query.*; +import static org.springframework.data.mongodb.gridfs.GridFsCriteria.*; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Optional; + +import org.bson.Document; +import org.bson.types.ObjectId; +import org.reactivestreams.Publisher; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory; +import org.springframework.data.mongodb.core.convert.MongoConverter; +import org.springframework.data.mongodb.core.convert.QueryMapper; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +import com.mongodb.client.gridfs.model.GridFSFile; +import com.mongodb.client.gridfs.model.GridFSUploadOptions; +import com.mongodb.reactivestreams.client.MongoDatabase; +import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; +import com.mongodb.reactivestreams.client.gridfs.GridFSBucket; +import com.mongodb.reactivestreams.client.gridfs.GridFSBuckets; +import com.mongodb.reactivestreams.client.gridfs.GridFSDownloadStream; +import com.mongodb.reactivestreams.client.gridfs.GridFSFindPublisher; + +/** + * {@link ReactiveGridFsOperations} implementation to store content into MongoDB GridFS. Uses by default + * {@link DefaultDataBufferFactory} to create {@link DataBuffer buffers}. + * + * @author Mark Paluch + * @since 2.2 + */ +public class ReactiveGridFsTemplate implements ReactiveGridFsOperations { + + private final DataBufferFactory dataBufferFactory; + private final ReactiveMongoDatabaseFactory dbFactory; + private final @Nullable String bucket; + private final MongoConverter converter; + private final QueryMapper queryMapper; + + /** + * Creates a new {@link ReactiveGridFsTemplate} using the given {@link ReactiveMongoDatabaseFactory} and + * {@link MongoConverter}. + * + * @param dbFactory must not be {@literal null}. + * @param converter must not be {@literal null}. + */ + public ReactiveGridFsTemplate(ReactiveMongoDatabaseFactory dbFactory, MongoConverter converter) { + this(dbFactory, converter, null); + } + + /** + * Creates a new {@link ReactiveGridFsTemplate} using the given {@link ReactiveMongoDatabaseFactory} and + * {@link MongoConverter}. + * + * @param dbFactory must not be {@literal null}. + * @param converter must not be {@literal null}. + * @param bucket + */ + public ReactiveGridFsTemplate(ReactiveMongoDatabaseFactory dbFactory, MongoConverter converter, + @Nullable String bucket) { + this(new DefaultDataBufferFactory(), dbFactory, converter, bucket); + } + + /** + * Creates a new {@link ReactiveGridFsTemplate} using the given {@link DataBufferFactory}, + * {@link ReactiveMongoDatabaseFactory} and {@link MongoConverter}. + * + * @param dataBufferFactory must not be {@literal null}. + * @param dbFactory must not be {@literal null}. + * @param converter must not be {@literal null}. + * @param bucket + */ + public ReactiveGridFsTemplate(DataBufferFactory dataBufferFactory, ReactiveMongoDatabaseFactory dbFactory, + MongoConverter converter, @Nullable String bucket) { + + Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null!"); + Assert.notNull(dbFactory, "ReactiveMongoDatabaseFactory must not be null!"); + Assert.notNull(converter, "MongoConverter must not be null!"); + + this.dataBufferFactory = dataBufferFactory; + this.dbFactory = dbFactory; + this.converter = converter; + this.bucket = bucket; + + this.queryMapper = new QueryMapper(converter); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#store(com.mongodb.reactivestreams.client.gridfs.AsyncInputStream, java.lang.String, java.lang.String, java.lang.Object) + */ + @Override + public Mono store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType, + @Nullable Object metadata) { + return store(content, filename, contentType, toDocument(metadata)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#store(org.reactivestreams.Publisher, java.lang.String, java.lang.String, java.lang.Object) + */ + @Override + public Mono store(Publisher content, @Nullable String filename, @Nullable String contentType, + @Nullable Object metadata) { + return store(content, filename, contentType, toDocument(metadata)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#store(com.mongodb.reactivestreams.client.gridfs.AsyncInputStream, java.lang.String, java.lang.String, org.bson.Document) + */ + @Override + public Mono store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType, + @Nullable Document metadata) { + + Assert.notNull(content, "InputStream must not be null!"); + + GridFSUploadOptions options = new GridFSUploadOptions(); + + Document mData = new Document(); + + if (StringUtils.hasText(contentType)) { + mData.put(GridFsResource.CONTENT_TYPE_FIELD, contentType); + } + + if (metadata != null) { + mData.putAll(metadata); + } + + options.metadata(mData); + + return Mono.from(getGridFs().uploadFromStream(filename, content, options)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#store(org.reactivestreams.Publisher, java.lang.String, java.lang.String, org.bson.Document) + */ + @Override + public Mono store(Publisher content, @Nullable String filename, @Nullable String contentType, + @Nullable Document metadata) { + + Assert.notNull(content, "Content must not be null!"); + + return BinaryStreamAdapters.toAsyncInputStream(content).flatMap(it -> store(it, filename, contentType, metadata)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#find(org.springframework.data.mongodb.core.query.Query) + */ + @Override + public Flux find(Query query) { + + GridFSFindPublisher publisherToUse = prepareQuery(query); + + return Flux.from(publisherToUse); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#findOne(org.springframework.data.mongodb.core.query.Query) + */ + @Override + public Mono findOne(Query query) { + + GridFSFindPublisher publisherToUse = prepareQuery(query); + + return Flux.from(publisherToUse.limit(1)).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#delete(org.springframework.data.mongodb.core.query.Query) + */ + @Override + public Mono delete(Query query) { + + GridFSBucket gridFs = getGridFs(); + return find(query).flatMap(it -> gridFs.delete(it.getId())).then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#getResource(java.lang.String) + */ + @Override + public Mono getResource(String location) { + + Assert.notNull(location, "Filename must not be null!"); + + return findOne(query(whereFilename().is(location))).flatMap(this::getResource) + .defaultIfEmpty(ReactiveGridFsResource.absent(location)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#getResource(com.mongodb.client.gridfs.model.GridFSFile) + */ + @Override + public Mono getResource(GridFSFile file) { + + Assert.notNull(file, "GridFSFile must not be null!"); + + return Mono.fromSupplier(() -> { + + GridFSDownloadStream stream = getGridFs().openDownloadStream(file.getObjectId()); + + return new ReactiveGridFsResource(file, BinaryStreamAdapters.toPublisher(stream, dataBufferFactory)); + }); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#getResources(java.lang.String) + */ + @Override + public Flux getResources(String locationPattern) { + + if (!StringUtils.hasText(locationPattern)) { + return Flux.empty(); + } + + AntPath path = new AntPath(locationPattern); + + if (path.isPattern()) { + + Flux files = find(query(whereFilename().regex(path.toRegex()))); + return files.flatMap(this::getResource); + } + + return getResource(locationPattern).flux(); + } + + protected GridFSFindPublisher prepareQuery(Query query) { + + Assert.notNull(query, "Query must not be null!"); + + Document queryObject = getMappedQuery(query.getQueryObject()); + Document sortObject = getMappedQuery(query.getSortObject()); + + GridFSFindPublisher publisherToUse = getGridFs().find(queryObject).sort(sortObject); + + Integer cursorBatchSize = query.getMeta().getCursorBatchSize(); + if (cursorBatchSize != null) { + publisherToUse = publisherToUse.batchSize(cursorBatchSize); + } + + return publisherToUse; + } + + private Document getMappedQuery(Document query) { + return queryMapper.getMappedObject(query, Optional.empty()); + } + + protected GridFSBucket getGridFs() { + + MongoDatabase db = dbFactory.getMongoDatabase(); + return bucket == null ? GridFSBuckets.create(db) : GridFSBuckets.create(db, bucket); + } + + @Nullable + private Document toDocument(@Nullable Object metadata) { + + Document document = null; + + if (metadata != null) { + document = new Document(); + converter.write(metadata, document); + } + return document; + } +} diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java new file mode 100644 index 000000000..b94de93af --- /dev/null +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java @@ -0,0 +1,102 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import static org.assertj.core.api.Assertions.*; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.junit.Test; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.util.StreamUtils; + +import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; +import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper; + +/** + * Unit tests for {@link BinaryStreamAdapters}. + * + * @author Mark Paluch + */ +public class BinaryStreamAdaptersUnitTests { + + @Test // DATAMONGO-1855 + public void shouldAdaptAsyncInputStreamToDataBufferPublisher() throws IOException { + + ClassPathResource resource = new ClassPathResource("gridfs/gridfs.xml"); + + byte[] content = StreamUtils.copyToByteArray(resource.getInputStream()); + AsyncInputStream inputStream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); + + Flux dataBuffers = BinaryStreamAdapters.toPublisher(inputStream, new DefaultDataBufferFactory()); + + DataBufferUtils.join(dataBuffers) // + .as(StepVerifier::create) // + .consumeNextWith(actual -> { + + byte[] actualContent = new byte[actual.readableByteCount()]; + actual.read(actualContent); + assertThat(actualContent).isEqualTo(content); + }) // + .verifyComplete(); + } + + @Test // DATAMONGO-1855 + public void shouldAdaptBinaryPublisherToAsyncInputStream() throws IOException { + + ClassPathResource resource = new ClassPathResource("gridfs/gridfs.xml"); + + byte[] content = StreamUtils.copyToByteArray(resource.getInputStream()); + + Flux dataBuffers = DataBufferUtils.readInputStream(resource::getInputStream, + new DefaultDataBufferFactory(), 10); + + AsyncInputStream inputStream = BinaryStreamAdapters.toAsyncInputStream(dataBuffers).block(); + ByteBuffer complete = readBuffer(inputStream); + + assertThat(complete).isEqualTo(ByteBuffer.wrap(content)); + } + + static ByteBuffer readBuffer(AsyncInputStream inputStream) { + + ByteBuffer complete = ByteBuffer.allocate(1024); + + boolean hasData = true; + while (hasData) { + + ByteBuffer chunk = ByteBuffer.allocate(100); + + Integer bytesRead = Mono.from(inputStream.read(chunk)).block(); + + chunk.flip(); + complete.put(chunk); + + hasData = bytesRead > -1; + } + + complete.flip(); + + return complete; + } +} diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateIntegrationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateIntegrationTests.java new file mode 100644 index 000000000..dea663eb5 --- /dev/null +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateIntegrationTests.java @@ -0,0 +1,140 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import static org.assertj.core.api.Assertions.*; +import static org.springframework.data.mongodb.core.query.Criteria.*; +import static org.springframework.data.mongodb.core.query.Query.*; +import static org.springframework.data.mongodb.gridfs.GridFsCriteria.*; + +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import java.io.IOException; + +import org.bson.BsonObjectId; +import org.bson.Document; +import org.bson.types.ObjectId; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBuffer; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.util.StreamUtils; + +import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; +import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper; + +/** + * @author Mark Paluch + */ +@RunWith(SpringRunner.class) +@ContextConfiguration("classpath:gridfs/reactive-gridfs.xml") +public class ReactiveGridFsTemplateIntegrationTests { + + Resource resource = new ClassPathResource("gridfs/gridfs.xml"); + + @Autowired ReactiveGridFsOperations operations; + + @Before + public void setUp() { + operations.delete(new Query()) // + .as(StepVerifier::create) // + .verifyComplete(); + } + + @Test // DATAMONGO-1855 + public void storesAndFindsSimpleDocument() { + + DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); + DefaultDataBuffer first = factory.wrap("first".getBytes()); + DefaultDataBuffer second = factory.wrap("second".getBytes()); + + ObjectId reference = operations.store(Flux.just(first, second), "foo.xml").block(); + + operations.find(query(where("_id").is(reference))) // + .as(StepVerifier::create) // + .assertNext(actual -> { + assertThat(((BsonObjectId) actual.getId()).getValue()).isEqualTo(reference); + }).verifyComplete(); + } + + @Test // DATAMONGO-1855 + public void writesMetadataCorrectly() throws IOException { + + Document metadata = new Document("key", "value"); + + AsyncInputStream stream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); + + ObjectId reference = operations.store(stream, "foo.xml", "binary/octet-stream", metadata).block(); + + operations.find(query(whereMetaData("key").is("value"))) // + .as(StepVerifier::create) // + .consumeNextWith(actual -> { + assertThat(actual.getObjectId()).isEqualTo(reference); + })// + .verifyComplete(); + } + + @Test // DATAMONGO-1855 + public void marshalsComplexMetadata() throws IOException { + + Metadata metadata = new Metadata(); + metadata.version = "1.0"; + + AsyncInputStream stream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); + + ObjectId reference = operations.store(stream, "foo.xml", "binary/octet-stream", metadata).block(); + + operations.find(query(whereMetaData("version").is("1.0"))) // + .as(StepVerifier::create) // + .consumeNextWith(actual -> { + assertThat(actual.getObjectId()).isEqualTo(reference); + })// + .verifyComplete(); + } + + @Test // DATAMONGO-1855 + public void getResourceShouldRetrieveContentByIdentity() throws IOException { + + byte[] content = StreamUtils.copyToByteArray(resource.getInputStream()); + AsyncInputStream upload = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); + + ObjectId reference = operations.store(upload, "foo.xml", null, null).block(); + + operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource) + .flatMapMany(ReactiveGridFsResource::getDownloadStream) // + .transform(DataBufferUtils::join).as(StepVerifier::create) // + .consumeNextWith(dataBuffer -> { + + byte[] actual = new byte[dataBuffer.readableByteCount()]; + dataBuffer.read(actual); + + assertThat(actual).isEqualTo(content); + }).verifyComplete(); + } + + static class Metadata { + String version; + } +} diff --git a/spring-data-mongodb/src/test/resources/gridfs/reactive-gridfs.xml b/spring-data-mongodb/src/test/resources/gridfs/reactive-gridfs.xml new file mode 100644 index 000000000..f07ee8b08 --- /dev/null +++ b/spring-data-mongodb/src/test/resources/gridfs/reactive-gridfs.xml @@ -0,0 +1,30 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/main/asciidoc/new-features.adoc b/src/main/asciidoc/new-features.adoc index 8135ad698..80ec95981 100644 --- a/src/main/asciidoc/new-features.adoc +++ b/src/main/asciidoc/new-features.adoc @@ -5,6 +5,7 @@ == What's New in Spring Data MongoDB 2.2 * <> * <> via `ReactiveQuerydslPredicateExecutor`. +* <>. [[new-features.2-1-0]] == What's New in Spring Data MongoDB 2.1 diff --git a/src/main/asciidoc/reference/reactive-mongodb.adoc b/src/main/asciidoc/reference/reactive-mongodb.adoc index bb9ac80f3..eaa4a51f2 100644 --- a/src/main/asciidoc/reference/reactive-mongodb.adoc +++ b/src/main/asciidoc/reference/reactive-mongodb.adoc @@ -482,3 +482,96 @@ Flux hasIndex = operations.execute("geolocation", .flatMap(document -> Mono.just(true)) .defaultIfEmpty(false)); ---- + +[[reactive.gridfs]] +== GridFS Support + +MongoDB supports storing binary files inside its filesystem, GridFS. +Spring Data MongoDB provides a `ReactiveGridFsOperations` interface as well as the corresponding implementation, `ReactiveGridFsTemplate`, to let you interact with the filesystem. +You can set up a `ReactiveGridFsTemplate` instance by handing it a `ReactiveMongoDatabaseFactory` as well as a `MongoConverter`, as the following example shows: + +.JavaConfig setup for a ReactiveGridFsTemplate +==== +[source,java] +---- +class GridFsConfiguration extends AbstractReactiveMongoConfiguration { + + // … further configuration omitted + + @Bean + public ReactiveGridFsTemplate reactiveGridFsTemplate() { + return new ReactiveGridFsTemplate(reactiveMongoDbFactory(), mappingMongoConverter()); + } +} +---- +==== + +The template can now be injected and used to perform storage and retrieval operations, as the following example shows: + +.Using ReactiveGridFsTemplate to store files +==== +[source,java] +---- +class ReactiveGridFsClient { + + @Autowired + ReactiveGridFsTemplate operations; + + @Test + public Mono storeFileToGridFs() { + + FileMetadata metadata = new FileMetadata(); + // populate metadata + Publisher file = … // lookup File or Resource + + return operations.store(file, "filename.txt", metadata); + } +} +---- +==== + +The `store(…)` operations take an `Publisher`, a filename, and (optionally) metadata information about the file to store. The metadata can be an arbitrary object, which will be marshaled by the `MongoConverter` configured with the `ReactiveGridFsTemplate`. Alternatively, you can also provide a `Document`. + +NOTE: MongoDB's driver uses `AsyncInputStream` and `AsyncOutputStream` interfaces to exchange binary streams. Spring Data MongoDB adapts these interfaces to `Publisher`. Read more about `DataBuffer` in http://docs.spring.io/spring/docs/{springVersion}/spring-framework-reference/core.html#databuffers[Spring's reference documentation]. + +You can read files from the filesystem through either the `find(…)` or the `getResources(…)` methods. Let's have a look at the `find(…)` methods first. You can either find a single file or multiple files that match a `Query`. You can use the `GridFsCriteria` helper class to define queries. It provides static factory methods to encapsulate default metadata fields (such as `whereFilename()` and `whereContentType()`) or a custom one through `whereMetaData()`. The following example shows how to use `ReactiveGridFsTemplate` to query for files: + +.Using ReactiveGridFsTemplate to query for files +==== +[source,java] +---- +class ReactiveGridFsClient { + + @Autowired + ReactiveGridFsTemplate operations; + + @Test + public Flux findFilesInGridFs() { + return operations.find(query(whereFilename().is("filename.txt"))) + } +} +---- +==== + +NOTE: Currently, MongoDB does not support defining sort criteria when retrieving files from GridFS. For this reason, any sort criteria defined on the `Query` instance handed into the `find(…)` method are disregarded. + +The other option to read files from the GridFs is to use the methods modeled along the lines of `ResourcePatternResolver`. +`ReactiveGridFsOperations` uses reactive types to defer execution while `ResourcePatternResolver` uses a synchronous interface. +These methods allow handing an Ant path into the method and can thus retrieve files matching the given pattern. The following example shows how to use `ReactiveGridFsTemplate` to read files: + +.Using ReactiveGridFsTemplate to read files +==== +[source,java] +---- +class ReactiveGridFsClient { + + @Autowired + ReactiveGridFsOperations operations; + + @Test + public void readFilesFromGridFs() { + Flux txtFiles = operations.getResources("*.txt"); + } +} +---- +====