DATAMONGO-2505 - Polishing.
Simplify operators. Remove Lombok usage from newly introduced code. Update reference documentation. Reformat code. Original pull request: #854.
This commit is contained in:
@@ -113,8 +113,12 @@ public class SimpleReactiveMongoDatabaseFactory implements DisposableBean, React
|
||||
|
||||
Assert.hasText(dbName, "Database name must not be empty.");
|
||||
|
||||
return Mono.just(mongo.getDatabase(dbName))
|
||||
.map(db -> writeConcern != null ? db.withWriteConcern(writeConcern) : db);
|
||||
return Mono.fromSupplier(() -> {
|
||||
|
||||
MongoDatabase db = mongo.getDatabase(dbName);
|
||||
|
||||
return writeConcern != null ? db.withWriteConcern(writeConcern) : db;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -18,7 +18,6 @@ package org.springframework.data.mongodb.gridfs;
|
||||
import static org.springframework.data.mongodb.core.query.Query.*;
|
||||
import static org.springframework.data.mongodb.gridfs.GridFsCriteria.*;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@@ -28,6 +27,7 @@ import org.bson.BsonValue;
|
||||
import org.bson.Document;
|
||||
import org.bson.types.ObjectId;
|
||||
import org.reactivestreams.Publisher;
|
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||
@@ -137,12 +137,13 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
|
||||
String filename = upload.getFilename();
|
||||
Flux<ByteBuffer> source = Flux.from(upload.getContent()).map(DataBuffer::asByteBuffer);
|
||||
T fileId = upload.getFileId();
|
||||
|
||||
if (fileId == null) {
|
||||
return (Mono<T>) createMono(new AutoIdCreatingUploadCallback(filename, source, uploadOptions));
|
||||
}
|
||||
|
||||
UploadCallback callback = new UploadCallback(BsonUtils.simpleToBsonValue(fileId), filename, source, uploadOptions);
|
||||
return createMono(callback).then(Mono.just(fileId));
|
||||
return createMono(callback).thenReturn(fileId);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -170,17 +171,17 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
|
||||
|
||||
return createFlux(new FindLimitCallback(query, queryObject, sortObject, 2)) //
|
||||
.collectList() //
|
||||
.flatMap(it -> {
|
||||
if (it.isEmpty()) {
|
||||
return Mono.empty();
|
||||
.handle((files, sink) -> {
|
||||
|
||||
if (files.size() == 1) {
|
||||
sink.next(files.get(0));
|
||||
return;
|
||||
}
|
||||
|
||||
if (it.size() > 1) {
|
||||
return Mono.error(new IncorrectResultSizeDataAccessException(
|
||||
if (files.size() > 1) {
|
||||
sink.error(new IncorrectResultSizeDataAccessException(
|
||||
"Query " + SerializationUtils.serializeToJsonSafely(query) + " returned non unique result.", 1));
|
||||
}
|
||||
|
||||
return Mono.just(it.get(0));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -228,7 +229,7 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
|
||||
|
||||
Assert.notNull(file, "GridFSFile must not be null!");
|
||||
|
||||
return this.doGetBucket()
|
||||
return doGetBucket()
|
||||
.map(it -> new ReactiveGridFsResource(file, it.downloadToPublisher(file.getId()), dataBufferFactory));
|
||||
}
|
||||
|
||||
@@ -265,7 +266,7 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
|
||||
|
||||
Assert.notNull(callback, "ReactiveBucketCallback must not be null!");
|
||||
|
||||
return Mono.defer(this::doGetBucket).flatMap(bucket -> Mono.from(callback.doInBucket(bucket)));
|
||||
return doGetBucket().flatMap(bucket -> Mono.from(callback.doInBucket(bucket)));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -279,7 +280,7 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
|
||||
|
||||
Assert.notNull(callback, "ReactiveBucketCallback must not be null!");
|
||||
|
||||
return Mono.defer(this::doGetBucket).flatMapMany(callback::doInBucket);
|
||||
return doGetBucket().flatMapMany(callback::doInBucket);
|
||||
}
|
||||
|
||||
protected Mono<GridFSBucket> doGetBucket() {
|
||||
@@ -287,29 +288,45 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
|
||||
.map(db -> bucket == null ? GridFSBuckets.create(db) : GridFSBuckets.create(db, bucket));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param <T>
|
||||
* @author Mathieu Ouellet
|
||||
* @since 3.0
|
||||
*/
|
||||
interface ReactiveBucketCallback<T> {
|
||||
Publisher<T> doInBucket(GridFSBucket bucket);
|
||||
}
|
||||
|
||||
@RequiredArgsConstructor
|
||||
private static class FindCallback implements ReactiveBucketCallback<GridFSFile> {
|
||||
|
||||
private final Query query;
|
||||
private final Document queryObject;
|
||||
private final Document sortObject;
|
||||
|
||||
public FindCallback(Query query, Document queryObject, Document sortObject) {
|
||||
|
||||
this.query = query;
|
||||
this.queryObject = queryObject;
|
||||
this.sortObject = sortObject;
|
||||
}
|
||||
|
||||
public GridFSFindPublisher doInBucket(GridFSBucket bucket) {
|
||||
|
||||
GridFSFindPublisher findPublisher = bucket.find(queryObject).sort(sortObject);
|
||||
|
||||
if (query.getLimit() > 0) {
|
||||
findPublisher = findPublisher.limit(query.getLimit());
|
||||
}
|
||||
|
||||
if (query.getSkip() > 0) {
|
||||
findPublisher = findPublisher.skip(Math.toIntExact(query.getSkip()));
|
||||
}
|
||||
|
||||
Integer cursorBatchSize = query.getMeta().getCursorBatchSize();
|
||||
if (cursorBatchSize != null) {
|
||||
findPublisher = findPublisher.batchSize(cursorBatchSize);
|
||||
}
|
||||
|
||||
return findPublisher;
|
||||
}
|
||||
}
|
||||
@@ -319,6 +336,7 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
|
||||
private final int limit;
|
||||
|
||||
public FindLimitCallback(Query query, Document queryObject, Document sortObject, int limit) {
|
||||
|
||||
super(query, queryObject, sortObject);
|
||||
this.limit = limit;
|
||||
}
|
||||
@@ -329,7 +347,6 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
|
||||
}
|
||||
}
|
||||
|
||||
@RequiredArgsConstructor
|
||||
private static class UploadCallback implements ReactiveBucketCallback<Void> {
|
||||
|
||||
private final BsonValue fileId;
|
||||
@@ -337,30 +354,49 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
|
||||
private final Publisher<ByteBuffer> source;
|
||||
private final GridFSUploadOptions uploadOptions;
|
||||
|
||||
public UploadCallback(BsonValue fileId, String filename, Publisher<ByteBuffer> source,
|
||||
GridFSUploadOptions uploadOptions) {
|
||||
|
||||
this.fileId = fileId;
|
||||
this.filename = filename;
|
||||
this.source = source;
|
||||
this.uploadOptions = uploadOptions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GridFSUploadPublisher<Void> doInBucket(GridFSBucket bucket) {
|
||||
return bucket.uploadFromPublisher(fileId, filename, source, uploadOptions);
|
||||
}
|
||||
}
|
||||
|
||||
@RequiredArgsConstructor
|
||||
private static class AutoIdCreatingUploadCallback implements ReactiveBucketCallback<ObjectId> {
|
||||
|
||||
private final String filename;
|
||||
private final Publisher<ByteBuffer> source;
|
||||
private final GridFSUploadOptions uploadOptions;
|
||||
|
||||
public AutoIdCreatingUploadCallback(String filename, Publisher<ByteBuffer> source,
|
||||
GridFSUploadOptions uploadOptions) {
|
||||
|
||||
this.filename = filename;
|
||||
this.source = source;
|
||||
this.uploadOptions = uploadOptions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GridFSUploadPublisher<ObjectId> doInBucket(GridFSBucket bucket) {
|
||||
return bucket.uploadFromPublisher(filename, source, uploadOptions);
|
||||
}
|
||||
}
|
||||
|
||||
@RequiredArgsConstructor
|
||||
private static class DeleteCallback implements ReactiveBucketCallback<Void> {
|
||||
|
||||
private final BsonValue id;
|
||||
|
||||
public DeleteCallback(BsonValue id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Publisher<Void> doInBucket(GridFSBucket bucket) {
|
||||
return bucket.delete(id);
|
||||
|
||||
@@ -3,7 +3,8 @@
|
||||
|
||||
Spring Data MongoDB 3.x requires the MongoDB Java Driver 4.x. +
|
||||
The 4.0 MongoDB Java Driver does no longer support certain features that have already been deprecated in one of the last minor versions.
|
||||
Some of the changes affect the initial setup configuration as well as compile/runtime features. We summarized the most typical changes one might encounter.
|
||||
Some of the changes affect the initial setup configuration as well as compile/runtime features.
|
||||
We summarized the most typical changes one might encounter.
|
||||
|
||||
== Dependency Changes
|
||||
|
||||
@@ -216,3 +217,9 @@ public class Config extends AbstractMongoClientConfiguration {
|
||||
}
|
||||
----
|
||||
====
|
||||
|
||||
=== Deferred MongoDatabase lookup in `ReactiveMongoDatabaseFactory`
|
||||
|
||||
`ReactiveMongoDatabaseFactory` now returns `Mono<MongoDatabase>` instead of `MongoDatabase` to allow access to the Reactor Subscriber context to enable context-specific routing functionality.
|
||||
|
||||
This change affects `ReactiveMongoTemplate.getMongoDatabase()` and `ReactiveMongoTemplate.getCollection()` so both methods must follow deferred retrieval.
|
||||
|
||||
Reference in New Issue
Block a user