Compare commits
23 Commits
labs/manua
...
issue/4306
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
38e406867a | ||
|
|
5305c9542b | ||
|
|
c02968d00a | ||
|
|
3ab78fc1ed | ||
|
|
fa0f026410 | ||
|
|
9c96a2b2c3 | ||
|
|
0986210221 | ||
|
|
7d5372f049 | ||
|
|
a5022e9bc4 | ||
|
|
aff8fbd62a | ||
|
|
633fbceb5a | ||
|
|
fb9a0d8482 | ||
|
|
d73807df1b | ||
|
|
e56f6ce87f | ||
|
|
c5c6fc107c | ||
|
|
368c644922 | ||
|
|
4d050f5021 | ||
|
|
83923e0e2a | ||
|
|
25588850dd | ||
|
|
55c81f4f54 | ||
|
|
ac7551e47f | ||
|
|
6d3043de9a | ||
|
|
1a94b6e4ee |
4
.mvn/wrapper/maven-wrapper.properties
vendored
4
.mvn/wrapper/maven-wrapper.properties
vendored
@@ -1,2 +1,2 @@
|
||||
#Fri Jun 03 09:32:40 CEST 2022
|
||||
distributionUrl=https\://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.8.5/apache-maven-3.8.5-bin.zip
|
||||
#Mon Feb 20 11:58:01 CET 2023
|
||||
distributionUrl=https\://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.0/apache-maven-3.9.0-bin.zip
|
||||
|
||||
@@ -1,16 +1,16 @@
|
||||
# Java versions
|
||||
java.main.tag=17.0.5_8-jdk-focal
|
||||
java.main.tag=17.0.6_10-jdk-focal
|
||||
|
||||
# Docker container images - standard
|
||||
docker.java.main.image=harbor-repo.vmware.com/dockerhub-proxy-cache/library/eclipse-temurin:${java.main.tag}
|
||||
|
||||
# Supported versions of MongoDB
|
||||
docker.mongodb.4.4.version=4.4.17
|
||||
docker.mongodb.5.0.version=5.0.13
|
||||
docker.mongodb.6.0.version=6.0.2
|
||||
docker.mongodb.4.4.version=4.4.18
|
||||
docker.mongodb.5.0.version=5.0.14
|
||||
docker.mongodb.6.0.version=6.0.4
|
||||
|
||||
# Supported versions of Redis
|
||||
docker.redis.6.version=6.2.6
|
||||
docker.redis.6.version=6.2.10
|
||||
|
||||
# Supported versions of Cassandra
|
||||
docker.cassandra.3.version=3.11.14
|
||||
|
||||
4
pom.xml
4
pom.xml
@@ -5,7 +5,7 @@
|
||||
|
||||
<groupId>org.springframework.data</groupId>
|
||||
<artifactId>spring-data-mongodb-parent</artifactId>
|
||||
<version>4.1.0-SNAPSHOT</version>
|
||||
<version>4.1.x-GH-4306-SNAPSHOT</version>
|
||||
<packaging>pom</packaging>
|
||||
|
||||
<name>Spring Data MongoDB</name>
|
||||
@@ -27,7 +27,7 @@
|
||||
<project.type>multi</project.type>
|
||||
<dist.id>spring-data-mongodb</dist.id>
|
||||
<springdata.commons>3.1.0-SNAPSHOT</springdata.commons>
|
||||
<mongo>4.8.2</mongo>
|
||||
<mongo>4.9.0</mongo>
|
||||
<mongo.reactivestreams>${mongo}</mongo.reactivestreams>
|
||||
<jmh.version>1.19</jmh.version>
|
||||
</properties>
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.data</groupId>
|
||||
<artifactId>spring-data-mongodb-parent</artifactId>
|
||||
<version>4.1.0-SNAPSHOT</version>
|
||||
<version>4.1.x-GH-4306-SNAPSHOT</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.data</groupId>
|
||||
<artifactId>spring-data-mongodb-parent</artifactId>
|
||||
<version>4.1.0-SNAPSHOT</version>
|
||||
<version>4.1.x-GH-4306-SNAPSHOT</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.data</groupId>
|
||||
<artifactId>spring-data-mongodb-parent</artifactId>
|
||||
<version>4.1.0-SNAPSHOT</version>
|
||||
<version>4.1.x-GH-4306-SNAPSHOT</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
|
||||
@@ -0,0 +1,61 @@
|
||||
/*
|
||||
* Copyright 2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.data.mongodb.core;
|
||||
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import com.mongodb.client.MongoCollection;
|
||||
|
||||
/**
|
||||
* Interface for functional preparation of a {@link MongoCollection}.
|
||||
*
|
||||
* @author Mark Paluch
|
||||
* @since 4.1
|
||||
*/
|
||||
public interface CollectionPreparer<T> {
|
||||
|
||||
/**
|
||||
* Returns a preparer that always returns its input collection.
|
||||
*
|
||||
* @return a preparer that always returns its input collection.
|
||||
*/
|
||||
static <T> CollectionPreparer<T> identity() {
|
||||
return it -> it;
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare the {@code collection}.
|
||||
*
|
||||
* @param collection the collection to prepare.
|
||||
* @return the prepared collection.
|
||||
*/
|
||||
T prepare(T collection);
|
||||
|
||||
/**
|
||||
* Returns a composed {@code CollectionPreparer} that first applies this preparer to the collection, and then applies
|
||||
* the {@code after} preparer to the result. If evaluation of either function throws an exception, it is relayed to
|
||||
* the caller of the composed function.
|
||||
*
|
||||
* @param after the collection preparer to apply after this function is applied.
|
||||
* @return a composed {@code CollectionPreparer} that first applies this preparer and then applies the {@code after}
|
||||
* preparer.
|
||||
*/
|
||||
default CollectionPreparer<T> andThen(CollectionPreparer<T> after) {
|
||||
Assert.notNull(after, "After CollectionPreparer must not be null");
|
||||
return c -> after.prepare(prepare(c));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,182 @@
|
||||
/*
|
||||
* Copyright 2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.data.mongodb.core;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.bson.Document;
|
||||
|
||||
import com.mongodb.ReadConcern;
|
||||
import com.mongodb.ReadPreference;
|
||||
import com.mongodb.client.MongoCollection;
|
||||
|
||||
/**
|
||||
* Support class for delegate implementations to apply {@link ReadConcern} and {@link ReadPreference} settings upon
|
||||
* {@link CollectionPreparer preparing a collection}.
|
||||
*
|
||||
* @author Mark Paluch
|
||||
* @since 4.1
|
||||
*/
|
||||
class CollectionPreparerSupport implements ReadConcernAware, ReadPreferenceAware {
|
||||
|
||||
private final List<Object> sources;
|
||||
|
||||
private CollectionPreparerSupport(List<Object> sources) {
|
||||
this.sources = sources;
|
||||
}
|
||||
|
||||
<T> T doPrepare(T collection, Function<T, ReadConcern> concernAccessor, BiFunction<T, ReadConcern, T> concernFunction,
|
||||
Function<T, ReadPreference> preferenceAccessor, BiFunction<T, ReadPreference, T> preferenceFunction) {
|
||||
|
||||
T collectionToUse = collection;
|
||||
|
||||
for (Object source : sources) {
|
||||
if (source instanceof ReadConcernAware rca && rca.hasReadConcern()) {
|
||||
|
||||
ReadConcern concern = rca.getReadConcern();
|
||||
if (concernAccessor.apply(collectionToUse) != concern) {
|
||||
collectionToUse = concernFunction.apply(collectionToUse, concern);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
for (Object source : sources) {
|
||||
if (source instanceof ReadPreferenceAware rpa && rpa.hasReadPreference()) {
|
||||
|
||||
ReadPreference preference = rpa.getReadPreference();
|
||||
if (preferenceAccessor.apply(collectionToUse) != preference) {
|
||||
collectionToUse = preferenceFunction.apply(collectionToUse, preference);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return collectionToUse;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasReadConcern() {
|
||||
|
||||
for (Object aware : sources) {
|
||||
if (aware instanceof ReadConcernAware rca && rca.hasReadConcern()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReadConcern getReadConcern() {
|
||||
|
||||
for (Object aware : sources) {
|
||||
if (aware instanceof ReadConcernAware rca && rca.hasReadConcern()) {
|
||||
return rca.getReadConcern();
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasReadPreference() {
|
||||
|
||||
for (Object aware : sources) {
|
||||
if (aware instanceof ReadPreferenceAware rpa && rpa.hasReadPreference()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReadPreference getReadPreference() {
|
||||
|
||||
for (Object aware : sources) {
|
||||
if (aware instanceof ReadPreferenceAware rpa && rpa.hasReadPreference()) {
|
||||
return rpa.getReadPreference();
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
static class CollectionPreparerDelegate extends CollectionPreparerSupport
|
||||
implements CollectionPreparer<MongoCollection<Document>> {
|
||||
|
||||
private CollectionPreparerDelegate(List<Object> sources) {
|
||||
super(sources);
|
||||
}
|
||||
|
||||
public static CollectionPreparerDelegate of(ReadPreferenceAware... awares) {
|
||||
return of((Object[]) awares);
|
||||
}
|
||||
|
||||
public static CollectionPreparerDelegate of(Object... mixedAwares) {
|
||||
|
||||
if (mixedAwares.length == 1 && mixedAwares[0] instanceof CollectionPreparerDelegate) {
|
||||
return (CollectionPreparerDelegate) mixedAwares[0];
|
||||
}
|
||||
|
||||
return new CollectionPreparerDelegate(Arrays.asList(mixedAwares));
|
||||
}
|
||||
|
||||
@Override
|
||||
public MongoCollection<Document> prepare(MongoCollection<Document> collection) {
|
||||
return doPrepare(collection, MongoCollection::getReadConcern, MongoCollection::withReadConcern,
|
||||
MongoCollection::getReadPreference, MongoCollection::withReadPreference);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static class ReactiveCollectionPreparerDelegate extends CollectionPreparerSupport
|
||||
implements CollectionPreparer<com.mongodb.reactivestreams.client.MongoCollection<Document>> {
|
||||
|
||||
private ReactiveCollectionPreparerDelegate(List<Object> sources) {
|
||||
super(sources);
|
||||
}
|
||||
|
||||
public static ReactiveCollectionPreparerDelegate of(ReadPreferenceAware... awares) {
|
||||
return of((Object[]) awares);
|
||||
}
|
||||
|
||||
public static ReactiveCollectionPreparerDelegate of(Object... mixedAwares) {
|
||||
|
||||
if (mixedAwares.length == 1 && mixedAwares[0] instanceof CollectionPreparerDelegate) {
|
||||
return (ReactiveCollectionPreparerDelegate) mixedAwares[0];
|
||||
}
|
||||
|
||||
return new ReactiveCollectionPreparerDelegate(Arrays.asList(mixedAwares));
|
||||
}
|
||||
|
||||
@Override
|
||||
public com.mongodb.reactivestreams.client.MongoCollection<Document> prepare(
|
||||
com.mongodb.reactivestreams.client.MongoCollection<Document> collection) {
|
||||
return doPrepare(collection, //
|
||||
com.mongodb.reactivestreams.client.MongoCollection::getReadConcern,
|
||||
com.mongodb.reactivestreams.client.MongoCollection::withReadConcern,
|
||||
com.mongodb.reactivestreams.client.MongoCollection::getReadPreference,
|
||||
com.mongodb.reactivestreams.client.MongoCollection::withReadPreference);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -20,7 +20,6 @@ import java.util.Optional;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.bson.Document;
|
||||
|
||||
import org.springframework.dao.IncorrectResultSizeDataAccessException;
|
||||
import org.springframework.data.mongodb.core.query.NearQuery;
|
||||
import org.springframework.data.mongodb.core.query.Query;
|
||||
@@ -168,7 +167,8 @@ class ExecutableFindOperationSupport implements ExecutableFindOperation {
|
||||
Document queryObject = query.getQueryObject();
|
||||
Document fieldsObject = query.getFieldsObject();
|
||||
|
||||
return template.doFind(getCollectionName(), queryObject, fieldsObject, domainType, returnType,
|
||||
return template.doFind(template.createDelegate(query), getCollectionName(), queryObject, fieldsObject, domainType,
|
||||
returnType,
|
||||
getCursorPreparer(query, preparer));
|
||||
}
|
||||
|
||||
|
||||
@@ -30,6 +30,7 @@ import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationStage;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
|
||||
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
|
||||
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
|
||||
@@ -296,6 +297,19 @@ public interface MongoOperations extends FluentMongoOperations {
|
||||
return createView(name, source, AggregationPipeline.of(stages));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a view with the provided name. The view content is defined by the {@link AggregationStage pipeline stages}
|
||||
* on another collection or view identified by the given {@link #getCollectionName(Class) source type}.
|
||||
*
|
||||
* @param name the name of the view to create.
|
||||
* @param source the type defining the views source collection.
|
||||
* @param stages the {@link AggregationOperation aggregation pipeline stages} defining the view content.
|
||||
* @since 4.1
|
||||
*/
|
||||
default MongoCollection<Document> createView(String name, Class<?> source, AggregationStage... stages) {
|
||||
return createView(name, source, AggregationPipeline.of(stages));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a view with the provided name. The view content is defined by the {@link AggregationPipeline pipeline} on
|
||||
* another collection or view identified by the given {@link #getCollectionName(Class) source type}.
|
||||
|
||||
@@ -55,6 +55,7 @@ import org.springframework.data.mongodb.MongoDatabaseFactory;
|
||||
import org.springframework.data.mongodb.MongoDatabaseUtils;
|
||||
import org.springframework.data.mongodb.SessionSynchronization;
|
||||
import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
|
||||
import org.springframework.data.mongodb.core.CollectionPreparerSupport.CollectionPreparerDelegate;
|
||||
import org.springframework.data.mongodb.core.DefaultBulkOperations.BulkOperationContext;
|
||||
import org.springframework.data.mongodb.core.EntityOperations.AdaptibleEntity;
|
||||
import org.springframework.data.mongodb.core.QueryOperations.AggregationDefinition;
|
||||
@@ -66,6 +67,7 @@ import org.springframework.data.mongodb.core.QueryOperations.UpdateContext;
|
||||
import org.springframework.data.mongodb.core.aggregation.Aggregation;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationOptions.Builder;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
|
||||
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
|
||||
@@ -83,7 +85,6 @@ import org.springframework.data.mongodb.core.mapreduce.MapReduceResults;
|
||||
import org.springframework.data.mongodb.core.query.BasicQuery;
|
||||
import org.springframework.data.mongodb.core.query.Collation;
|
||||
import org.springframework.data.mongodb.core.query.Meta;
|
||||
import org.springframework.data.mongodb.core.query.Meta.CursorOption;
|
||||
import org.springframework.data.mongodb.core.query.NearQuery;
|
||||
import org.springframework.data.mongodb.core.query.Query;
|
||||
import org.springframework.data.mongodb.core.query.UpdateDefinition;
|
||||
@@ -112,7 +113,23 @@ import com.mongodb.client.result.DeleteResult;
|
||||
import com.mongodb.client.result.UpdateResult;
|
||||
|
||||
/**
|
||||
* Primary implementation of {@link MongoOperations}.
|
||||
* Primary implementation of {@link MongoOperations}. It simplifies the use of imperative MongoDB usage and helps to
|
||||
* avoid common errors. It executes core MongoDB workflow, leaving application code to provide {@link Document} and
|
||||
* extract results. This class executes BSON queries or updates, initiating iteration over {@link FindIterable} and
|
||||
* catching MongoDB exceptions and translating them to the generic, more informative exception hierarchy defined in the
|
||||
* org.springframework.dao package. Can be used within a service implementation via direct instantiation with a
|
||||
* {@link MongoDatabaseFactory} reference, or get prepared in an application context and given to services as bean
|
||||
* reference.
|
||||
* <p>
|
||||
* Note: The {@link MongoDatabaseFactory} should always be configured as a bean in the application context, in the first
|
||||
* case given to the service directly, in the second case to the prepared template.
|
||||
* <h3>{@link ReadPreference} and {@link com.mongodb.ReadConcern}</h3>
|
||||
* <p>
|
||||
* {@code ReadPreference} and {@code ReadConcern} are generally considered from {@link Query} and
|
||||
* {@link AggregationOptions} objects for the action to be executed on a particular {@link MongoCollection}.
|
||||
* <p>
|
||||
* You can also set the default {@link #setReadPreference(ReadPreference) ReadPreference} on the template level to
|
||||
* generally apply a {@link ReadPreference}.
|
||||
*
|
||||
* @author Thomas Risberg
|
||||
* @author Graeme Rocher
|
||||
@@ -141,7 +158,8 @@ import com.mongodb.client.result.UpdateResult;
|
||||
* @author Bartłomiej Mazur
|
||||
* @author Michael Krog
|
||||
*/
|
||||
public class MongoTemplate implements MongoOperations, ApplicationContextAware, IndexOperationsProvider {
|
||||
public class MongoTemplate
|
||||
implements MongoOperations, ApplicationContextAware, IndexOperationsProvider, ReadPreferenceAware {
|
||||
|
||||
private static final Log LOGGER = LogFactory.getLog(MongoTemplate.class);
|
||||
private static final WriteResultChecking DEFAULT_WRITE_RESULT_CHECKING = WriteResultChecking.NONE;
|
||||
@@ -293,6 +311,16 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
this.readPreference = readPreference;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasReadPreference() {
|
||||
return this.readPreference != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReadPreference getReadPreference() {
|
||||
return this.readPreference;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configure whether lifecycle events such as {@link AfterLoadEvent}, {@link BeforeSaveEvent}, etc. should be
|
||||
* published or whether emission should be suppressed. Enabled by default.
|
||||
@@ -363,10 +391,10 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
|
||||
if (enabled) {
|
||||
|
||||
this.countExecution = (collectionName, filter, options) -> {
|
||||
this.countExecution = (collectionPreparer, collectionName, filter, options) -> {
|
||||
|
||||
if (!estimationFilter.test(filter, options)) {
|
||||
return doExactCount(collectionName, filter, options);
|
||||
return doExactCount(collectionPreparer, collectionName, filter, options);
|
||||
}
|
||||
|
||||
EstimatedDocumentCountOptions estimatedDocumentCountOptions = new EstimatedDocumentCountOptions();
|
||||
@@ -374,7 +402,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
estimatedDocumentCountOptions.maxTime(options.getMaxTime(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
return doEstimatedCount(collectionName, estimatedDocumentCountOptions);
|
||||
return doEstimatedCount(collectionPreparer, collectionName, estimatedDocumentCountOptions);
|
||||
};
|
||||
} else {
|
||||
this.countExecution = this::doExactCount;
|
||||
@@ -443,8 +471,9 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
Document mappedQuery = queryContext.getMappedQuery(persistentEntity);
|
||||
Document mappedFields = queryContext.getMappedFields(persistentEntity, projection);
|
||||
|
||||
CollectionPreparerDelegate readPreference = createDelegate(query);
|
||||
FindIterable<Document> cursor = new QueryCursorPreparer(query, entityType).initiateFind(collection,
|
||||
col -> col.find(mappedQuery, Document.class).projection(mappedFields));
|
||||
col -> readPreference.prepare(col).find(mappedQuery, Document.class).projection(mappedFields));
|
||||
|
||||
return new CloseableIterableCursorAdapter<>(cursor, exceptionTranslator,
|
||||
new ProjectingReadCallback<>(mongoConverter, projection, collectionName)).stream();
|
||||
@@ -517,7 +546,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
serializeToJsonSafely(queryObject), sortObject, fieldsObject, collectionName));
|
||||
}
|
||||
|
||||
this.executeQueryInternal(new FindCallback(queryObject, fieldsObject, null),
|
||||
this.executeQueryInternal(new FindCallback(createDelegate(query), queryObject, fieldsObject, null),
|
||||
preparer != null ? preparer : CursorPreparer.NO_OP_PREPARER, documentCallbackHandler, collectionName);
|
||||
}
|
||||
|
||||
@@ -619,7 +648,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
@Nullable ViewOptions options) {
|
||||
|
||||
return createView(name, getCollectionName(source),
|
||||
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getOperations()), source),
|
||||
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getStages()), source),
|
||||
options);
|
||||
}
|
||||
|
||||
@@ -628,7 +657,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
@Nullable ViewOptions options) {
|
||||
|
||||
return createView(name, source,
|
||||
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getOperations()), (Class<?>) null),
|
||||
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getStages()), (Class<?>) null),
|
||||
options);
|
||||
}
|
||||
|
||||
@@ -765,7 +794,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
|
||||
if (ObjectUtils.isEmpty(query.getSortObject())) {
|
||||
|
||||
return doFindOne(collectionName, query.getQueryObject(), query.getFieldsObject(),
|
||||
return doFindOne(collectionName, createDelegate(query), query.getQueryObject(), query.getFieldsObject(),
|
||||
new QueryCursorPreparer(query, entityClass), entityClass);
|
||||
} else {
|
||||
query.limit(1);
|
||||
@@ -797,7 +826,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
Document mappedQuery = queryContext.getMappedQuery(entityClass, this::getPersistentEntity);
|
||||
|
||||
return execute(collectionName,
|
||||
new ExistsCallback(mappedQuery, queryContext.getCollation(entityClass).orElse(null)));
|
||||
new ExistsCallback(createDelegate(query), mappedQuery, queryContext.getCollation(entityClass).orElse(null)));
|
||||
}
|
||||
|
||||
// Find methods that take a Query to express the query and that return a List of objects.
|
||||
@@ -814,7 +843,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
Assert.notNull(collectionName, "CollectionName must not be null");
|
||||
Assert.notNull(entityClass, "EntityClass must not be null");
|
||||
|
||||
return doFind(collectionName, query.getQueryObject(), query.getFieldsObject(), entityClass,
|
||||
return doFind(collectionName, createDelegate(query), query.getQueryObject(), query.getFieldsObject(), entityClass,
|
||||
new QueryCursorPreparer(query, entityClass));
|
||||
}
|
||||
|
||||
@@ -834,7 +863,8 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
|
||||
String idKey = operations.getIdPropertyName(entityClass);
|
||||
|
||||
return doFindOne(collectionName, new Document(idKey, id), new Document(), entityClass);
|
||||
return doFindOne(collectionName, CollectionPreparer.identity(), new Document(idKey, id), new Document(),
|
||||
entityClass);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -867,10 +897,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
serializeToJsonSafely(mappedQuery), field, collectionName));
|
||||
}
|
||||
|
||||
QueryCursorPreparer preparer = new QueryCursorPreparer(query, entityClass);
|
||||
if (preparer.hasReadPreference()) {
|
||||
collection = collection.withReadPreference(preparer.getReadPreference());
|
||||
}
|
||||
collection = createDelegate(query).prepare(collection);
|
||||
|
||||
DistinctIterable<T> iterable = collection.distinct(mappedFieldName, mappedQuery, mongoDriverCompatibleType);
|
||||
distinctQueryContext.applyCollation(entityClass, iterable::collation);
|
||||
@@ -920,8 +947,18 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
String collection = StringUtils.hasText(collectionName) ? collectionName : getCollectionName(domainType);
|
||||
String distanceField = operations.nearQueryDistanceFieldName(domainType);
|
||||
|
||||
Builder optionsBuilder = AggregationOptions.builder().collation(near.getCollation());
|
||||
|
||||
if (near.hasReadPreference()) {
|
||||
optionsBuilder.readPreference(near.getReadPreference());
|
||||
}
|
||||
|
||||
if(near.hasReadConcern()) {
|
||||
optionsBuilder.readConcern(near.getReadConcern());
|
||||
}
|
||||
|
||||
Aggregation $geoNear = TypedAggregation.newAggregation(domainType, Aggregation.geoNear(near, distanceField))
|
||||
.withOptions(AggregationOptions.builder().collation(near.getCollation()).build());
|
||||
.withOptions(optionsBuilder.build());
|
||||
|
||||
AggregationResults<Document> results = aggregate($geoNear, collection, Document.class);
|
||||
EntityProjection<T, ?> projection = operations.introspectProjection(returnType, domainType);
|
||||
@@ -986,7 +1023,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
operations.forType(entityClass).getCollation(query).ifPresent(optionsToUse::collation);
|
||||
}
|
||||
|
||||
return doFindAndModify(collectionName, query.getQueryObject(), query.getFieldsObject(),
|
||||
return doFindAndModify(createDelegate(query), collectionName, query.getQueryObject(), query.getFieldsObject(),
|
||||
getMappedSortObject(query, entityClass), entityClass, update, optionsToUse);
|
||||
}
|
||||
|
||||
@@ -1008,6 +1045,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
QueryContext queryContext = queryOperations.createQueryContext(query);
|
||||
|
||||
EntityProjection<T, S> projection = operations.introspectProjection(resultType, entityType);
|
||||
CollectionPreparerDelegate collectionPreparer = createDelegate(query);
|
||||
Document mappedQuery = queryContext.getMappedQuery(entity);
|
||||
Document mappedFields = queryContext.getMappedFields(entity, projection);
|
||||
Document mappedSort = queryContext.getMappedSort(entity);
|
||||
@@ -1018,7 +1056,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
maybeEmitEvent(new BeforeSaveEvent<>(replacement, mappedReplacement, collectionName));
|
||||
maybeCallBeforeSave(replacement, mappedReplacement, collectionName);
|
||||
|
||||
T saved = doFindAndReplace(collectionName, mappedQuery, mappedFields, mappedSort,
|
||||
T saved = doFindAndReplace(collectionPreparer, collectionName, mappedQuery, mappedFields, mappedSort,
|
||||
queryContext.getCollation(entityType).orElse(null), entityType, mappedReplacement, options, projection);
|
||||
|
||||
if (saved != null) {
|
||||
@@ -1046,7 +1084,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
Assert.notNull(entityClass, "EntityClass must not be null");
|
||||
Assert.notNull(collectionName, "CollectionName must not be null");
|
||||
|
||||
return doFindAndRemove(collectionName, query.getQueryObject(), query.getFieldsObject(),
|
||||
return doFindAndRemove(createDelegate(query), collectionName, query.getQueryObject(), query.getFieldsObject(),
|
||||
getMappedSortObject(query, entityClass), operations.forType(entityClass).getCollation(query).orElse(null),
|
||||
entityClass);
|
||||
}
|
||||
@@ -1078,17 +1116,19 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
CountOptions options = countContext.getCountOptions(entityClass);
|
||||
Document mappedQuery = countContext.getMappedQuery(entityClass, mappingContext::getPersistentEntity);
|
||||
|
||||
return doCount(collectionName, mappedQuery, options);
|
||||
CollectionPreparerDelegate readPreference = createDelegate(query);
|
||||
return doCount(readPreference, collectionName, mappedQuery, options);
|
||||
}
|
||||
|
||||
protected long doCount(String collectionName, Document filter, CountOptions options) {
|
||||
protected long doCount(CollectionPreparer collectionPreparer, String collectionName, Document filter,
|
||||
CountOptions options) {
|
||||
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER
|
||||
.debug(String.format("Executing count: %s in collection: %s", serializeToJsonSafely(filter), collectionName));
|
||||
}
|
||||
|
||||
return countExecution.countDocuments(collectionName, filter, options);
|
||||
return countExecution.countDocuments(collectionPreparer, collectionName, filter, options);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1097,11 +1137,13 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
*/
|
||||
@Override
|
||||
public long estimatedCount(String collectionName) {
|
||||
return doEstimatedCount(collectionName, new EstimatedDocumentCountOptions());
|
||||
return doEstimatedCount(CollectionPreparerDelegate.of(this), collectionName, new EstimatedDocumentCountOptions());
|
||||
}
|
||||
|
||||
protected long doEstimatedCount(String collectionName, EstimatedDocumentCountOptions options) {
|
||||
return execute(collectionName, collection -> collection.estimatedDocumentCount(options));
|
||||
protected long doEstimatedCount(CollectionPreparer<MongoCollection<Document>> collectionPreparer,
|
||||
String collectionName, EstimatedDocumentCountOptions options) {
|
||||
return execute(collectionName,
|
||||
collection -> collectionPreparer.prepare(collection).estimatedDocumentCount(options));
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -1112,12 +1154,13 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
CountOptions options = countContext.getCountOptions(entityClass);
|
||||
Document mappedQuery = countContext.getMappedQuery(entityClass, mappingContext::getPersistentEntity);
|
||||
|
||||
return doExactCount(collectionName, mappedQuery, options);
|
||||
return doExactCount(createDelegate(query), collectionName, mappedQuery, options);
|
||||
}
|
||||
|
||||
protected long doExactCount(String collectionName, Document filter, CountOptions options) {
|
||||
return execute(collectionName,
|
||||
collection -> collection.countDocuments(CountQuery.of(filter).toQueryDocument(), options));
|
||||
protected long doExactCount(CollectionPreparer<MongoCollection<Document>> collectionPreparer, String collectionName,
|
||||
Document filter, CountOptions options) {
|
||||
return execute(collectionName, collection -> collectionPreparer.prepare(collection)
|
||||
.countDocuments(CountQuery.of(filter).toQueryDocument(), options));
|
||||
}
|
||||
|
||||
protected boolean countCanBeEstimated(Document filter, CountOptions options) {
|
||||
@@ -1177,8 +1220,8 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
*/
|
||||
protected MongoCollection<Document> prepareCollection(MongoCollection<Document> collection) {
|
||||
|
||||
if (this.readPreference != null) {
|
||||
collection = collection.withReadPreference(readPreference);
|
||||
if (this.readPreference != null && this.readPreference != collection.getReadPreference()) {
|
||||
return collection.withReadPreference(readPreference);
|
||||
}
|
||||
|
||||
return collection;
|
||||
@@ -1754,7 +1797,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
@Override
|
||||
public <T> List<T> findAll(Class<T> entityClass, String collectionName) {
|
||||
return executeFindMultiInternal(
|
||||
new FindCallback(new Document(), new Document(),
|
||||
new FindCallback(CollectionPreparer.identity(), new Document(), new Document(),
|
||||
operations.forType(entityClass).getCollation().map(Collation::toMongoCollation).orElse(null)),
|
||||
CursorPreparer.NO_OP_PREPARER, new ReadDocumentCallback<>(mongoConverter, entityClass, collectionName),
|
||||
collectionName);
|
||||
@@ -1812,7 +1855,9 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
|
||||
String mapFunc = replaceWithResourceIfNecessary(mapFunction);
|
||||
String reduceFunc = replaceWithResourceIfNecessary(reduceFunction);
|
||||
MongoCollection<Document> inputCollection = getAndPrepareCollection(doGetDatabase(), inputCollectionName);
|
||||
CollectionPreparerDelegate readPreference = createDelegate(query);
|
||||
MongoCollection<Document> inputCollection = readPreference
|
||||
.prepare(getAndPrepareCollection(doGetDatabase(), inputCollectionName));
|
||||
|
||||
// MapReduceOp
|
||||
MapReduceIterable<Document> mapReduce = inputCollection.mapReduce(mapFunc, reduceFunc, Document.class);
|
||||
@@ -1977,6 +2022,9 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
if (!CollectionUtils.isEmpty(result)) {
|
||||
|
||||
Query byIdInQuery = operations.getByIdInQuery(result);
|
||||
if (query.hasReadPreference()) {
|
||||
byIdInQuery.withReadPreference(query.getReadPreference());
|
||||
}
|
||||
|
||||
remove(byIdInQuery, entityClass, collectionName);
|
||||
}
|
||||
@@ -2032,7 +2080,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
return execute(collectionName, collection -> {
|
||||
|
||||
List<Document> rawResult = new ArrayList<>();
|
||||
|
||||
CollectionPreparerDelegate delegate = CollectionPreparerDelegate.of(options);
|
||||
Class<?> domainType = aggregation instanceof TypedAggregation ? ((TypedAggregation<?>) aggregation).getInputType()
|
||||
: null;
|
||||
|
||||
@@ -2040,7 +2088,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
() -> operations.forType(domainType) //
|
||||
.getCollation());
|
||||
|
||||
AggregateIterable<Document> aggregateIterable = collection.aggregate(pipeline, Document.class) //
|
||||
AggregateIterable<Document> aggregateIterable = delegate.prepare(collection).aggregate(pipeline, Document.class) //
|
||||
.collation(collation.map(Collation::toMongoCollation).orElse(null)) //
|
||||
.allowDiskUse(options.isAllowDiskUse());
|
||||
|
||||
@@ -2103,7 +2151,9 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
|
||||
return execute(collectionName, (CollectionCallback<Stream<O>>) collection -> {
|
||||
|
||||
AggregateIterable<Document> cursor = collection.aggregate(pipeline, Document.class) //
|
||||
CollectionPreparerDelegate delegate = CollectionPreparerDelegate.of(options);
|
||||
|
||||
AggregateIterable<Document> cursor = delegate.prepare(collection).aggregate(pipeline, Document.class) //
|
||||
.allowDiskUse(options.isAllowDiskUse());
|
||||
|
||||
if (options.getCursorBatchSize() != null) {
|
||||
@@ -2344,14 +2394,16 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
* The query document is specified as a standard {@link Document} and so is the fields specification.
|
||||
*
|
||||
* @param collectionName name of the collection to retrieve the objects from.
|
||||
* @param collectionPreparer the preparer to prepare the collection for the actual use.
|
||||
* @param query the query document that specifies the criteria used to find a record.
|
||||
* @param fields the document that specifies the fields to be returned.
|
||||
* @param entityClass the parameterized type of the returned list.
|
||||
* @return the converted object or {@literal null} if none exists.
|
||||
*/
|
||||
@Nullable
|
||||
protected <T> T doFindOne(String collectionName, Document query, Document fields, Class<T> entityClass) {
|
||||
return doFindOne(collectionName, query, fields, CursorPreparer.NO_OP_PREPARER, entityClass);
|
||||
protected <T> T doFindOne(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
|
||||
Document query, Document fields, Class<T> entityClass) {
|
||||
return doFindOne(collectionName, collectionPreparer, query, fields, CursorPreparer.NO_OP_PREPARER, entityClass);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -2359,17 +2411,18 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
* The query document is specified as a standard {@link Document} and so is the fields specification.
|
||||
*
|
||||
* @param collectionName name of the collection to retrieve the objects from.
|
||||
* @param collectionPreparer the preparer to prepare the collection for the actual use.
|
||||
* @param query the query document that specifies the criteria used to find a record.
|
||||
* @param fields the document that specifies the fields to be returned.
|
||||
* @param entityClass the parameterized type of the returned list.
|
||||
* @param preparer the preparer used to modify the cursor on execution.
|
||||
* @param entityClass the parameterized type of the returned list.
|
||||
* @return the converted object or {@literal null} if none exists.
|
||||
* @since 2.2
|
||||
*/
|
||||
@Nullable
|
||||
@SuppressWarnings("ConstantConditions")
|
||||
protected <T> T doFindOne(String collectionName, Document query, Document fields, CursorPreparer preparer,
|
||||
Class<T> entityClass) {
|
||||
protected <T> T doFindOne(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
|
||||
Document query, Document fields, CursorPreparer preparer, Class<T> entityClass) {
|
||||
|
||||
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
|
||||
|
||||
@@ -2382,7 +2435,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
serializeToJsonSafely(query), mappedFields, entityClass, collectionName));
|
||||
}
|
||||
|
||||
return executeFindOneInternal(new FindOneCallback(mappedQuery, mappedFields, preparer),
|
||||
return executeFindOneInternal(new FindOneCallback(collectionPreparer, mappedQuery, mappedFields, preparer),
|
||||
new ReadDocumentCallback<>(this.mongoConverter, entityClass, collectionName), collectionName);
|
||||
}
|
||||
|
||||
@@ -2391,13 +2444,15 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
* query document is specified as a standard Document and so is the fields specification.
|
||||
*
|
||||
* @param collectionName name of the collection to retrieve the objects from
|
||||
* @param collectionPreparer the preparer to prepare the collection for the actual use.
|
||||
* @param query the query document that specifies the criteria used to find a record
|
||||
* @param fields the document that specifies the fields to be returned
|
||||
* @param entityClass the parameterized type of the returned list.
|
||||
* @return the List of converted objects.
|
||||
*/
|
||||
protected <T> List<T> doFind(String collectionName, Document query, Document fields, Class<T> entityClass) {
|
||||
return doFind(collectionName, query, fields, entityClass, null,
|
||||
protected <T> List<T> doFind(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
|
||||
Document query, Document fields, Class<T> entityClass) {
|
||||
return doFind(collectionName, collectionPreparer, query, fields, entityClass, null,
|
||||
new ReadDocumentCallback<>(this.mongoConverter, entityClass, collectionName));
|
||||
}
|
||||
|
||||
@@ -2407,6 +2462,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
* specified as a standard Document and so is the fields specification.
|
||||
*
|
||||
* @param collectionName name of the collection to retrieve the objects from.
|
||||
* @param collectionPreparer the preparer to prepare the collection for the actual use.
|
||||
* @param query the query document that specifies the criteria used to find a record.
|
||||
* @param fields the document that specifies the fields to be returned.
|
||||
* @param entityClass the parameterized type of the returned list.
|
||||
@@ -2414,14 +2470,15 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
* (apply limits, skips and so on).
|
||||
* @return the {@link List} of converted objects.
|
||||
*/
|
||||
protected <T> List<T> doFind(String collectionName, Document query, Document fields, Class<T> entityClass,
|
||||
CursorPreparer preparer) {
|
||||
return doFind(collectionName, query, fields, entityClass, preparer,
|
||||
protected <T> List<T> doFind(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
|
||||
Document query, Document fields, Class<T> entityClass, CursorPreparer preparer) {
|
||||
return doFind(collectionName, collectionPreparer, query, fields, entityClass, preparer,
|
||||
new ReadDocumentCallback<>(mongoConverter, entityClass, collectionName));
|
||||
}
|
||||
|
||||
protected <S, T> List<T> doFind(String collectionName, Document query, Document fields, Class<S> entityClass,
|
||||
@Nullable CursorPreparer preparer, DocumentCallback<T> objectCallback) {
|
||||
protected <S, T> List<T> doFind(String collectionName,
|
||||
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, Document fields,
|
||||
Class<S> entityClass, @Nullable CursorPreparer preparer, DocumentCallback<T> objectCallback) {
|
||||
|
||||
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
|
||||
|
||||
@@ -2434,7 +2491,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
serializeToJsonSafely(mappedQuery), mappedFields, entityClass, collectionName));
|
||||
}
|
||||
|
||||
return executeFindMultiInternal(new FindCallback(mappedQuery, mappedFields, null),
|
||||
return executeFindMultiInternal(new FindCallback(collectionPreparer, mappedQuery, mappedFields, null),
|
||||
preparer != null ? preparer : CursorPreparer.NO_OP_PREPARER, objectCallback, collectionName);
|
||||
}
|
||||
|
||||
@@ -2444,8 +2501,8 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
*
|
||||
* @since 2.0
|
||||
*/
|
||||
<S, T> List<T> doFind(String collectionName, Document query, Document fields, Class<S> sourceClass,
|
||||
Class<T> targetClass, CursorPreparer preparer) {
|
||||
<S, T> List<T> doFind(CollectionPreparer<MongoCollection<Document>> collectionPreparer, String collectionName,
|
||||
Document query, Document fields, Class<S> sourceClass, Class<T> targetClass, CursorPreparer preparer) {
|
||||
|
||||
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(sourceClass);
|
||||
EntityProjection<T, S> projection = operations.introspectProjection(targetClass, sourceClass);
|
||||
@@ -2459,7 +2516,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
serializeToJsonSafely(mappedQuery), mappedFields, sourceClass, collectionName));
|
||||
}
|
||||
|
||||
return executeFindMultiInternal(new FindCallback(mappedQuery, mappedFields, null), preparer,
|
||||
return executeFindMultiInternal(new FindCallback(collectionPreparer, mappedQuery, mappedFields, null), preparer,
|
||||
new ProjectingReadCallback<>(mongoConverter, projection, collectionName), collectionName);
|
||||
}
|
||||
|
||||
@@ -2533,8 +2590,8 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
* @return the List of converted objects.
|
||||
*/
|
||||
@SuppressWarnings("ConstantConditions")
|
||||
protected <T> T doFindAndRemove(String collectionName, Document query, Document fields, Document sort,
|
||||
@Nullable Collation collation, Class<T> entityClass) {
|
||||
protected <T> T doFindAndRemove(CollectionPreparer collectionPreparer, String collectionName, Document query,
|
||||
Document fields, Document sort, @Nullable Collation collation, Class<T> entityClass) {
|
||||
|
||||
EntityReader<? super T, Bson> readerToUse = this.mongoConverter;
|
||||
|
||||
@@ -2545,14 +2602,15 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
|
||||
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
|
||||
|
||||
return executeFindOneInternal(
|
||||
new FindAndRemoveCallback(queryMapper.getMappedObject(query, entity), fields, sort, collation),
|
||||
return executeFindOneInternal(new FindAndRemoveCallback(collectionPreparer,
|
||||
queryMapper.getMappedObject(query, entity), fields, sort, collation),
|
||||
new ReadDocumentCallback<>(readerToUse, entityClass, collectionName), collectionName);
|
||||
}
|
||||
|
||||
@SuppressWarnings("ConstantConditions")
|
||||
protected <T> T doFindAndModify(String collectionName, Document query, Document fields, Document sort,
|
||||
Class<T> entityClass, UpdateDefinition update, @Nullable FindAndModifyOptions options) {
|
||||
protected <T> T doFindAndModify(CollectionPreparer collectionPreparer, String collectionName, Document query,
|
||||
Document fields, Document sort, Class<T> entityClass, UpdateDefinition update,
|
||||
@Nullable FindAndModifyOptions options) {
|
||||
|
||||
EntityReader<? super T, Bson> readerToUse = this.mongoConverter;
|
||||
|
||||
@@ -2577,7 +2635,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
}
|
||||
|
||||
return executeFindOneInternal(
|
||||
new FindAndModifyCallback(mappedQuery, fields, sort, mappedUpdate,
|
||||
new FindAndModifyCallback(collectionPreparer, mappedQuery, fields, sort, mappedUpdate,
|
||||
update.getArrayFilters().stream().map(ArrayFilter::asDocument).collect(Collectors.toList()), options),
|
||||
new ReadDocumentCallback<>(readerToUse, entityClass, collectionName), collectionName);
|
||||
}
|
||||
@@ -2598,14 +2656,18 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
* {@literal false} and {@link FindAndReplaceOptions#isUpsert() upsert} is {@literal false}.
|
||||
*/
|
||||
@Nullable
|
||||
protected <T> T doFindAndReplace(String collectionName, Document mappedQuery, Document mappedFields,
|
||||
Document mappedSort, @Nullable com.mongodb.client.model.Collation collation, Class<?> entityType,
|
||||
Document replacement, FindAndReplaceOptions options, Class<T> resultType) {
|
||||
protected <T> T doFindAndReplace(CollectionPreparer collectionPreparer, String collectionName, Document mappedQuery,
|
||||
Document mappedFields, Document mappedSort, @Nullable com.mongodb.client.model.Collation collation,
|
||||
Class<?> entityType, Document replacement, FindAndReplaceOptions options, Class<T> resultType) {
|
||||
|
||||
EntityProjection<T, ?> projection = operations.introspectProjection(resultType, entityType);
|
||||
|
||||
return doFindAndReplace(collectionName, mappedQuery, mappedFields, mappedSort, collation, entityType, replacement,
|
||||
options, projection);
|
||||
return doFindAndReplace(collectionPreparer, collectionName, mappedQuery, mappedFields, mappedSort, collation,
|
||||
entityType, replacement, options, projection);
|
||||
}
|
||||
|
||||
CollectionPreparerDelegate createDelegate(Query query) {
|
||||
return CollectionPreparerDelegate.of(query);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -2625,9 +2687,9 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
* @since 3.4
|
||||
*/
|
||||
@Nullable
|
||||
private <T> T doFindAndReplace(String collectionName, Document mappedQuery, Document mappedFields,
|
||||
Document mappedSort, @Nullable com.mongodb.client.model.Collation collation, Class<?> entityType,
|
||||
Document replacement, FindAndReplaceOptions options, EntityProjection<T, ?> projection) {
|
||||
private <T> T doFindAndReplace(CollectionPreparer collectionPreparer, String collectionName, Document mappedQuery,
|
||||
Document mappedFields, Document mappedSort, @Nullable com.mongodb.client.model.Collation collation,
|
||||
Class<?> entityType, Document replacement, FindAndReplaceOptions options, EntityProjection<T, ?> projection) {
|
||||
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER
|
||||
@@ -2638,9 +2700,9 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
serializeToJsonSafely(mappedSort), entityType, serializeToJsonSafely(replacement), collectionName));
|
||||
}
|
||||
|
||||
return executeFindOneInternal(
|
||||
new FindAndReplaceCallback(mappedQuery, mappedFields, mappedSort, replacement, collation, options),
|
||||
new ProjectingReadCallback<>(mongoConverter, projection, collectionName), collectionName);
|
||||
return executeFindOneInternal(new FindAndReplaceCallback(collectionPreparer, mappedQuery, mappedFields, mappedSort,
|
||||
replacement, collation, options), new ProjectingReadCallback<>(mongoConverter, projection, collectionName),
|
||||
collectionName);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -2810,12 +2872,15 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
*/
|
||||
private static class FindOneCallback implements CollectionCallback<Document> {
|
||||
|
||||
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
|
||||
private final Document query;
|
||||
private final Optional<Document> fields;
|
||||
private final CursorPreparer cursorPreparer;
|
||||
|
||||
FindOneCallback(Document query, Document fields, CursorPreparer preparer) {
|
||||
FindOneCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, Document fields,
|
||||
CursorPreparer preparer) {
|
||||
|
||||
this.collectionPreparer = collectionPreparer;
|
||||
this.query = query;
|
||||
this.fields = Optional.of(fields).filter(it -> !ObjectUtils.isEmpty(fields));
|
||||
this.cursorPreparer = preparer;
|
||||
@@ -2824,7 +2889,8 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
@Override
|
||||
public Document doInCollection(MongoCollection<Document> collection) throws MongoException, DataAccessException {
|
||||
|
||||
FindIterable<Document> iterable = cursorPreparer.initiateFind(collection, col -> col.find(query, Document.class));
|
||||
FindIterable<Document> iterable = cursorPreparer.initiateFind(collection,
|
||||
col -> collectionPreparer.prepare(col).find(query, Document.class));
|
||||
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
|
||||
@@ -2851,15 +2917,18 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
*/
|
||||
private static class FindCallback implements CollectionCallback<FindIterable<Document>> {
|
||||
|
||||
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
|
||||
private final Document query;
|
||||
private final Document fields;
|
||||
private final @Nullable com.mongodb.client.model.Collation collation;
|
||||
|
||||
public FindCallback(Document query, Document fields, @Nullable com.mongodb.client.model.Collation collation) {
|
||||
public FindCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
|
||||
Document fields, @Nullable com.mongodb.client.model.Collation collation) {
|
||||
|
||||
Assert.notNull(query, "Query must not be null");
|
||||
Assert.notNull(fields, "Fields must not be null");
|
||||
|
||||
this.collectionPreparer = collectionPreparer;
|
||||
this.query = query;
|
||||
this.fields = fields;
|
||||
this.collation = collation;
|
||||
@@ -2869,7 +2938,8 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
public FindIterable<Document> doInCollection(MongoCollection<Document> collection)
|
||||
throws MongoException, DataAccessException {
|
||||
|
||||
FindIterable<Document> findIterable = collection.find(query, Document.class).projection(fields);
|
||||
FindIterable<Document> findIterable = collectionPreparer.prepare(collection).find(query, Document.class)
|
||||
.projection(fields);
|
||||
|
||||
if (collation != null) {
|
||||
findIterable = findIterable.collation(collation);
|
||||
@@ -2887,11 +2957,14 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
*/
|
||||
private class ExistsCallback implements CollectionCallback<Boolean> {
|
||||
|
||||
private final CollectionPreparer collectionPreparer;
|
||||
private final Document mappedQuery;
|
||||
private final com.mongodb.client.model.Collation collation;
|
||||
|
||||
ExistsCallback(Document mappedQuery, com.mongodb.client.model.Collation collation) {
|
||||
ExistsCallback(CollectionPreparer collectionPreparer, Document mappedQuery,
|
||||
com.mongodb.client.model.Collation collation) {
|
||||
|
||||
this.collectionPreparer = collectionPreparer;
|
||||
this.mappedQuery = mappedQuery;
|
||||
this.collation = collation;
|
||||
}
|
||||
@@ -2899,7 +2972,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
@Override
|
||||
public Boolean doInCollection(MongoCollection<Document> collection) throws MongoException, DataAccessException {
|
||||
|
||||
return doCount(collection.getNamespace().getCollectionName(), mappedQuery,
|
||||
return doCount(collectionPreparer, collection.getNamespace().getCollectionName(), mappedQuery,
|
||||
new CountOptions().limit(1).collation(collation)) > 0;
|
||||
}
|
||||
}
|
||||
@@ -2912,12 +2985,15 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
*/
|
||||
private static class FindAndRemoveCallback implements CollectionCallback<Document> {
|
||||
|
||||
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
|
||||
private final Document query;
|
||||
private final Document fields;
|
||||
private final Document sort;
|
||||
private final Optional<Collation> collation;
|
||||
|
||||
FindAndRemoveCallback(Document query, Document fields, Document sort, @Nullable Collation collation) {
|
||||
FindAndRemoveCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
|
||||
Document fields, Document sort, @Nullable Collation collation) {
|
||||
this.collectionPreparer = collectionPreparer;
|
||||
|
||||
this.query = query;
|
||||
this.fields = fields;
|
||||
@@ -2931,12 +3007,13 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
FindOneAndDeleteOptions opts = new FindOneAndDeleteOptions().sort(sort).projection(fields);
|
||||
collation.map(Collation::toMongoCollation).ifPresent(opts::collation);
|
||||
|
||||
return collection.findOneAndDelete(query, opts);
|
||||
return collectionPreparer.prepare(collection).findOneAndDelete(query, opts);
|
||||
}
|
||||
}
|
||||
|
||||
private static class FindAndModifyCallback implements CollectionCallback<Document> {
|
||||
|
||||
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
|
||||
private final Document query;
|
||||
private final Document fields;
|
||||
private final Document sort;
|
||||
@@ -2944,9 +3021,10 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
private final List<Document> arrayFilters;
|
||||
private final FindAndModifyOptions options;
|
||||
|
||||
FindAndModifyCallback(Document query, Document fields, Document sort, Object update, List<Document> arrayFilters,
|
||||
FindAndModifyOptions options) {
|
||||
FindAndModifyCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
|
||||
Document fields, Document sort, Object update, List<Document> arrayFilters, FindAndModifyOptions options) {
|
||||
|
||||
this.collectionPreparer = collectionPreparer;
|
||||
this.query = query;
|
||||
this.fields = fields;
|
||||
this.sort = sort;
|
||||
@@ -2975,9 +3053,9 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
}
|
||||
|
||||
if (update instanceof Document) {
|
||||
return collection.findOneAndUpdate(query, (Document) update, opts);
|
||||
return collectionPreparer.prepare(collection).findOneAndUpdate(query, (Document) update, opts);
|
||||
} else if (update instanceof List) {
|
||||
return collection.findOneAndUpdate(query, (List<Document>) update, opts);
|
||||
return collectionPreparer.prepare(collection).findOneAndUpdate(query, (List<Document>) update, opts);
|
||||
}
|
||||
|
||||
throw new IllegalArgumentException(String.format("Using %s is not supported in findOneAndUpdate", update));
|
||||
@@ -2993,6 +3071,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
*/
|
||||
private static class FindAndReplaceCallback implements CollectionCallback<Document> {
|
||||
|
||||
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
|
||||
private final Document query;
|
||||
private final Document fields;
|
||||
private final Document sort;
|
||||
@@ -3000,9 +3079,10 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
private final @Nullable com.mongodb.client.model.Collation collation;
|
||||
private final FindAndReplaceOptions options;
|
||||
|
||||
FindAndReplaceCallback(Document query, Document fields, Document sort, Document update,
|
||||
@Nullable com.mongodb.client.model.Collation collation, FindAndReplaceOptions options) {
|
||||
|
||||
FindAndReplaceCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
|
||||
Document fields, Document sort, Document update, @Nullable com.mongodb.client.model.Collation collation,
|
||||
FindAndReplaceOptions options) {
|
||||
this.collectionPreparer = collectionPreparer;
|
||||
this.query = query;
|
||||
this.fields = fields;
|
||||
this.sort = sort;
|
||||
@@ -3027,7 +3107,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
opts.returnDocument(ReturnDocument.AFTER);
|
||||
}
|
||||
|
||||
return collection.findOneAndReplace(query, update, opts);
|
||||
return collectionPreparer.prepare(collection).findOneAndReplace(query, update, opts);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3209,11 +3289,6 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
return cursorToUse;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReadPreference getReadPreference() {
|
||||
return query.getMeta().getFlags().contains(CursorOption.SECONDARY_READS) ? ReadPreference.primaryPreferred()
|
||||
: null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -3399,6 +3474,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
|
||||
@FunctionalInterface
|
||||
interface CountExecution {
|
||||
long countDocuments(String collection, Document filter, CountOptions options);
|
||||
long countDocuments(CollectionPreparer collectionPreparer, String collection, Document filter,
|
||||
CountOptions options);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ import reactor.core.publisher.Mono;
|
||||
|
||||
import org.bson.Document;
|
||||
import org.springframework.dao.IncorrectResultSizeDataAccessException;
|
||||
import org.springframework.data.mongodb.core.CollectionPreparerSupport.ReactiveCollectionPreparerDelegate;
|
||||
import org.springframework.data.mongodb.core.query.NearQuery;
|
||||
import org.springframework.data.mongodb.core.query.Query;
|
||||
import org.springframework.data.mongodb.core.query.SerializationUtils;
|
||||
@@ -67,8 +68,8 @@ class ReactiveFindOperationSupport implements ReactiveFindOperation {
|
||||
private final String collection;
|
||||
private final Query query;
|
||||
|
||||
ReactiveFindSupport(ReactiveMongoTemplate template, Class<?> domainType, Class<T> returnType,
|
||||
String collection, Query query) {
|
||||
ReactiveFindSupport(ReactiveMongoTemplate template, Class<?> domainType, Class<T> returnType, String collection,
|
||||
Query query) {
|
||||
|
||||
this.template = template;
|
||||
this.domainType = domainType;
|
||||
@@ -169,8 +170,8 @@ class ReactiveFindOperationSupport implements ReactiveFindOperation {
|
||||
Document queryObject = query.getQueryObject();
|
||||
Document fieldsObject = query.getFieldsObject();
|
||||
|
||||
return template.doFind(getCollectionName(), queryObject, fieldsObject, domainType, returnType,
|
||||
preparer != null ? preparer : getCursorPreparer(query));
|
||||
return template.doFind(getCollectionName(), ReactiveCollectionPreparerDelegate.of(query), queryObject,
|
||||
fieldsObject, domainType, returnType, preparer != null ? preparer : getCursorPreparer(query));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
||||
@@ -25,13 +25,13 @@ import java.util.function.Supplier;
|
||||
import org.bson.Document;
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.reactivestreams.Subscription;
|
||||
|
||||
import org.springframework.data.geo.GeoResult;
|
||||
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
|
||||
import org.springframework.data.mongodb.core.aggregation.Aggregation;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationStage;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
|
||||
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
|
||||
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
|
||||
@@ -256,6 +256,19 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations {
|
||||
return createView(name, source, AggregationPipeline.of(stages));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a view with the provided name. The view content is defined by the {@link AggregationStage pipeline stages}
|
||||
* on another collection or view identified by the given {@link #getCollectionName(Class) source type}.
|
||||
*
|
||||
* @param name the name of the view to create.
|
||||
* @param source the type defining the views source collection.
|
||||
* @param stages the {@link AggregationOperation aggregation pipeline stages} defining the view content.
|
||||
* @since 4.1
|
||||
*/
|
||||
default Mono<MongoCollection<Document>> createView(String name, Class<?> source, AggregationStage... stages) {
|
||||
return createView(name, source, AggregationPipeline.of(stages));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a view with the provided name. The view content is defined by the {@link AggregationPipeline pipeline} on
|
||||
* another collection or view identified by the given {@link #getCollectionName(Class) source type}.
|
||||
|
||||
@@ -70,6 +70,7 @@ import org.springframework.data.mongodb.MongoDatabaseFactory;
|
||||
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
|
||||
import org.springframework.data.mongodb.ReactiveMongoDatabaseUtils;
|
||||
import org.springframework.data.mongodb.SessionSynchronization;
|
||||
import org.springframework.data.mongodb.core.CollectionPreparerSupport.ReactiveCollectionPreparerDelegate;
|
||||
import org.springframework.data.mongodb.core.EntityOperations.AdaptibleEntity;
|
||||
import org.springframework.data.mongodb.core.QueryOperations.AggregationDefinition;
|
||||
import org.springframework.data.mongodb.core.QueryOperations.CountContext;
|
||||
@@ -80,6 +81,7 @@ import org.springframework.data.mongodb.core.QueryOperations.UpdateContext;
|
||||
import org.springframework.data.mongodb.core.aggregation.Aggregation;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationOptions.Builder;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
|
||||
import org.springframework.data.mongodb.core.aggregation.PrefixingDelegatingAggregationOperationContext;
|
||||
import org.springframework.data.mongodb.core.aggregation.RelaxedTypeBasedAggregationOperationContext;
|
||||
@@ -105,7 +107,6 @@ import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions;
|
||||
import org.springframework.data.mongodb.core.query.BasicQuery;
|
||||
import org.springframework.data.mongodb.core.query.Collation;
|
||||
import org.springframework.data.mongodb.core.query.Meta;
|
||||
import org.springframework.data.mongodb.core.query.Meta.CursorOption;
|
||||
import org.springframework.data.mongodb.core.query.NearQuery;
|
||||
import org.springframework.data.mongodb.core.query.Query;
|
||||
import org.springframework.data.mongodb.core.query.UpdateDefinition;
|
||||
@@ -147,9 +148,18 @@ import com.mongodb.reactivestreams.client.MongoDatabase;
|
||||
* extract results. This class executes BSON queries or updates, initiating iteration over {@link FindPublisher} and
|
||||
* catching MongoDB exceptions and translating them to the generic, more informative exception hierarchy defined in the
|
||||
* org.springframework.dao package. Can be used within a service implementation via direct instantiation with a
|
||||
* {@link SimpleReactiveMongoDatabaseFactory} reference, or get prepared in an application context and given to services
|
||||
* as bean reference. Note: The {@link SimpleReactiveMongoDatabaseFactory} should always be configured as a bean in the
|
||||
* application context, in the first case given to the service directly, in the second case to the prepared template.
|
||||
* {@link ReactiveMongoDatabaseFactory} reference, or get prepared in an application context and given to services as
|
||||
* bean reference.
|
||||
* <p>
|
||||
* Note: The {@link ReactiveMongoDatabaseFactory} should always be configured as a bean in the application context, in
|
||||
* the first case given to the service directly, in the second case to the prepared template.
|
||||
* <h3>{@link ReadPreference} and {@link com.mongodb.ReadConcern}</h3>
|
||||
* <p>
|
||||
* {@code ReadPreference} and {@code ReadConcern} are generally considered from {@link Query} and
|
||||
* {@link AggregationOptions} objects for the action to be executed on a particular {@link MongoCollection}.
|
||||
* <p>
|
||||
* You can also set the default {@link #setReadPreference(ReadPreference) ReadPreference} on the template level to
|
||||
* generally apply a {@link ReadPreference}.
|
||||
*
|
||||
* @author Mark Paluch
|
||||
* @author Christoph Strobl
|
||||
@@ -666,7 +676,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
@Nullable ViewOptions options) {
|
||||
|
||||
return createView(name, getCollectionName(source),
|
||||
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getOperations()), source),
|
||||
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getStages()), source),
|
||||
options);
|
||||
}
|
||||
|
||||
@@ -675,7 +685,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
@Nullable ViewOptions options) {
|
||||
|
||||
return createView(name, source,
|
||||
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getOperations()), (Class<?>) null),
|
||||
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getStages()), (Class<?>) null),
|
||||
options);
|
||||
}
|
||||
|
||||
@@ -756,8 +766,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
public <T> Mono<T> findOne(Query query, Class<T> entityClass, String collectionName) {
|
||||
|
||||
if (ObjectUtils.isEmpty(query.getSortObject())) {
|
||||
return doFindOne(collectionName, query.getQueryObject(), query.getFieldsObject(), entityClass,
|
||||
new QueryFindPublisherPreparer(query, entityClass));
|
||||
return doFindOne(collectionName, ReactiveCollectionPreparerDelegate.of(query), query.getQueryObject(),
|
||||
query.getFieldsObject(), entityClass, new QueryFindPublisherPreparer(query, entityClass));
|
||||
}
|
||||
|
||||
query.limit(1);
|
||||
@@ -783,10 +793,11 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
|
||||
return createFlux(collectionName, collection -> {
|
||||
|
||||
ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(query);
|
||||
QueryContext queryContext = queryOperations.createQueryContext(query);
|
||||
Document filter = queryContext.getMappedQuery(entityClass, this::getPersistentEntity);
|
||||
|
||||
FindPublisher<Document> findPublisher = collection.find(filter, Document.class)
|
||||
FindPublisher<Document> findPublisher = collectionPreparer.prepare(collection).find(filter, Document.class)
|
||||
.projection(new Document("_id", 1));
|
||||
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
@@ -811,8 +822,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
return findAll(entityClass, collectionName);
|
||||
}
|
||||
|
||||
return doFind(collectionName, query.getQueryObject(), query.getFieldsObject(), entityClass,
|
||||
new QueryFindPublisherPreparer(query, entityClass));
|
||||
return doFind(collectionName, ReactiveCollectionPreparerDelegate.of(query), query.getQueryObject(),
|
||||
query.getFieldsObject(), entityClass, new QueryFindPublisherPreparer(query, entityClass));
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -825,7 +836,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
|
||||
String idKey = operations.getIdPropertyName(entityClass);
|
||||
|
||||
return doFindOne(collectionName, new Document(idKey, id), null, entityClass, (Collation) null);
|
||||
return doFindOne(collectionName, CollectionPreparer.identity(), new Document(idKey, id), null, entityClass,
|
||||
(Collation) null);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -850,6 +862,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
Document mappedQuery = distinctQueryContext.getMappedQuery(entity);
|
||||
String mappedFieldName = distinctQueryContext.getMappedFieldName(entity);
|
||||
Class<T> mongoDriverCompatibleType = distinctQueryContext.getDriverCompatibleClass(resultClass);
|
||||
ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(query);
|
||||
|
||||
Flux<?> result = execute(collectionName, collection -> {
|
||||
|
||||
@@ -859,11 +872,9 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
}
|
||||
|
||||
FindPublisherPreparer preparer = new QueryFindPublisherPreparer(query, entityClass);
|
||||
if (preparer.hasReadPreference()) {
|
||||
collection = collection.withReadPreference(preparer.getReadPreference());
|
||||
}
|
||||
|
||||
DistinctPublisher<T> publisher = collection.distinct(mappedFieldName, mappedQuery, mongoDriverCompatibleType);
|
||||
DistinctPublisher<T> publisher = collectionPreparer.prepare(collection).distinct(mappedFieldName, mappedQuery,
|
||||
mongoDriverCompatibleType);
|
||||
distinctQueryContext.applyCollation(entityClass, publisher::collation);
|
||||
return publisher;
|
||||
});
|
||||
@@ -929,7 +940,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
boolean isOutOrMerge, AggregationOptions options, ReadDocumentCallback<O> readCallback,
|
||||
@Nullable Class<?> inputType) {
|
||||
|
||||
AggregatePublisher<Document> cursor = collection.aggregate(pipeline, Document.class)
|
||||
ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(options);
|
||||
AggregatePublisher<Document> cursor = collectionPreparer.prepare(collection).aggregate(pipeline, Document.class)
|
||||
.allowDiskUse(options.isAllowDiskUse());
|
||||
|
||||
if (options.getCursorBatchSize() != null) {
|
||||
@@ -987,8 +999,19 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
GeoNearResultDocumentCallback<T> callback = new GeoNearResultDocumentCallback<>(distanceField,
|
||||
new ProjectingReadCallback<>(mongoConverter, projection, collection), near.getMetric());
|
||||
|
||||
Builder optionsBuilder = AggregationOptions.builder();
|
||||
if (near.hasReadPreference()) {
|
||||
optionsBuilder.readPreference(near.getReadPreference());
|
||||
}
|
||||
|
||||
if(near.hasReadConcern()) {
|
||||
optionsBuilder.readConcern(near.getReadConcern());
|
||||
}
|
||||
|
||||
optionsBuilder.collation(near.getCollation());
|
||||
|
||||
Aggregation $geoNear = TypedAggregation.newAggregation(entityClass, Aggregation.geoNear(near, distanceField))
|
||||
.withOptions(AggregationOptions.builder().collation(near.getCollation()).build());
|
||||
.withOptions(optionsBuilder.build());
|
||||
|
||||
return aggregate($geoNear, collection, Document.class) //
|
||||
.concatMap(callback::doWith);
|
||||
@@ -1028,8 +1051,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
operations.forType(entityClass).getCollation(query).ifPresent(optionsToUse::collation);
|
||||
}
|
||||
|
||||
return doFindAndModify(collectionName, query.getQueryObject(), query.getFieldsObject(),
|
||||
getMappedSortObject(query, entityClass), entityClass, update, optionsToUse);
|
||||
return doFindAndModify(collectionName, ReactiveCollectionPreparerDelegate.of(query), query.getQueryObject(),
|
||||
query.getFieldsObject(), getMappedSortObject(query, entityClass), entityClass, update, optionsToUse);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -1053,6 +1076,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
Document mappedQuery = queryContext.getMappedQuery(entity);
|
||||
Document mappedFields = queryContext.getMappedFields(entity, projection);
|
||||
Document mappedSort = queryContext.getMappedSort(entity);
|
||||
ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(query);
|
||||
|
||||
return Mono.defer(() -> {
|
||||
|
||||
@@ -1070,8 +1094,9 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
mapped.getCollection()));
|
||||
}).flatMap(it -> {
|
||||
|
||||
Mono<T> afterFindAndReplace = doFindAndReplace(it.getCollection(), mappedQuery, mappedFields, mappedSort,
|
||||
queryContext.getCollation(entityType).orElse(null), entityType, it.getTarget(), options, projection);
|
||||
Mono<T> afterFindAndReplace = doFindAndReplace(it.getCollection(), collectionPreparer, mappedQuery,
|
||||
mappedFields, mappedSort, queryContext.getCollation(entityType).orElse(null), entityType, it.getTarget(),
|
||||
options, projection);
|
||||
return afterFindAndReplace.flatMap(saved -> {
|
||||
maybeEmitEvent(new AfterSaveEvent<>(saved, it.getTarget(), it.getCollection()));
|
||||
return maybeCallAfterSave(saved, it.getTarget(), it.getCollection());
|
||||
@@ -1089,9 +1114,9 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
public <T> Mono<T> findAndRemove(Query query, Class<T> entityClass, String collectionName) {
|
||||
|
||||
operations.forType(entityClass).getCollation(query);
|
||||
return doFindAndRemove(collectionName, query.getQueryObject(), query.getFieldsObject(),
|
||||
getMappedSortObject(query, entityClass), operations.forType(entityClass).getCollation(query).orElse(null),
|
||||
entityClass);
|
||||
return doFindAndRemove(collectionName, ReactiveCollectionPreparerDelegate.of(query), query.getQueryObject(),
|
||||
query.getFieldsObject(), getMappedSortObject(query, entityClass),
|
||||
operations.forType(entityClass).getCollation(query).orElse(null), entityClass);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1799,12 +1824,14 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.REMOVE, collectionName, entityClass,
|
||||
null, removeQuery);
|
||||
WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction);
|
||||
ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(query);
|
||||
|
||||
return execute(collectionName, collection -> {
|
||||
|
||||
maybeEmitEvent(new BeforeDeleteEvent<>(removeQuery, entityClass, collectionName));
|
||||
|
||||
MongoCollection<Document> collectionToUse = prepareCollection(collection, writeConcernToUse);
|
||||
MongoCollection<Document> collectionToUse = collectionPreparer
|
||||
.prepare(prepareCollection(collection, writeConcernToUse));
|
||||
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug(String.format("Remove using query: %s in collection: %s.", serializeToJsonSafely(removeQuery),
|
||||
@@ -1839,8 +1866,9 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
|
||||
@Override
|
||||
public <T> Flux<T> findAll(Class<T> entityClass, String collectionName) {
|
||||
return executeFindMultiInternal(new FindCallback(null), FindPublisherPreparer.NO_OP_PREPARER,
|
||||
new ReadDocumentCallback<>(mongoConverter, entityClass, collectionName), collectionName);
|
||||
return executeFindMultiInternal(new FindCallback(CollectionPreparer.identity(), null),
|
||||
FindPublisherPreparer.NO_OP_PREPARER, new ReadDocumentCallback<>(mongoConverter, entityClass, collectionName),
|
||||
collectionName);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -1867,17 +1895,19 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
@Override
|
||||
public <T> Flux<T> tail(@Nullable Query query, Class<T> entityClass, String collectionName) {
|
||||
|
||||
ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(query);
|
||||
if (query == null) {
|
||||
|
||||
LOGGER.debug(String.format("Tail for class: %s in collection: %s", entityClass, collectionName));
|
||||
|
||||
return executeFindMultiInternal(
|
||||
collection -> new FindCallback(null).doInCollection(collection).cursorType(CursorType.TailableAwait),
|
||||
collection -> new FindCallback(collectionPreparer, null).doInCollection(collection)
|
||||
.cursorType(CursorType.TailableAwait),
|
||||
FindPublisherPreparer.NO_OP_PREPARER, new ReadDocumentCallback<>(mongoConverter, entityClass, collectionName),
|
||||
collectionName);
|
||||
}
|
||||
|
||||
return doFind(collectionName, query.getQueryObject(), query.getFieldsObject(), entityClass,
|
||||
return doFind(collectionName, collectionPreparer, query.getQueryObject(), query.getFieldsObject(), entityClass,
|
||||
new TailingQueryFindPublisherPreparer(query, entityClass));
|
||||
}
|
||||
|
||||
@@ -1961,12 +1991,14 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
|
||||
assertLocalFunctionNames(mapFunction, reduceFunction);
|
||||
|
||||
ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(filterQuery);
|
||||
return createFlux(inputCollectionName, collection -> {
|
||||
|
||||
Document mappedQuery = queryMapper.getMappedObject(filterQuery.getQueryObject(),
|
||||
mappingContext.getPersistentEntity(domainType));
|
||||
|
||||
MapReducePublisher<Document> publisher = collection.mapReduce(mapFunction, reduceFunction, Document.class);
|
||||
MapReducePublisher<Document> publisher = collectionPreparer.prepare(collection).mapReduce(mapFunction,
|
||||
reduceFunction, Document.class);
|
||||
|
||||
publisher.filter(mappedQuery);
|
||||
|
||||
@@ -2133,16 +2165,18 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
* The query document is specified as a standard {@link Document} and so is the fields specification.
|
||||
*
|
||||
* @param collectionName name of the collection to retrieve the objects from.
|
||||
* @param collectionPreparer the preparer to prepare the collection for the actual use.
|
||||
* @param query the query document that specifies the criteria used to find a record.
|
||||
* @param fields the document that specifies the fields to be returned.
|
||||
* @param entityClass the parameterized type of the returned list.
|
||||
* @param collation can be {@literal null}.
|
||||
* @return the {@link List} of converted objects.
|
||||
*/
|
||||
protected <T> Mono<T> doFindOne(String collectionName, Document query, @Nullable Document fields,
|
||||
protected <T> Mono<T> doFindOne(String collectionName,
|
||||
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, @Nullable Document fields,
|
||||
Class<T> entityClass, @Nullable Collation collation) {
|
||||
|
||||
return doFindOne(collectionName, query, fields, entityClass,
|
||||
return doFindOne(collectionName, collectionPreparer, query, fields, entityClass,
|
||||
findPublisher -> collation != null ? findPublisher.collation(collation.toMongoCollation()) : findPublisher);
|
||||
}
|
||||
|
||||
@@ -2151,6 +2185,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
* The query document is specified as a standard {@link Document} and so is the fields specification.
|
||||
*
|
||||
* @param collectionName name of the collection to retrieve the objects from.
|
||||
* @param collectionPreparer the preparer to prepare the collection for the actual use.
|
||||
* @param query the query document that specifies the criteria used to find a record.
|
||||
* @param fields the document that specifies the fields to be returned.
|
||||
* @param entityClass the parameterized type of the returned list.
|
||||
@@ -2158,7 +2193,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
* @return the {@link List} of converted objects.
|
||||
* @since 2.2
|
||||
*/
|
||||
protected <T> Mono<T> doFindOne(String collectionName, Document query, @Nullable Document fields,
|
||||
protected <T> Mono<T> doFindOne(String collectionName,
|
||||
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, @Nullable Document fields,
|
||||
Class<T> entityClass, FindPublisherPreparer preparer) {
|
||||
|
||||
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
|
||||
@@ -2173,7 +2209,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
serializeToJsonSafely(query), mappedFields, entityClass, collectionName));
|
||||
}
|
||||
|
||||
return executeFindOneInternal(new FindOneCallback(mappedQuery, mappedFields, preparer),
|
||||
return executeFindOneInternal(new FindOneCallback(collectionPreparer, mappedQuery, mappedFields, preparer),
|
||||
new ReadDocumentCallback<>(this.mongoConverter, entityClass, collectionName), collectionName);
|
||||
}
|
||||
|
||||
@@ -2182,13 +2218,15 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
* query document is specified as a standard Document and so is the fields specification.
|
||||
*
|
||||
* @param collectionName name of the collection to retrieve the objects from
|
||||
* @param collectionPreparer the preparer to prepare the collection for the actual use.
|
||||
* @param query the query document that specifies the criteria used to find a record
|
||||
* @param fields the document that specifies the fields to be returned
|
||||
* @param entityClass the parameterized type of the returned list.
|
||||
* @return the List of converted objects.
|
||||
*/
|
||||
protected <T> Flux<T> doFind(String collectionName, Document query, Document fields, Class<T> entityClass) {
|
||||
return doFind(collectionName, query, fields, entityClass, null,
|
||||
protected <T> Flux<T> doFind(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
|
||||
Document query, Document fields, Class<T> entityClass) {
|
||||
return doFind(collectionName, collectionPreparer, query, fields, entityClass, null,
|
||||
new ReadDocumentCallback<>(this.mongoConverter, entityClass, collectionName));
|
||||
}
|
||||
|
||||
@@ -2198,6 +2236,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
* specified as a standard Document and so is the fields specification.
|
||||
*
|
||||
* @param collectionName name of the collection to retrieve the objects from.
|
||||
* @param collectionPreparer the preparer to prepare the collection for the actual use.
|
||||
* @param query the query document that specifies the criteria used to find a record.
|
||||
* @param fields the document that specifies the fields to be returned.
|
||||
* @param entityClass the parameterized type of the returned list.
|
||||
@@ -2205,14 +2244,15 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
* the result set, (apply limits, skips and so on).
|
||||
* @return the {@link List} of converted objects.
|
||||
*/
|
||||
protected <T> Flux<T> doFind(String collectionName, Document query, Document fields, Class<T> entityClass,
|
||||
FindPublisherPreparer preparer) {
|
||||
return doFind(collectionName, query, fields, entityClass, preparer,
|
||||
protected <T> Flux<T> doFind(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
|
||||
Document query, Document fields, Class<T> entityClass, FindPublisherPreparer preparer) {
|
||||
return doFind(collectionName, collectionPreparer, query, fields, entityClass, preparer,
|
||||
new ReadDocumentCallback<>(mongoConverter, entityClass, collectionName));
|
||||
}
|
||||
|
||||
protected <S, T> Flux<T> doFind(String collectionName, Document query, Document fields, Class<S> entityClass,
|
||||
@Nullable FindPublisherPreparer preparer, DocumentCallback<T> objectCallback) {
|
||||
protected <S, T> Flux<T> doFind(String collectionName,
|
||||
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, Document fields,
|
||||
Class<S> entityClass, @Nullable FindPublisherPreparer preparer, DocumentCallback<T> objectCallback) {
|
||||
|
||||
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
|
||||
|
||||
@@ -2225,8 +2265,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
serializeToJsonSafely(mappedQuery), mappedFields, entityClass, collectionName));
|
||||
}
|
||||
|
||||
return executeFindMultiInternal(new FindCallback(mappedQuery, mappedFields), preparer, objectCallback,
|
||||
collectionName);
|
||||
return executeFindMultiInternal(new FindCallback(collectionPreparer, mappedQuery, mappedFields), preparer,
|
||||
objectCallback, collectionName);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -2235,8 +2275,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
*
|
||||
* @since 2.0
|
||||
*/
|
||||
<S, T> Flux<T> doFind(String collectionName, Document query, Document fields, Class<S> sourceClass,
|
||||
Class<T> targetClass, FindPublisherPreparer preparer) {
|
||||
<S, T> Flux<T> doFind(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
|
||||
Document query, Document fields, Class<S> sourceClass, Class<T> targetClass, FindPublisherPreparer preparer) {
|
||||
|
||||
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(sourceClass);
|
||||
EntityProjection<T, S> projection = operations.introspectProjection(targetClass, sourceClass);
|
||||
@@ -2250,7 +2290,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
serializeToJsonSafely(mappedQuery), mappedFields, sourceClass, collectionName));
|
||||
}
|
||||
|
||||
return executeFindMultiInternal(new FindCallback(mappedQuery, mappedFields), preparer,
|
||||
return executeFindMultiInternal(new FindCallback(collectionPreparer, mappedQuery, mappedFields), preparer,
|
||||
new ProjectingReadCallback<>(mongoConverter, projection, collectionName), collectionName);
|
||||
}
|
||||
|
||||
@@ -2268,13 +2308,15 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
* The first document that matches the query is returned and also removed from the collection in the database. <br />
|
||||
* The query document is specified as a standard Document and so is the fields specification.
|
||||
*
|
||||
* @param collectionName name of the collection to retrieve the objects from
|
||||
* @param query the query document that specifies the criteria used to find a record
|
||||
* @param collation collation
|
||||
* @param collectionName name of the collection to retrieve the objects from.
|
||||
* @param collectionPreparer the preparer to prepare the collection for the actual use.
|
||||
* @param query the query document that specifies the criteria used to find a record.
|
||||
* @param collation collation.
|
||||
* @param entityClass the parameterized type of the returned list.
|
||||
* @return the List of converted objects.
|
||||
*/
|
||||
protected <T> Mono<T> doFindAndRemove(String collectionName, Document query, Document fields, Document sort,
|
||||
protected <T> Mono<T> doFindAndRemove(String collectionName,
|
||||
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, Document fields, Document sort,
|
||||
@Nullable Collation collation, Class<T> entityClass) {
|
||||
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
@@ -2284,12 +2326,13 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
|
||||
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
|
||||
|
||||
return executeFindOneInternal(
|
||||
new FindAndRemoveCallback(queryMapper.getMappedObject(query, entity), fields, sort, collation),
|
||||
return executeFindOneInternal(new FindAndRemoveCallback(collectionPreparer,
|
||||
queryMapper.getMappedObject(query, entity), fields, sort, collation),
|
||||
new ReadDocumentCallback<>(this.mongoConverter, entityClass, collectionName), collectionName);
|
||||
}
|
||||
|
||||
protected <T> Mono<T> doFindAndModify(String collectionName, Document query, Document fields, Document sort,
|
||||
protected <T> Mono<T> doFindAndModify(String collectionName,
|
||||
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, Document fields, Document sort,
|
||||
Class<T> entityClass, UpdateDefinition update, FindAndModifyOptions options) {
|
||||
|
||||
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
|
||||
@@ -2310,7 +2353,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
}
|
||||
|
||||
return executeFindOneInternal(
|
||||
new FindAndModifyCallback(mappedQuery, fields, sort, mappedUpdate,
|
||||
new FindAndModifyCallback(collectionPreparer, mappedQuery, fields, sort, mappedUpdate,
|
||||
update.getArrayFilters().stream().map(ArrayFilter::asDocument).collect(Collectors.toList()), options),
|
||||
new ReadDocumentCallback<>(this.mongoConverter, entityClass, collectionName), collectionName);
|
||||
});
|
||||
@@ -2320,6 +2363,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
* Customize this part for findAndReplace.
|
||||
*
|
||||
* @param collectionName The name of the collection to perform the operation in.
|
||||
* @param collectionPreparer the preparer to prepare the collection for the actual use.
|
||||
* @param mappedQuery the query to look up documents.
|
||||
* @param mappedFields the fields to project the result to.
|
||||
* @param mappedSort the sort to be applied when executing the query.
|
||||
@@ -2332,20 +2376,22 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
* {@literal false} and {@link FindAndReplaceOptions#isUpsert() upsert} is {@literal false}.
|
||||
* @since 2.1
|
||||
*/
|
||||
protected <T> Mono<T> doFindAndReplace(String collectionName, Document mappedQuery, Document mappedFields,
|
||||
protected <T> Mono<T> doFindAndReplace(String collectionName,
|
||||
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document mappedQuery, Document mappedFields,
|
||||
Document mappedSort, com.mongodb.client.model.Collation collation, Class<?> entityType, Document replacement,
|
||||
FindAndReplaceOptions options, Class<T> resultType) {
|
||||
|
||||
EntityProjection<T, ?> projection = operations.introspectProjection(resultType, entityType);
|
||||
|
||||
return doFindAndReplace(collectionName, mappedQuery, mappedFields, mappedSort, collation, entityType, replacement,
|
||||
options, projection);
|
||||
return doFindAndReplace(collectionName, collectionPreparer, mappedQuery, mappedFields, mappedSort, collation,
|
||||
entityType, replacement, options, projection);
|
||||
}
|
||||
|
||||
/**
|
||||
* Customize this part for findAndReplace.
|
||||
*
|
||||
* @param collectionName The name of the collection to perform the operation in.
|
||||
* @param collectionPreparer the preparer to prepare the collection for the actual use.
|
||||
* @param mappedQuery the query to look up documents.
|
||||
* @param mappedFields the fields to project the result to.
|
||||
* @param mappedSort the sort to be applied when executing the query.
|
||||
@@ -2358,7 +2404,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
* {@literal false} and {@link FindAndReplaceOptions#isUpsert() upsert} is {@literal false}.
|
||||
* @since 3.4
|
||||
*/
|
||||
private <T> Mono<T> doFindAndReplace(String collectionName, Document mappedQuery, Document mappedFields,
|
||||
private <T> Mono<T> doFindAndReplace(String collectionName,
|
||||
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document mappedQuery, Document mappedFields,
|
||||
Document mappedSort, com.mongodb.client.model.Collation collation, Class<?> entityType, Document replacement,
|
||||
FindAndReplaceOptions options, EntityProjection<T, ?> projection) {
|
||||
|
||||
@@ -2372,8 +2419,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
serializeToJsonSafely(replacement), collectionName));
|
||||
}
|
||||
|
||||
return executeFindOneInternal(
|
||||
new FindAndReplaceCallback(mappedQuery, mappedFields, mappedSort, replacement, collation, options),
|
||||
return executeFindOneInternal(new FindAndReplaceCallback(collectionPreparer, mappedQuery, mappedFields,
|
||||
mappedSort, replacement, collation, options),
|
||||
new ProjectingReadCallback<>(this.mongoConverter, projection, collectionName), collectionName);
|
||||
|
||||
});
|
||||
@@ -2451,7 +2498,12 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
* @param collection
|
||||
*/
|
||||
protected MongoCollection<Document> prepareCollection(MongoCollection<Document> collection) {
|
||||
return this.readPreference != null ? collection.withReadPreference(readPreference) : collection;
|
||||
|
||||
if (this.readPreference != null && this.readPreference != collection.getReadPreference()) {
|
||||
return collection.withReadPreference(readPreference);
|
||||
}
|
||||
|
||||
return collection;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -2621,11 +2673,14 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
*/
|
||||
private static class FindOneCallback implements ReactiveCollectionCallback<Document> {
|
||||
|
||||
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
|
||||
private final Document query;
|
||||
private final Optional<Document> fields;
|
||||
private final FindPublisherPreparer preparer;
|
||||
|
||||
FindOneCallback(Document query, @Nullable Document fields, FindPublisherPreparer preparer) {
|
||||
FindOneCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
|
||||
@Nullable Document fields, FindPublisherPreparer preparer) {
|
||||
this.collectionPreparer = collectionPreparer;
|
||||
this.query = query;
|
||||
this.fields = Optional.ofNullable(fields);
|
||||
this.preparer = preparer;
|
||||
@@ -2642,7 +2697,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
serializeToJsonSafely(fields.orElseGet(Document::new)), collection.getNamespace().getFullName()));
|
||||
}
|
||||
|
||||
FindPublisher<Document> publisher = preparer.initiateFind(collection, col -> col.find(query, Document.class));
|
||||
FindPublisher<Document> publisher = preparer.initiateFind(collectionPreparer.prepare(collection),
|
||||
col -> col.find(query, Document.class));
|
||||
|
||||
if (fields.isPresent()) {
|
||||
publisher = publisher.projection(fields.get());
|
||||
@@ -2660,15 +2716,17 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
*/
|
||||
private static class FindCallback implements ReactiveCollectionQueryCallback<Document> {
|
||||
|
||||
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
|
||||
|
||||
private final @Nullable Document query;
|
||||
private final @Nullable Document fields;
|
||||
|
||||
FindCallback(@Nullable Document query) {
|
||||
this(query, null);
|
||||
FindCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, @Nullable Document query) {
|
||||
this(collectionPreparer, query, null);
|
||||
}
|
||||
|
||||
FindCallback(Document query, Document fields) {
|
||||
|
||||
FindCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, Document fields) {
|
||||
this.collectionPreparer = collectionPreparer;
|
||||
this.query = query;
|
||||
this.fields = fields;
|
||||
}
|
||||
@@ -2676,11 +2734,12 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
@Override
|
||||
public FindPublisher<Document> doInCollection(MongoCollection<Document> collection) {
|
||||
|
||||
MongoCollection<Document> collectionToUse = collectionPreparer.prepare(collection);
|
||||
FindPublisher<Document> findPublisher;
|
||||
if (ObjectUtils.isEmpty(query)) {
|
||||
findPublisher = collection.find(Document.class);
|
||||
findPublisher = collectionToUse.find(Document.class);
|
||||
} else {
|
||||
findPublisher = collection.find(query, Document.class);
|
||||
findPublisher = collectionToUse.find(query, Document.class);
|
||||
}
|
||||
|
||||
if (ObjectUtils.isEmpty(fields)) {
|
||||
@@ -2699,13 +2758,15 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
*/
|
||||
private static class FindAndRemoveCallback implements ReactiveCollectionCallback<Document> {
|
||||
|
||||
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
|
||||
private final Document query;
|
||||
private final Document fields;
|
||||
private final Document sort;
|
||||
private final Optional<Collation> collation;
|
||||
|
||||
FindAndRemoveCallback(Document query, Document fields, Document sort, @Nullable Collation collation) {
|
||||
|
||||
FindAndRemoveCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
|
||||
Document fields, Document sort, @Nullable Collation collation) {
|
||||
this.collectionPreparer = collectionPreparer;
|
||||
this.query = query;
|
||||
this.fields = fields;
|
||||
this.sort = sort;
|
||||
@@ -2719,7 +2780,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
FindOneAndDeleteOptions findOneAndDeleteOptions = convertToFindOneAndDeleteOptions(fields, sort);
|
||||
collation.map(Collation::toMongoCollation).ifPresent(findOneAndDeleteOptions::collation);
|
||||
|
||||
return collection.findOneAndDelete(query, findOneAndDeleteOptions);
|
||||
return collectionPreparer.prepare(collection).findOneAndDelete(query, findOneAndDeleteOptions);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2728,6 +2789,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
*/
|
||||
private static class FindAndModifyCallback implements ReactiveCollectionCallback<Document> {
|
||||
|
||||
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
|
||||
private final Document query;
|
||||
private final Document fields;
|
||||
private final Document sort;
|
||||
@@ -2735,9 +2797,10 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
private final List<Document> arrayFilters;
|
||||
private final FindAndModifyOptions options;
|
||||
|
||||
FindAndModifyCallback(Document query, Document fields, Document sort, Object update, List<Document> arrayFilters,
|
||||
FindAndModifyOptions options) {
|
||||
FindAndModifyCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
|
||||
Document fields, Document sort, Object update, List<Document> arrayFilters, FindAndModifyOptions options) {
|
||||
|
||||
this.collectionPreparer = collectionPreparer;
|
||||
this.query = query;
|
||||
this.fields = fields;
|
||||
this.sort = sort;
|
||||
@@ -2750,21 +2813,22 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
public Publisher<Document> doInCollection(MongoCollection<Document> collection)
|
||||
throws MongoException, DataAccessException {
|
||||
|
||||
MongoCollection<Document> collectionToUse = collectionPreparer.prepare(collection);
|
||||
if (options.isRemove()) {
|
||||
FindOneAndDeleteOptions findOneAndDeleteOptions = convertToFindOneAndDeleteOptions(fields, sort);
|
||||
|
||||
findOneAndDeleteOptions = options.getCollation().map(Collation::toMongoCollation)
|
||||
.map(findOneAndDeleteOptions::collation).orElse(findOneAndDeleteOptions);
|
||||
|
||||
return collection.findOneAndDelete(query, findOneAndDeleteOptions);
|
||||
return collectionToUse.findOneAndDelete(query, findOneAndDeleteOptions);
|
||||
}
|
||||
|
||||
FindOneAndUpdateOptions findOneAndUpdateOptions = convertToFindOneAndUpdateOptions(options, fields, sort,
|
||||
arrayFilters);
|
||||
if (update instanceof Document) {
|
||||
return collection.findOneAndUpdate(query, (Document) update, findOneAndUpdateOptions);
|
||||
return collectionToUse.findOneAndUpdate(query, (Document) update, findOneAndUpdateOptions);
|
||||
} else if (update instanceof List) {
|
||||
return collection.findOneAndUpdate(query, (List<Document>) update, findOneAndUpdateOptions);
|
||||
return collectionToUse.findOneAndUpdate(query, (List<Document>) update, findOneAndUpdateOptions);
|
||||
}
|
||||
|
||||
return Flux
|
||||
@@ -2803,6 +2867,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
*/
|
||||
private static class FindAndReplaceCallback implements ReactiveCollectionCallback<Document> {
|
||||
|
||||
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
|
||||
private final Document query;
|
||||
private final Document fields;
|
||||
private final Document sort;
|
||||
@@ -2810,9 +2875,10 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
private final @Nullable com.mongodb.client.model.Collation collation;
|
||||
private final FindAndReplaceOptions options;
|
||||
|
||||
FindAndReplaceCallback(Document query, Document fields, Document sort, Document update,
|
||||
com.mongodb.client.model.Collation collation, FindAndReplaceOptions options) {
|
||||
|
||||
FindAndReplaceCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
|
||||
Document fields, Document sort, Document update, com.mongodb.client.model.Collation collation,
|
||||
FindAndReplaceOptions options) {
|
||||
this.collectionPreparer = collectionPreparer;
|
||||
this.query = query;
|
||||
this.fields = fields;
|
||||
this.sort = sort;
|
||||
@@ -2826,7 +2892,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
throws MongoException, DataAccessException {
|
||||
|
||||
FindOneAndReplaceOptions findOneAndReplaceOptions = convertToFindOneAndReplaceOptions(options, fields, sort);
|
||||
return collection.findOneAndReplace(query, update, findOneAndReplaceOptions);
|
||||
return collectionPreparer.prepare(collection).findOneAndReplace(query, update, findOneAndReplaceOptions);
|
||||
}
|
||||
|
||||
private FindOneAndReplaceOptions convertToFindOneAndReplaceOptions(FindAndReplaceOptions options, Document fields,
|
||||
@@ -3092,11 +3158,6 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
return findPublisherToUse;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReadPreference getReadPreference() {
|
||||
return query.getMeta().getFlags().contains(CursorOption.SECONDARY_READS) ? ReadPreference.primaryPreferred()
|
||||
: null;
|
||||
}
|
||||
}
|
||||
|
||||
class TailingQueryFindPublisherPreparer extends QueryFindPublisherPreparer {
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
/*
|
||||
* Copyright 2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.data.mongodb.core;
|
||||
|
||||
import org.springframework.lang.Nullable;
|
||||
|
||||
import com.mongodb.ReadConcern;
|
||||
|
||||
/**
|
||||
* Interface to be implemented by any object that wishes to expose the {@link ReadConcern}.
|
||||
* <p>
|
||||
* Typically implemented by cursor or query preparer objects.
|
||||
*
|
||||
* @author Mark Paluch
|
||||
* @since 4.1
|
||||
* @see org.springframework.data.mongodb.core.query.Query
|
||||
* @see org.springframework.data.mongodb.core.aggregation.AggregationOptions
|
||||
*/
|
||||
public interface ReadConcernAware {
|
||||
|
||||
/**
|
||||
* @return {@literal true} if a {@link ReadConcern} is set.
|
||||
*/
|
||||
default boolean hasReadConcern() {
|
||||
return getReadConcern() != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the {@link ReadConcern} to apply or {@literal null} if none set.
|
||||
*/
|
||||
@Nullable
|
||||
ReadConcern getReadConcern();
|
||||
}
|
||||
@@ -27,6 +27,8 @@ import com.mongodb.ReadPreference;
|
||||
* @author Christoph Strobl
|
||||
* @author Mark Paluch
|
||||
* @since 2.2
|
||||
* @see org.springframework.data.mongodb.core.query.Query
|
||||
* @see org.springframework.data.mongodb.core.aggregation.AggregationOptions
|
||||
*/
|
||||
public interface ReadPreferenceAware {
|
||||
|
||||
|
||||
@@ -28,6 +28,7 @@ import org.springframework.data.mongodb.core.aggregation.AddFieldsOperation.AddF
|
||||
import org.springframework.data.mongodb.core.aggregation.CountOperation.CountOperationBuilder;
|
||||
import org.springframework.data.mongodb.core.aggregation.FacetOperation.FacetOperationBuilder;
|
||||
import org.springframework.data.mongodb.core.aggregation.GraphLookupOperation.StartWithBuilder;
|
||||
import org.springframework.data.mongodb.core.aggregation.LookupOperation.LookupOperationBuilder;
|
||||
import org.springframework.data.mongodb.core.aggregation.MergeOperation.MergeOperationBuilder;
|
||||
import org.springframework.data.mongodb.core.aggregation.ReplaceRootOperation.ReplaceRootDocumentOperationBuilder;
|
||||
import org.springframework.data.mongodb.core.aggregation.ReplaceRootOperation.ReplaceRootOperationBuilder;
|
||||
@@ -50,6 +51,7 @@ import org.springframework.util.Assert;
|
||||
* @author Nikolay Bogdanov
|
||||
* @author Gustavo de Geus
|
||||
* @author Jérôme Guyon
|
||||
* @author Sangyong Choi
|
||||
* @since 1.3
|
||||
*/
|
||||
public class Aggregation {
|
||||
@@ -100,12 +102,12 @@ public class Aggregation {
|
||||
private final AggregationOptions options;
|
||||
|
||||
/**
|
||||
* Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
|
||||
* Creates a new {@link Aggregation} from the given {@link AggregationStage}s.
|
||||
*
|
||||
* @param operations must not be {@literal null} or empty.
|
||||
*/
|
||||
public static Aggregation newAggregation(List<? extends AggregationOperation> operations) {
|
||||
return newAggregation(operations.toArray(new AggregationOperation[operations.size()]));
|
||||
public static Aggregation newAggregation(List<? extends AggregationStage> operations) {
|
||||
return newAggregation(operations.toArray(AggregationStage[]::new));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -117,6 +119,16 @@ public class Aggregation {
|
||||
return new Aggregation(operations);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
|
||||
*
|
||||
* @param stages must not be {@literal null} or empty.
|
||||
* @since 4.1
|
||||
*/
|
||||
public static Aggregation newAggregation(AggregationStage... stages) {
|
||||
return new Aggregation(stages);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link AggregationUpdate} from the given {@link AggregationOperation}s.
|
||||
*
|
||||
@@ -128,6 +140,17 @@ public class Aggregation {
|
||||
return AggregationUpdate.from(Arrays.asList(operations));
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link AggregationUpdate} from the given {@link AggregationOperation}s.
|
||||
*
|
||||
* @param operations can be {@literal empty} but must not be {@literal null}.
|
||||
* @return new instance of {@link AggregationUpdate}.
|
||||
* @since 4.1
|
||||
*/
|
||||
public static AggregationUpdate newUpdate(AggregationStage... operations) {
|
||||
return AggregationUpdate.updateFrom(Arrays.asList(operations));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a copy of this {@link Aggregation} with the given {@link AggregationOptions} set. Note that options are
|
||||
* supported in MongoDB version 2.6+.
|
||||
@@ -139,7 +162,7 @@ public class Aggregation {
|
||||
public Aggregation withOptions(AggregationOptions options) {
|
||||
|
||||
Assert.notNull(options, "AggregationOptions must not be null");
|
||||
return new Aggregation(this.pipeline.getOperations(), options);
|
||||
return new Aggregation(this.pipeline.getStages(), options);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -148,8 +171,8 @@ public class Aggregation {
|
||||
* @param type must not be {@literal null}.
|
||||
* @param operations must not be {@literal null} or empty.
|
||||
*/
|
||||
public static <T> TypedAggregation<T> newAggregation(Class<T> type, List<? extends AggregationOperation> operations) {
|
||||
return newAggregation(type, operations.toArray(new AggregationOperation[operations.size()]));
|
||||
public static <T> TypedAggregation<T> newAggregation(Class<T> type, List<? extends AggregationStage> operations) {
|
||||
return newAggregation(type, operations.toArray(AggregationStage[]::new));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -162,6 +185,17 @@ public class Aggregation {
|
||||
return new TypedAggregation<T>(type, operations);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link TypedAggregation} for the given type and {@link AggregationOperation}s.
|
||||
*
|
||||
* @param type must not be {@literal null}.
|
||||
* @param stages must not be {@literal null} or empty.
|
||||
* @since 4.1
|
||||
*/
|
||||
public static <T> TypedAggregation<T> newAggregation(Class<T> type, AggregationStage... stages) {
|
||||
return new TypedAggregation<>(type, stages);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
|
||||
*
|
||||
@@ -171,6 +205,15 @@ public class Aggregation {
|
||||
this(asAggregationList(aggregationOperations));
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
|
||||
*
|
||||
* @param aggregationOperations must not be {@literal null} or empty.
|
||||
*/
|
||||
protected Aggregation(AggregationStage... aggregationOperations) {
|
||||
this(Arrays.asList(aggregationOperations));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param aggregationOperations must not be {@literal null} or empty.
|
||||
* @return
|
||||
@@ -187,7 +230,7 @@ public class Aggregation {
|
||||
*
|
||||
* @param aggregationOperations must not be {@literal null} or empty.
|
||||
*/
|
||||
protected Aggregation(List<AggregationOperation> aggregationOperations) {
|
||||
protected Aggregation(List<? extends AggregationStage> aggregationOperations) {
|
||||
this(aggregationOperations, DEFAULT_OPTIONS);
|
||||
}
|
||||
|
||||
@@ -197,7 +240,7 @@ public class Aggregation {
|
||||
* @param aggregationOperations must not be {@literal null}.
|
||||
* @param options must not be {@literal null} or empty.
|
||||
*/
|
||||
protected Aggregation(List<AggregationOperation> aggregationOperations, AggregationOptions options) {
|
||||
protected Aggregation(List<? extends AggregationStage> aggregationOperations, AggregationOptions options) {
|
||||
|
||||
Assert.notNull(aggregationOperations, "AggregationOperations must not be null");
|
||||
Assert.notNull(options, "AggregationOptions must not be null");
|
||||
@@ -636,6 +679,17 @@ public class Aggregation {
|
||||
return facet().and(aggregationOperations);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link FacetOperationBuilder} given {@link Aggregation}.
|
||||
*
|
||||
* @param stages the sub-pipeline, must not be {@literal null}.
|
||||
* @return new instance of {@link FacetOperation}.
|
||||
* @since 4.1
|
||||
*/
|
||||
public static FacetOperationBuilder facet(AggregationStage... stages) {
|
||||
return facet().and(stages);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link LookupOperation}.
|
||||
*
|
||||
@@ -664,6 +718,23 @@ public class Aggregation {
|
||||
return new LookupOperation(from, localField, foreignField, as);
|
||||
}
|
||||
|
||||
/**
|
||||
* Entrypoint for creating {@link LookupOperation $lookup} using a fluent builder API.
|
||||
*
|
||||
* <pre class="code">
|
||||
* Aggregation.lookup().from("restaurants").localField("restaurant_name").foreignField("name")
|
||||
* .let(newVariable("orders_drink").forField("drink"))
|
||||
* .pipeline(match(ctx -> new Document("$expr", new Document("$in", List.of("$$orders_drink", "$beverages")))))
|
||||
* .as("matches")
|
||||
* </pre>
|
||||
*
|
||||
* @return new instance of {@link LookupOperationBuilder}.
|
||||
* @since 4.1
|
||||
*/
|
||||
public static LookupOperationBuilder lookup() {
|
||||
return new LookupOperationBuilder();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link CountOperationBuilder}.
|
||||
*
|
||||
|
||||
@@ -15,10 +15,10 @@
|
||||
*/
|
||||
package org.springframework.data.mongodb.core.aggregation;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.bson.Document;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
/**
|
||||
* Represents one single operation in an aggregation pipeline.
|
||||
@@ -29,30 +29,24 @@ import org.bson.Document;
|
||||
* @author Christoph Strobl
|
||||
* @since 1.3
|
||||
*/
|
||||
public interface AggregationOperation {
|
||||
public interface AggregationOperation extends MultiOperationAggregationStage {
|
||||
|
||||
/**
|
||||
* Turns the {@link AggregationOperation} into a {@link Document} by using the given
|
||||
* {@link AggregationOperationContext}.
|
||||
*
|
||||
* @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
|
||||
* @return the Document
|
||||
* @deprecated since 2.2 in favor of {@link #toPipelineStages(AggregationOperationContext)}.
|
||||
* @return
|
||||
*/
|
||||
@Deprecated
|
||||
@Override
|
||||
Document toDocument(AggregationOperationContext context);
|
||||
|
||||
/**
|
||||
* Turns the {@link AggregationOperation} into list of {@link Document stages} by using the given
|
||||
* {@link AggregationOperationContext}. This allows a single {@link AggregationOptions} to add additional stages for
|
||||
* eg. {@code $sort} or {@code $limit}.
|
||||
* More the exception than the default.
|
||||
*
|
||||
* @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
|
||||
* @return the pipeline stages to run through. Never {@literal null}.
|
||||
* @since 2.2
|
||||
* @return never {@literal null}.
|
||||
*/
|
||||
@Override
|
||||
default List<Document> toPipelineStages(AggregationOperationContext context) {
|
||||
return Collections.singletonList(toDocument(context));
|
||||
return List.of(toDocument(context));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -63,6 +57,6 @@ public interface AggregationOperation {
|
||||
* @since 3.0.2
|
||||
*/
|
||||
default String getOperator() {
|
||||
return toDocument(Aggregation.DEFAULT_CONTEXT).keySet().iterator().next();
|
||||
return CollectionUtils.lastElement(toPipelineStages(Aggregation.DEFAULT_CONTEXT)).keySet().iterator().next();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,15 +45,19 @@ class AggregationOperationRenderer {
|
||||
* @param rootContext must not be {@literal null}.
|
||||
* @return the {@link List} of {@link Document}.
|
||||
*/
|
||||
static List<Document> toDocument(List<AggregationOperation> operations, AggregationOperationContext rootContext) {
|
||||
static List<Document> toDocument(List<? extends AggregationStage> operations, AggregationOperationContext rootContext) {
|
||||
|
||||
List<Document> operationDocuments = new ArrayList<Document>(operations.size());
|
||||
|
||||
AggregationOperationContext contextToUse = rootContext;
|
||||
|
||||
for (AggregationOperation operation : operations) {
|
||||
for (AggregationStage operation : operations) {
|
||||
|
||||
operationDocuments.addAll(operation.toPipelineStages(contextToUse));
|
||||
if(operation instanceof MultiOperationAggregationStage mops) {
|
||||
operationDocuments.addAll(mops.toPipelineStages(contextToUse));
|
||||
} else {
|
||||
operationDocuments.add(operation.toDocument(contextToUse));
|
||||
}
|
||||
|
||||
if (operation instanceof FieldsExposingAggregationOperation) {
|
||||
|
||||
|
||||
@@ -19,11 +19,16 @@ import java.time.Duration;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.bson.Document;
|
||||
import org.springframework.data.mongodb.core.ReadConcernAware;
|
||||
import org.springframework.data.mongodb.core.ReadPreferenceAware;
|
||||
import org.springframework.data.mongodb.core.query.Collation;
|
||||
import org.springframework.data.mongodb.util.BsonUtils;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import com.mongodb.ReadConcern;
|
||||
import com.mongodb.ReadPreference;
|
||||
|
||||
/**
|
||||
* Holds a set of configurable aggregation options that can be used within an aggregation pipeline. A list of support
|
||||
* aggregation options can be found in the MongoDB reference documentation
|
||||
@@ -39,7 +44,7 @@ import org.springframework.util.Assert;
|
||||
* @see TypedAggregation#withOptions(AggregationOptions)
|
||||
* @since 1.6
|
||||
*/
|
||||
public class AggregationOptions {
|
||||
public class AggregationOptions implements ReadConcernAware, ReadPreferenceAware {
|
||||
|
||||
private static final String BATCH_SIZE = "batchSize";
|
||||
private static final String CURSOR = "cursor";
|
||||
@@ -56,6 +61,10 @@ public class AggregationOptions {
|
||||
private final Optional<Collation> collation;
|
||||
private final Optional<String> comment;
|
||||
private final Optional<Object> hint;
|
||||
|
||||
private Optional<ReadConcern> readConcern;
|
||||
|
||||
private Optional<ReadPreference> readPreference;
|
||||
private Duration maxTime = Duration.ZERO;
|
||||
private ResultOptions resultOptions = ResultOptions.READ;
|
||||
private DomainTypeMapping domainTypeMapping = DomainTypeMapping.RELAXED;
|
||||
@@ -123,6 +132,8 @@ public class AggregationOptions {
|
||||
this.collation = Optional.ofNullable(collation);
|
||||
this.comment = Optional.ofNullable(comment);
|
||||
this.hint = Optional.ofNullable(hint);
|
||||
this.readConcern = Optional.empty();
|
||||
this.readPreference = Optional.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -268,6 +279,26 @@ public class AggregationOptions {
|
||||
return hint;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasReadConcern() {
|
||||
return readConcern.isPresent();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReadConcern getReadConcern() {
|
||||
return readConcern.orElse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasReadPreference() {
|
||||
return readPreference.isPresent();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReadPreference getReadPreference() {
|
||||
return readPreference.orElse(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the time limit for processing. {@link Duration#ZERO} is used for the default unbounded behavior.
|
||||
* @since 3.0
|
||||
@@ -385,6 +416,8 @@ public class AggregationOptions {
|
||||
private @Nullable Collation collation;
|
||||
private @Nullable String comment;
|
||||
private @Nullable Object hint;
|
||||
private @Nullable ReadConcern readConcern;
|
||||
private @Nullable ReadPreference readPreference;
|
||||
private @Nullable Duration maxTime;
|
||||
private @Nullable ResultOptions resultOptions;
|
||||
private @Nullable DomainTypeMapping domainTypeMapping;
|
||||
@@ -490,6 +523,32 @@ public class AggregationOptions {
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Define a {@link ReadConcern} to apply to the aggregation.
|
||||
*
|
||||
* @param readConcern can be {@literal null}.
|
||||
* @return this.
|
||||
* @since 4.1
|
||||
*/
|
||||
public Builder readConcern(@Nullable ReadConcern readConcern) {
|
||||
|
||||
this.readConcern = readConcern;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Define a {@link ReadPreference} to apply to the aggregation.
|
||||
*
|
||||
* @param readPreference can be {@literal null}.
|
||||
* @return this.
|
||||
* @since 4.1
|
||||
*/
|
||||
public Builder readPreference(@Nullable ReadPreference readPreference) {
|
||||
|
||||
this.readPreference = readPreference;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the time limit for processing.
|
||||
*
|
||||
@@ -573,6 +632,12 @@ public class AggregationOptions {
|
||||
if (domainTypeMapping != null) {
|
||||
options.domainTypeMapping = domainTypeMapping;
|
||||
}
|
||||
if (readConcern != null) {
|
||||
options.readConcern = Optional.of(readConcern);
|
||||
}
|
||||
if (readPreference != null) {
|
||||
options.readPreference = Optional.of(readPreference);
|
||||
}
|
||||
|
||||
return options;
|
||||
}
|
||||
|
||||
@@ -23,9 +23,10 @@ import java.util.function.Predicate;
|
||||
|
||||
import org.bson.Document;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
/**
|
||||
* The {@link AggregationPipeline} holds the collection of {@link AggregationOperation aggregation stages}.
|
||||
* The {@link AggregationPipeline} holds the collection of {@link AggregationStage aggregation stages}.
|
||||
*
|
||||
* @author Christoph Strobl
|
||||
* @author Mark Paluch
|
||||
@@ -33,12 +34,23 @@ import org.springframework.util.Assert;
|
||||
*/
|
||||
public class AggregationPipeline {
|
||||
|
||||
private final List<AggregationOperation> pipeline;
|
||||
private final List<AggregationStage> pipeline;
|
||||
|
||||
public static AggregationPipeline of(AggregationOperation... stages) {
|
||||
return new AggregationPipeline(Arrays.asList(stages));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new {@link AggregationPipeline} out of the given {@link AggregationStage stages}.
|
||||
*
|
||||
* @param stages the pipeline stages.
|
||||
* @return new instance of {@link AggregationPipeline}.
|
||||
* @since 4.1
|
||||
*/
|
||||
public static AggregationPipeline of(AggregationStage... stages) {
|
||||
return new AggregationPipeline(Arrays.asList(stages));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an empty pipeline
|
||||
*/
|
||||
@@ -49,12 +61,12 @@ public class AggregationPipeline {
|
||||
/**
|
||||
* Create a new pipeline with given {@link AggregationOperation stages}.
|
||||
*
|
||||
* @param aggregationOperations must not be {@literal null}.
|
||||
* @param aggregationStages must not be {@literal null}.
|
||||
*/
|
||||
public AggregationPipeline(List<AggregationOperation> aggregationOperations) {
|
||||
public AggregationPipeline(List<? extends AggregationStage> aggregationStages) {
|
||||
|
||||
Assert.notNull(aggregationOperations, "AggregationOperations must not be null");
|
||||
pipeline = new ArrayList<>(aggregationOperations);
|
||||
Assert.notNull(aggregationStages, "AggregationStages must not be null");
|
||||
pipeline = new ArrayList<>(aggregationStages);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -64,10 +76,21 @@ public class AggregationPipeline {
|
||||
* @return this.
|
||||
*/
|
||||
public AggregationPipeline add(AggregationOperation aggregationOperation) {
|
||||
return add((AggregationStage) aggregationOperation);
|
||||
}
|
||||
|
||||
Assert.notNull(aggregationOperation, "AggregationOperation must not be null");
|
||||
/**
|
||||
* Append the given {@link AggregationOperation stage} to the pipeline.
|
||||
*
|
||||
* @param stage must not be {@literal null}.
|
||||
* @return this.
|
||||
* @since 4.1
|
||||
*/
|
||||
public AggregationPipeline add(AggregationStage stage) {
|
||||
|
||||
pipeline.add(aggregationOperation);
|
||||
Assert.notNull(stage, "AggregationOperation must not be null");
|
||||
|
||||
pipeline.add(stage);
|
||||
return this;
|
||||
}
|
||||
|
||||
@@ -76,7 +99,17 @@ public class AggregationPipeline {
|
||||
*
|
||||
* @return never {@literal null}.
|
||||
*/
|
||||
public List<AggregationOperation> getOperations() {
|
||||
public List<AggregationStage> getOperations() {
|
||||
return getStages();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the list of {@link AggregationOperation aggregation stages}.
|
||||
*
|
||||
* @return never {@literal null}.
|
||||
* @since 4.1
|
||||
*/
|
||||
public List<AggregationStage> getStages() {
|
||||
return Collections.unmodifiableList(pipeline);
|
||||
}
|
||||
|
||||
@@ -95,14 +128,14 @@ public class AggregationPipeline {
|
||||
return false;
|
||||
}
|
||||
|
||||
AggregationOperation operation = pipeline.get(pipeline.size() - 1);
|
||||
AggregationStage operation = pipeline.get(pipeline.size() - 1);
|
||||
return isOut(operation) || isMerge(operation);
|
||||
}
|
||||
|
||||
void verify() {
|
||||
|
||||
// check $out/$merge is the last operation if it exists
|
||||
for (AggregationOperation operation : pipeline) {
|
||||
for (AggregationStage operation : pipeline) {
|
||||
|
||||
if (isOut(operation) && !isLast(operation)) {
|
||||
throw new IllegalArgumentException("The $out operator must be the last stage in the pipeline");
|
||||
@@ -134,13 +167,13 @@ public class AggregationPipeline {
|
||||
return pipeline.isEmpty();
|
||||
}
|
||||
|
||||
private boolean containsOperation(Predicate<AggregationOperation> predicate) {
|
||||
private boolean containsOperation(Predicate<AggregationStage> predicate) {
|
||||
|
||||
if (isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (AggregationOperation element : pipeline) {
|
||||
for (AggregationStage element : pipeline) {
|
||||
if (predicate.test(element)) {
|
||||
return true;
|
||||
}
|
||||
@@ -149,19 +182,29 @@ public class AggregationPipeline {
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean isLast(AggregationOperation aggregationOperation) {
|
||||
private boolean isLast(AggregationStage aggregationOperation) {
|
||||
return pipeline.indexOf(aggregationOperation) == pipeline.size() - 1;
|
||||
}
|
||||
|
||||
private static boolean isUnionWith(AggregationOperation operator) {
|
||||
return operator instanceof UnionWithOperation || operator.getOperator().equals("$unionWith");
|
||||
private static boolean isUnionWith(AggregationStage stage) {
|
||||
return isSpecificStage(stage, UnionWithOperation.class, "$unionWith");
|
||||
}
|
||||
|
||||
private static boolean isMerge(AggregationOperation operator) {
|
||||
return operator instanceof MergeOperation || operator.getOperator().equals("$merge");
|
||||
private static boolean isMerge(AggregationStage stage) {
|
||||
return isSpecificStage(stage, MergeOperation.class, "$merge");
|
||||
}
|
||||
|
||||
private static boolean isOut(AggregationOperation operator) {
|
||||
return operator instanceof OutOperation || operator.getOperator().equals("$out");
|
||||
private static boolean isOut(AggregationStage stage) {
|
||||
return isSpecificStage(stage, OutOperation.class, "$out");
|
||||
}
|
||||
|
||||
private static boolean isSpecificStage(AggregationStage stage, Class<?> type, String operator) {
|
||||
if (ClassUtils.isAssignable(type, stage.getClass())) {
|
||||
return true;
|
||||
}
|
||||
if (stage instanceof AggregationOperation operation) {
|
||||
return operation.getOperator().equals(operator);
|
||||
}
|
||||
return stage.toDocument(Aggregation.DEFAULT_CONTEXT).keySet().iterator().next().equals(operator);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
/*
|
||||
* Copyright 2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.data.mongodb.core.aggregation;
|
||||
|
||||
import org.bson.Document;
|
||||
|
||||
/**
|
||||
* Abstraction for a single
|
||||
* <a href="https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/#stages">Aggregation Pipeline
|
||||
* Stage</a> to be used within an {@link AggregationPipeline}.
|
||||
* <p>
|
||||
* An {@link AggregationStage} may operate upon domain specific types but will render to a ready to use store native
|
||||
* representation within a given {@link AggregationOperationContext context}. The most straight forward way of writing a
|
||||
* custom {@link AggregationStage} is just returning the raw document.
|
||||
*
|
||||
* <pre class="code">
|
||||
* AggregationStage stage = (ctx) -> Document.parse("{ $sort : { borough : 1 } }");
|
||||
* </pre>
|
||||
*
|
||||
* @author Christoph Strobl
|
||||
* @since 4.1
|
||||
*/
|
||||
public interface AggregationStage {
|
||||
|
||||
/**
|
||||
* Turns the {@link AggregationStage} into a {@link Document} by using the given {@link AggregationOperationContext}.
|
||||
*
|
||||
* @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
|
||||
* @return the ready to use {@link Document} representing the stage.
|
||||
*/
|
||||
Document toDocument(AggregationOperationContext context);
|
||||
}
|
||||
@@ -25,7 +25,6 @@ import java.util.StringJoiner;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.bson.Document;
|
||||
|
||||
import org.springframework.data.mongodb.core.query.Query;
|
||||
import org.springframework.data.mongodb.core.query.SerializationUtils;
|
||||
import org.springframework.data.mongodb.core.query.UpdateDefinition;
|
||||
@@ -71,7 +70,8 @@ import org.springframework.util.Assert;
|
||||
*
|
||||
* @author Christoph Strobl
|
||||
* @author Mark Paluch
|
||||
* @see <a href="https://docs.mongodb.com/manual/reference/method/db.collection.update/#update-with-aggregation-pipeline">MongoDB
|
||||
* @see <a href=
|
||||
* "https://docs.mongodb.com/manual/reference/method/db.collection.update/#update-with-aggregation-pipeline">MongoDB
|
||||
* Reference Documentation</a>
|
||||
* @since 3.0
|
||||
*/
|
||||
@@ -92,11 +92,11 @@ public class AggregationUpdate extends Aggregation implements UpdateDefinition {
|
||||
*
|
||||
* @param pipeline must not be {@literal null}.
|
||||
*/
|
||||
protected AggregationUpdate(List<AggregationOperation> pipeline) {
|
||||
protected AggregationUpdate(List<? extends AggregationStage> pipeline) {
|
||||
|
||||
super(pipeline);
|
||||
|
||||
for (AggregationOperation operation : pipeline) {
|
||||
for (AggregationStage operation : pipeline) {
|
||||
if (operation instanceof FieldsExposingAggregationOperation) {
|
||||
((FieldsExposingAggregationOperation) operation).getFields().forEach(it -> {
|
||||
keysTouched.add(it.getName());
|
||||
@@ -123,6 +123,16 @@ public class AggregationUpdate extends Aggregation implements UpdateDefinition {
|
||||
return new AggregationUpdate(pipeline);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new AggregationUpdate from the given {@link AggregationStage stages}.
|
||||
*
|
||||
* @return new instance of {@link AggregationUpdate}.
|
||||
* @since 4.1
|
||||
*/
|
||||
public static AggregationUpdate updateFrom(List<? extends AggregationStage> stages) {
|
||||
return new AggregationUpdate(stages);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds new fields to documents. {@code $set} outputs documents that contain all existing fields from the input
|
||||
* documents and newly added fields.
|
||||
|
||||
@@ -78,6 +78,23 @@ public class FacetOperation implements FieldsExposingAggregationOperation {
|
||||
return new FacetOperationBuilder(facets, Arrays.asList(operations));
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link FacetOperationBuilder} to append a new facet using {@literal operations}. <br />
|
||||
* {@link FacetOperationBuilder} takes a pipeline of {@link AggregationStage stages} to categorize documents into a
|
||||
* single facet.
|
||||
*
|
||||
* @param stages must not be {@literal null} or empty.
|
||||
* @return
|
||||
* @since 4.1
|
||||
*/
|
||||
public FacetOperationBuilder and(AggregationStage... stages) {
|
||||
|
||||
Assert.notNull(stages, "Stages must not be null");
|
||||
Assert.notEmpty(stages, "Stages must not be empty");
|
||||
|
||||
return new FacetOperationBuilder(facets, Arrays.asList(stages));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Document toDocument(AggregationOperationContext context) {
|
||||
return new Document(getOperator(), facets.toDocument(context));
|
||||
@@ -102,11 +119,11 @@ public class FacetOperation implements FieldsExposingAggregationOperation {
|
||||
public static class FacetOperationBuilder {
|
||||
|
||||
private final Facets current;
|
||||
private final List<AggregationOperation> operations;
|
||||
private final List<AggregationStage> operations;
|
||||
|
||||
private FacetOperationBuilder(Facets current, List<AggregationOperation> operations) {
|
||||
private FacetOperationBuilder(Facets current, List<? extends AggregationStage> operations) {
|
||||
this.current = current;
|
||||
this.operations = operations;
|
||||
this.operations = new ArrayList<>(operations);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -176,7 +193,7 @@ public class FacetOperation implements FieldsExposingAggregationOperation {
|
||||
* @param operations must not be {@literal null}.
|
||||
* @return the new {@link Facets}.
|
||||
*/
|
||||
Facets and(String fieldName, List<AggregationOperation> operations) {
|
||||
Facets and(String fieldName, List<? extends AggregationStage> operations) {
|
||||
|
||||
Assert.hasText(fieldName, "FieldName must not be null or empty");
|
||||
Assert.notNull(operations, "AggregationOperations must not be null");
|
||||
@@ -197,21 +214,21 @@ public class FacetOperation implements FieldsExposingAggregationOperation {
|
||||
private static class Facet {
|
||||
|
||||
private final ExposedField exposedField;
|
||||
private final List<AggregationOperation> operations;
|
||||
private final List<AggregationStage> stages;
|
||||
|
||||
/**
|
||||
* Creates a new {@link Facet} given {@link ExposedField} and {@link AggregationOperation} pipeline.
|
||||
*
|
||||
* @param exposedField must not be {@literal null}.
|
||||
* @param operations must not be {@literal null}.
|
||||
* @param stages must not be {@literal null}.
|
||||
*/
|
||||
Facet(ExposedField exposedField, List<AggregationOperation> operations) {
|
||||
Facet(ExposedField exposedField, List<? extends AggregationStage> stages) {
|
||||
|
||||
Assert.notNull(exposedField, "ExposedField must not be null");
|
||||
Assert.notNull(operations, "AggregationOperations must not be null");
|
||||
Assert.notNull(stages, "AggregationOperations must not be null");
|
||||
|
||||
this.exposedField = exposedField;
|
||||
this.operations = operations;
|
||||
this.stages = new ArrayList<>(stages);
|
||||
}
|
||||
|
||||
ExposedField getExposedField() {
|
||||
@@ -219,7 +236,7 @@ public class FacetOperation implements FieldsExposingAggregationOperation {
|
||||
}
|
||||
|
||||
protected List<Document> toDocuments(AggregationOperationContext context) {
|
||||
return AggregationOperationRenderer.toDocument(operations, context);
|
||||
return AggregationOperationRenderer.toDocument(stages, context);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,28 +15,44 @@
|
||||
*/
|
||||
package org.springframework.data.mongodb.core.aggregation;
|
||||
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.bson.Document;
|
||||
import org.springframework.data.mongodb.core.aggregation.ExposedFields.ExposedField;
|
||||
import org.springframework.data.mongodb.core.aggregation.FieldsExposingAggregationOperation.InheritsFieldsAggregationOperation;
|
||||
import org.springframework.data.mongodb.core.aggregation.VariableOperators.Let;
|
||||
import org.springframework.data.mongodb.core.aggregation.VariableOperators.Let.ExpressionVariable;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* Encapsulates the aggregation framework {@code $lookup}-operation. We recommend to use the static factory method
|
||||
* {@link Aggregation#lookup(String, String, String, String)} instead of creating instances of this class directly.
|
||||
* Encapsulates the aggregation framework {@code $lookup}-operation. We recommend to use the builder provided via
|
||||
* {@link #newLookup()} instead of creating instances of this class directly.
|
||||
*
|
||||
* @author Alessio Fachechi
|
||||
* @author Christoph Strobl
|
||||
* @author Mark Paluch
|
||||
* @author Sangyong Choi
|
||||
* @since 1.9
|
||||
* @see <a href="https://docs.mongodb.com/manual/reference/operator/aggregation/lookup/">MongoDB Aggregation Framework:
|
||||
* $lookup</a>
|
||||
*/
|
||||
public class LookupOperation implements FieldsExposingAggregationOperation, InheritsFieldsAggregationOperation {
|
||||
|
||||
private final Field from;
|
||||
private final String from;
|
||||
|
||||
@Nullable //
|
||||
private final Field localField;
|
||||
|
||||
@Nullable //
|
||||
private final Field foreignField;
|
||||
|
||||
@Nullable //
|
||||
private final Let let;
|
||||
|
||||
@Nullable //
|
||||
private final AggregationPipeline pipeline;
|
||||
|
||||
private final ExposedField as;
|
||||
|
||||
/**
|
||||
@@ -48,16 +64,55 @@ public class LookupOperation implements FieldsExposingAggregationOperation, Inhe
|
||||
* @param as must not be {@literal null}.
|
||||
*/
|
||||
public LookupOperation(Field from, Field localField, Field foreignField, Field as) {
|
||||
this(((Supplier<String>) () -> {
|
||||
|
||||
Assert.notNull(from, "From must not be null");
|
||||
return from.getTarget();
|
||||
}).get(), localField, foreignField, null, null, as);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link LookupOperation} for the given combination of {@link Field}s and {@link AggregationPipeline
|
||||
* pipeline}.
|
||||
*
|
||||
* @param from must not be {@literal null}.
|
||||
* @param let must not be {@literal null}.
|
||||
* @param as must not be {@literal null}.
|
||||
* @since 4.1
|
||||
*/
|
||||
public LookupOperation(String from, @Nullable Let let, AggregationPipeline pipeline, Field as) {
|
||||
this(from, null, null, let, pipeline, as);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link LookupOperation} for the given combination of {@link Field}s and {@link AggregationPipeline
|
||||
* pipeline}.
|
||||
*
|
||||
* @param from must not be {@literal null}.
|
||||
* @param localField can be {@literal null} if {@literal pipeline} is present.
|
||||
* @param foreignField can be {@literal null} if {@literal pipeline} is present.
|
||||
* @param let can be {@literal null} if {@literal localField} and {@literal foreignField} are present.
|
||||
* @param as must not be {@literal null}.
|
||||
* @since 4.1
|
||||
*/
|
||||
public LookupOperation(String from, @Nullable Field localField, @Nullable Field foreignField, @Nullable Let let,
|
||||
@Nullable AggregationPipeline pipeline, Field as) {
|
||||
|
||||
Assert.notNull(from, "From must not be null");
|
||||
Assert.notNull(localField, "LocalField must not be null");
|
||||
Assert.notNull(foreignField, "ForeignField must not be null");
|
||||
if (pipeline == null) {
|
||||
Assert.notNull(localField, "LocalField must not be null");
|
||||
Assert.notNull(foreignField, "ForeignField must not be null");
|
||||
} else if (localField == null && foreignField == null) {
|
||||
Assert.notNull(pipeline, "Pipeline must not be null");
|
||||
}
|
||||
Assert.notNull(as, "As must not be null");
|
||||
|
||||
this.from = from;
|
||||
this.localField = localField;
|
||||
this.foreignField = foreignField;
|
||||
this.as = new ExposedField(as, true);
|
||||
this.let = let;
|
||||
this.pipeline = pipeline;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -70,9 +125,20 @@ public class LookupOperation implements FieldsExposingAggregationOperation, Inhe
|
||||
|
||||
Document lookupObject = new Document();
|
||||
|
||||
lookupObject.append("from", from.getTarget());
|
||||
lookupObject.append("localField", localField.getTarget());
|
||||
lookupObject.append("foreignField", foreignField.getTarget());
|
||||
lookupObject.append("from", from);
|
||||
if (localField != null) {
|
||||
lookupObject.append("localField", localField.getTarget());
|
||||
}
|
||||
if (foreignField != null) {
|
||||
lookupObject.append("foreignField", foreignField.getTarget());
|
||||
}
|
||||
if (let != null) {
|
||||
lookupObject.append("let", let.toDocument(context).get("$let", Document.class).get("vars"));
|
||||
}
|
||||
if (pipeline != null) {
|
||||
lookupObject.append("pipeline", pipeline.toDocuments(context));
|
||||
}
|
||||
|
||||
lookupObject.append("as", as.getTarget());
|
||||
|
||||
return new Document(getOperator(), lookupObject);
|
||||
@@ -101,7 +167,7 @@ public class LookupOperation implements FieldsExposingAggregationOperation, Inhe
|
||||
LocalFieldBuilder from(String name);
|
||||
}
|
||||
|
||||
public static interface LocalFieldBuilder {
|
||||
public static interface LocalFieldBuilder extends PipelineBuilder {
|
||||
|
||||
/**
|
||||
* @param name the field from the documents input to the {@code $lookup} stage, must not be {@literal null} or
|
||||
@@ -120,7 +186,78 @@ public class LookupOperation implements FieldsExposingAggregationOperation, Inhe
|
||||
AsBuilder foreignField(String name);
|
||||
}
|
||||
|
||||
public static interface AsBuilder {
|
||||
/**
|
||||
* @since 4.1
|
||||
* @author Christoph Strobl
|
||||
*/
|
||||
public interface LetBuilder {
|
||||
|
||||
/**
|
||||
* Specifies {@link Let#getVariableNames() variables) that can be used in the
|
||||
* {@link PipelineBuilder#pipeline(AggregationOperation...) pipeline stages}.
|
||||
*
|
||||
* @param let must not be {@literal null}.
|
||||
* @return never {@literal null}.
|
||||
* @see PipelineBuilder
|
||||
*/
|
||||
PipelineBuilder let(Let let);
|
||||
|
||||
/**
|
||||
* Specifies {@link Let#getVariableNames() variables) that can be used in the
|
||||
* {@link PipelineBuilder#pipeline(AggregationOperation...) pipeline stages}.
|
||||
*
|
||||
* @param variables must not be {@literal null}.
|
||||
* @return never {@literal null}.
|
||||
* @see PipelineBuilder
|
||||
*/
|
||||
default PipelineBuilder let(ExpressionVariable... variables) {
|
||||
return let(Let.just(variables));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @since 4.1
|
||||
* @author Christoph Strobl
|
||||
*/
|
||||
public interface PipelineBuilder extends LetBuilder {
|
||||
|
||||
/**
|
||||
* Specifies the {@link AggregationPipeline pipeline} that determines the resulting documents.
|
||||
*
|
||||
* @param pipeline must not be {@literal null}.
|
||||
* @return never {@literal null}.
|
||||
*/
|
||||
AsBuilder pipeline(AggregationPipeline pipeline);
|
||||
|
||||
/**
|
||||
* Specifies the {@link AggregationPipeline#getStages() stages} that determine the resulting documents.
|
||||
*
|
||||
* @param stages must not be {@literal null} can be empty.
|
||||
* @return never {@literal null}.
|
||||
*/
|
||||
default AsBuilder pipeline(AggregationOperation... stages) {
|
||||
return pipeline(AggregationPipeline.of(stages));
|
||||
}
|
||||
|
||||
/**
|
||||
* Specifies the {@link AggregationPipeline#getStages() stages} that determine the resulting documents.
|
||||
*
|
||||
* @param stages must not be {@literal null} can be empty.
|
||||
* @return never {@literal null}.
|
||||
* @since 4.1
|
||||
*/
|
||||
default AsBuilder pipeline(AggregationStage... stages) {
|
||||
return pipeline(AggregationPipeline.of(stages));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param name the name of the new array field to add to the input documents, must not be {@literal null} or empty.
|
||||
* @return new instance of {@link LookupOperation}.
|
||||
*/
|
||||
LookupOperation as(String name);
|
||||
}
|
||||
|
||||
public static interface AsBuilder extends PipelineBuilder {
|
||||
|
||||
/**
|
||||
* @param name the name of the new array field to add to the input documents, must not be {@literal null} or empty.
|
||||
@@ -138,10 +275,12 @@ public class LookupOperation implements FieldsExposingAggregationOperation, Inhe
|
||||
public static final class LookupOperationBuilder
|
||||
implements FromBuilder, LocalFieldBuilder, ForeignFieldBuilder, AsBuilder {
|
||||
|
||||
private @Nullable Field from;
|
||||
private @Nullable String from;
|
||||
private @Nullable Field localField;
|
||||
private @Nullable Field foreignField;
|
||||
private @Nullable ExposedField as;
|
||||
private @Nullable Let let;
|
||||
private @Nullable AggregationPipeline pipeline;
|
||||
|
||||
/**
|
||||
* Creates new builder for {@link LookupOperation}.
|
||||
@@ -156,18 +295,10 @@ public class LookupOperation implements FieldsExposingAggregationOperation, Inhe
|
||||
public LocalFieldBuilder from(String name) {
|
||||
|
||||
Assert.hasText(name, "'From' must not be null or empty");
|
||||
from = Fields.field(name);
|
||||
from = name;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LookupOperation as(String name) {
|
||||
|
||||
Assert.hasText(name, "'As' must not be null or empty");
|
||||
as = new ExposedField(Fields.field(name), true);
|
||||
return new LookupOperation(from, localField, foreignField, as);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsBuilder foreignField(String name) {
|
||||
|
||||
@@ -183,5 +314,29 @@ public class LookupOperation implements FieldsExposingAggregationOperation, Inhe
|
||||
localField = Fields.field(name);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PipelineBuilder let(Let let) {
|
||||
|
||||
Assert.notNull(let, "Let must not be null");
|
||||
this.let = let;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsBuilder pipeline(AggregationPipeline pipeline) {
|
||||
|
||||
Assert.notNull(pipeline, "Pipeline must not be null");
|
||||
this.pipeline = pipeline;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LookupOperation as(String name) {
|
||||
|
||||
Assert.hasText(name, "'As' must not be null or empty");
|
||||
as = new ExposedField(Fields.field(name), true);
|
||||
return new LookupOperation(from, localField, foreignField, let, pipeline, as);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,103 @@
|
||||
/*
|
||||
* Copyright 2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.data.mongodb.core.aggregation;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.bson.Document;
|
||||
import org.springframework.util.LinkedMultiValueMap;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
|
||||
/**
|
||||
* An {@link AggregationStage} that may consist of a main operation and potential follow up stages for eg. {@code $sort}
|
||||
* or {@code $limit}.
|
||||
* <p>
|
||||
* The {@link MultiOperationAggregationStage} may operate upon domain specific types but will render to the store native
|
||||
* representation within a given {@link AggregationOperationContext context}.
|
||||
* <p>
|
||||
* {@link #toDocument(AggregationOperationContext)} will render a synthetic {@link Document} that contains the ordered
|
||||
* stages. The list returned from {@link #toPipelineStages(AggregationOperationContext)}
|
||||
*
|
||||
* <pre class="code">
|
||||
* [
|
||||
* { $match: { $text: { $search: "operating" } } },
|
||||
* { $sort: { score: { $meta: "textScore" }, posts: -1 } }
|
||||
* ]
|
||||
* </pre>
|
||||
*
|
||||
* will be represented as
|
||||
*
|
||||
* <pre class="code">
|
||||
* {
|
||||
* $match: { $text: { $search: "operating" } },
|
||||
* $sort: { score: { $meta: "textScore" }, posts: -1 }
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* In case stages appear multiple times the order no longer can be guaranteed when calling
|
||||
* {@link #toDocument(AggregationOperationContext)}, so consumers of the API should rely on
|
||||
* {@link #toPipelineStages(AggregationOperationContext)}. Nevertheless, by default the values will be collected into a
|
||||
* list rendering to
|
||||
*
|
||||
* <pre class="code">
|
||||
* {
|
||||
* $match: [{ $text: { $search: "operating" } }, { $text: ... }],
|
||||
* $sort: { score: { $meta: "textScore" }, posts: -1 }
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* @author Christoph Strobl
|
||||
* @since 4.1
|
||||
*/
|
||||
public interface MultiOperationAggregationStage extends AggregationStage {
|
||||
|
||||
/**
|
||||
* Returns a synthetic {@link Document stage} that contains the {@link #toPipelineStages(AggregationOperationContext)
|
||||
* actual stages} by folding them into a single {@link Document}. In case of colliding entries, those used multiple
|
||||
* times thus having the same key, the entries will be held as a list for the given operator.
|
||||
*
|
||||
* @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
|
||||
* @return never {@literal null}.
|
||||
*/
|
||||
@Override
|
||||
default Document toDocument(AggregationOperationContext context) {
|
||||
|
||||
List<Document> documents = toPipelineStages(context);
|
||||
if (documents.size() == 1) {
|
||||
return documents.get(0);
|
||||
}
|
||||
|
||||
MultiValueMap<String, Document> stages = new LinkedMultiValueMap<>(documents.size());
|
||||
for (Document current : documents) {
|
||||
String key = current.keySet().iterator().next();
|
||||
stages.add(key, current.get(key, Document.class));
|
||||
}
|
||||
return new Document(stages.entrySet().stream()
|
||||
.collect(Collectors.toMap(Entry::getKey, v -> v.getValue().size() == 1 ? v.getValue().get(0) : v.getValue())));
|
||||
}
|
||||
|
||||
/**
|
||||
* Turns the {@link MultiOperationAggregationStage} into list of {@link Document stages} by using the given
|
||||
* {@link AggregationOperationContext}. This allows an {@link AggregationStage} to add follow up stages for eg.
|
||||
* {@code $sort} or {@code $limit}.
|
||||
*
|
||||
* @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
|
||||
* @return the pipeline stages to run through. Never {@literal null}.
|
||||
*/
|
||||
List<Document> toPipelineStages(AggregationOperationContext context);
|
||||
}
|
||||
@@ -15,6 +15,7 @@
|
||||
*/
|
||||
package org.springframework.data.mongodb.core.aggregation;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.springframework.util.Assert;
|
||||
@@ -24,6 +25,7 @@ import org.springframework.util.Assert;
|
||||
*
|
||||
* @author Thomas Darimont
|
||||
* @author Oliver Gierke
|
||||
* @author Christoph Strobl
|
||||
*/
|
||||
public class TypedAggregation<I> extends Aggregation {
|
||||
|
||||
@@ -39,13 +41,24 @@ public class TypedAggregation<I> extends Aggregation {
|
||||
this(inputType, asAggregationList(operations));
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link TypedAggregation} from the given {@link AggregationStage stages}.
|
||||
*
|
||||
* @param inputType must not be {@literal null}.
|
||||
* @param stages must not be {@literal null} or empty.
|
||||
* @since 4.1
|
||||
*/
|
||||
public TypedAggregation(Class<I> inputType, AggregationStage... stages) {
|
||||
this(inputType, Arrays.asList(stages));
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link TypedAggregation} from the given {@link AggregationOperation}s.
|
||||
*
|
||||
* @param inputType must not be {@literal null}.
|
||||
* @param operations must not be {@literal null} or empty.
|
||||
*/
|
||||
public TypedAggregation(Class<I> inputType, List<AggregationOperation> operations) {
|
||||
public TypedAggregation(Class<I> inputType, List<? extends AggregationStage> operations) {
|
||||
this(inputType, operations, DEFAULT_OPTIONS);
|
||||
}
|
||||
|
||||
@@ -57,7 +70,7 @@ public class TypedAggregation<I> extends Aggregation {
|
||||
* @param operations must not be {@literal null} or empty.
|
||||
* @param options must not be {@literal null}.
|
||||
*/
|
||||
public TypedAggregation(Class<I> inputType, List<AggregationOperation> operations, AggregationOptions options) {
|
||||
public TypedAggregation(Class<I> inputType, List<? extends AggregationStage> operations, AggregationOptions options) {
|
||||
|
||||
super(operations, options);
|
||||
|
||||
@@ -77,6 +90,6 @@ public class TypedAggregation<I> extends Aggregation {
|
||||
public TypedAggregation<I> withOptions(AggregationOptions options) {
|
||||
|
||||
Assert.notNull(options, "AggregationOptions must not be null");
|
||||
return new TypedAggregation<I>(inputType, pipeline.getOperations(), options);
|
||||
return new TypedAggregation<>(inputType, pipeline.getStages(), options);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,7 +16,6 @@
|
||||
package org.springframework.data.mongodb.core.aggregation;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
@@ -224,28 +223,41 @@ public class VariableOperators {
|
||||
public static class Let implements AggregationExpression {
|
||||
|
||||
private final List<ExpressionVariable> vars;
|
||||
|
||||
@Nullable //
|
||||
private final AggregationExpression expression;
|
||||
|
||||
private Let(List<ExpressionVariable> vars, AggregationExpression expression) {
|
||||
private Let(List<ExpressionVariable> vars, @Nullable AggregationExpression expression) {
|
||||
|
||||
this.vars = vars;
|
||||
this.expression = expression;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new {@link Let} holding just the given {@literal variables}.
|
||||
*
|
||||
* @param variables must not be {@literal null}.
|
||||
* @return new instance of {@link Let}.
|
||||
* @since 4.1
|
||||
*/
|
||||
public static Let just(ExpressionVariable... variables) {
|
||||
return new Let(List.of(variables), null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start creating new {@link Let} by defining the variables for {@code $vars}.
|
||||
*
|
||||
* @param variables must not be {@literal null}.
|
||||
* @return
|
||||
*/
|
||||
public static LetBuilder define(final Collection<ExpressionVariable> variables) {
|
||||
public static LetBuilder define(Collection<ExpressionVariable> variables) {
|
||||
|
||||
Assert.notNull(variables, "Variables must not be null");
|
||||
|
||||
return new LetBuilder() {
|
||||
|
||||
@Override
|
||||
public Let andApply(final AggregationExpression expression) {
|
||||
public Let andApply(AggregationExpression expression) {
|
||||
|
||||
Assert.notNull(expression, "Expression must not be null");
|
||||
return new Let(new ArrayList<ExpressionVariable>(variables), expression);
|
||||
@@ -259,19 +271,10 @@ public class VariableOperators {
|
||||
* @param variables must not be {@literal null}.
|
||||
* @return
|
||||
*/
|
||||
public static LetBuilder define(final ExpressionVariable... variables) {
|
||||
public static LetBuilder define(ExpressionVariable... variables) {
|
||||
|
||||
Assert.notNull(variables, "Variables must not be null");
|
||||
|
||||
return new LetBuilder() {
|
||||
|
||||
@Override
|
||||
public Let andApply(final AggregationExpression expression) {
|
||||
|
||||
Assert.notNull(expression, "Expression must not be null");
|
||||
return new Let(Arrays.asList(variables), expression);
|
||||
}
|
||||
};
|
||||
return define(List.of(variables));
|
||||
}
|
||||
|
||||
public interface LetBuilder {
|
||||
@@ -283,10 +286,11 @@ public class VariableOperators {
|
||||
* @return
|
||||
*/
|
||||
Let andApply(AggregationExpression expression);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Document toDocument(final AggregationOperationContext context) {
|
||||
public Document toDocument(AggregationOperationContext context) {
|
||||
return toLet(ExposedFields.synthetic(Fields.fields(getVariableNames())), context);
|
||||
}
|
||||
|
||||
@@ -312,16 +316,22 @@ public class VariableOperators {
|
||||
}
|
||||
|
||||
letExpression.put("vars", mappedVars);
|
||||
letExpression.put("in", getMappedIn(operationContext));
|
||||
if (expression != null) {
|
||||
letExpression.put("in", getMappedIn(operationContext));
|
||||
}
|
||||
|
||||
return new Document("$let", letExpression);
|
||||
}
|
||||
|
||||
private Document getMappedVariable(ExpressionVariable var, AggregationOperationContext context) {
|
||||
|
||||
return new Document(var.variableName,
|
||||
var.expression instanceof AggregationExpression ? ((AggregationExpression) var.expression).toDocument(context)
|
||||
: var.expression);
|
||||
if (var.expression instanceof AggregationExpression expression) {
|
||||
return new Document(var.variableName, expression.toDocument(context));
|
||||
}
|
||||
if (var.expression instanceof Field field) {
|
||||
return new Document(var.variableName, context.getReference(field).toString());
|
||||
}
|
||||
return new Document(var.variableName, var.expression);
|
||||
}
|
||||
|
||||
private Object getMappedIn(AggregationOperationContext context) {
|
||||
@@ -373,6 +383,10 @@ public class VariableOperators {
|
||||
return new ExpressionVariable(variableName, expression);
|
||||
}
|
||||
|
||||
public ExpressionVariable forField(String fieldRef) {
|
||||
return new ExpressionVariable(variableName, Fields.field(fieldRef));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new {@link ExpressionVariable} with current name and given {@literal expressionObject}.
|
||||
*
|
||||
|
||||
@@ -24,11 +24,16 @@ import org.springframework.data.geo.Distance;
|
||||
import org.springframework.data.geo.Metric;
|
||||
import org.springframework.data.geo.Metrics;
|
||||
import org.springframework.data.geo.Point;
|
||||
import org.springframework.data.mongodb.core.ReadConcernAware;
|
||||
import org.springframework.data.mongodb.core.ReadPreferenceAware;
|
||||
import org.springframework.data.mongodb.core.geo.GeoJsonPoint;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
import com.mongodb.ReadConcern;
|
||||
import com.mongodb.ReadPreference;
|
||||
|
||||
/**
|
||||
* Builder class to build near-queries. <br />
|
||||
* MongoDB {@code $geoNear} operator allows usage of a {@literal GeoJSON Point} or legacy coordinate pair. Though
|
||||
@@ -171,7 +176,7 @@ import org.springframework.util.ObjectUtils;
|
||||
* @author Christoph Strobl
|
||||
* @author Mark Paluch
|
||||
*/
|
||||
public final class NearQuery {
|
||||
public final class NearQuery implements ReadConcernAware, ReadPreferenceAware {
|
||||
|
||||
private final Point point;
|
||||
private @Nullable Query query;
|
||||
@@ -181,6 +186,8 @@ public final class NearQuery {
|
||||
private boolean spherical;
|
||||
private @Nullable Long limit;
|
||||
private @Nullable Long skip;
|
||||
private @Nullable ReadConcern readConcern;
|
||||
private @Nullable ReadPreference readPreference;
|
||||
|
||||
/**
|
||||
* Creates a new {@link NearQuery}.
|
||||
@@ -555,6 +562,74 @@ public final class NearQuery {
|
||||
return query != null ? query.getCollation().orElse(null) : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configures the query to use the given {@link ReadConcern} unless the underlying {@link #query(Query)}
|
||||
* {@link Query#hasReadConcern() specifies} another one.
|
||||
*
|
||||
* @param readConcern must not be {@literal null}.
|
||||
* @return this.
|
||||
* @since 4.1
|
||||
*/
|
||||
public NearQuery withReadConcern(ReadConcern readConcern) {
|
||||
|
||||
Assert.notNull(readConcern, "ReadConcern must not be null");
|
||||
this.readConcern = readConcern;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configures the query to use the given {@link ReadPreference} unless the underlying {@link #query(Query)}
|
||||
* {@link Query#hasReadPreference() specifies} another one.
|
||||
*
|
||||
* @param readPreference must not be {@literal null}.
|
||||
* @return this.
|
||||
* @since 4.1
|
||||
*/
|
||||
public NearQuery withReadPreference(ReadPreference readPreference) {
|
||||
|
||||
Assert.notNull(readPreference, "ReadPreference must not be null");
|
||||
this.readPreference = readPreference;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the {@link ReadConcern} to use. Will return the underlying {@link #query(Query) queries}
|
||||
* {@link Query#getReadConcern() ReadConcern} if present or the one defined on the {@link NearQuery#readConcern}
|
||||
* itself.
|
||||
*
|
||||
* @return can be {@literal null} if none set.
|
||||
* @since 4.1
|
||||
* @see ReadConcernAware
|
||||
*/
|
||||
@Nullable
|
||||
@Override
|
||||
public ReadConcern getReadConcern() {
|
||||
|
||||
if (query != null && query.hasReadConcern()) {
|
||||
return query.getReadConcern();
|
||||
}
|
||||
return readConcern;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the {@link ReadPreference} to use. Will return the underlying {@link #query(Query) queries}
|
||||
* {@link Query#getReadPreference() ReadPreference} if present or the one defined on the
|
||||
* {@link NearQuery#readPreference} itself.
|
||||
*
|
||||
* @return can be {@literal null} if none set.
|
||||
* @since 4.1
|
||||
* @see ReadPreferenceAware
|
||||
*/
|
||||
@Nullable
|
||||
@Override
|
||||
public ReadPreference getReadPreference() {
|
||||
|
||||
if (query != null && query.hasReadPreference()) {
|
||||
return query.getReadPreference();
|
||||
}
|
||||
return readPreference;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link Document} built by the {@link NearQuery}.
|
||||
*
|
||||
|
||||
@@ -34,10 +34,16 @@ import org.springframework.data.domain.Pageable;
|
||||
import org.springframework.data.domain.Sort;
|
||||
import org.springframework.data.domain.Sort.Order;
|
||||
import org.springframework.data.mongodb.InvalidMongoDbApiUsageException;
|
||||
import org.springframework.data.mongodb.core.ReadConcernAware;
|
||||
import org.springframework.data.mongodb.core.ReadPreferenceAware;
|
||||
import org.springframework.data.mongodb.core.query.Meta.CursorOption;
|
||||
import org.springframework.data.mongodb.util.BsonUtils;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import com.mongodb.ReadConcern;
|
||||
import com.mongodb.ReadPreference;
|
||||
|
||||
/**
|
||||
* MongoDB Query object representing criteria, projection, sorting and query hints.
|
||||
*
|
||||
@@ -48,7 +54,7 @@ import org.springframework.util.Assert;
|
||||
* @author Mark Paluch
|
||||
* @author Anton Barkan
|
||||
*/
|
||||
public class Query {
|
||||
public class Query implements ReadConcernAware, ReadPreferenceAware {
|
||||
|
||||
private static final String RESTRICTED_TYPES_KEY = "_$RESTRICTED_TYPES";
|
||||
|
||||
@@ -58,6 +64,9 @@ public class Query {
|
||||
private Sort sort = Sort.unsorted();
|
||||
private long skip;
|
||||
private int limit;
|
||||
private @Nullable ReadConcern readConcern;
|
||||
private @Nullable ReadPreference readPreference;
|
||||
|
||||
private @Nullable String hint;
|
||||
|
||||
private Meta meta = new Meta();
|
||||
@@ -160,6 +169,59 @@ public class Query {
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configures the query to use the given {@link ReadConcern} when being executed.
|
||||
*
|
||||
* @param readConcern must not be {@literal null}.
|
||||
* @return this.
|
||||
* @since 3.1
|
||||
*/
|
||||
public Query withReadConcern(ReadConcern readConcern) {
|
||||
|
||||
Assert.notNull(readConcern, "ReadConcern must not be null");
|
||||
this.readConcern = readConcern;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configures the query to use the given {@link ReadPreference} when being executed.
|
||||
*
|
||||
* @param readPreference must not be {@literal null}.
|
||||
* @return this.
|
||||
* @since 4.1
|
||||
*/
|
||||
public Query withReadPreference(ReadPreference readPreference) {
|
||||
|
||||
Assert.notNull(readPreference, "ReadPreference must not be null");
|
||||
this.readPreference = readPreference;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasReadConcern() {
|
||||
return this.readConcern != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReadConcern getReadConcern() {
|
||||
return this.readConcern;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasReadPreference() {
|
||||
return this.readPreference != null || getMeta().getFlags().contains(CursorOption.SECONDARY_READS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReadPreference getReadPreference() {
|
||||
|
||||
if (readPreference == null) {
|
||||
return getMeta().getFlags().contains(CursorOption.SECONDARY_READS) ? ReadPreference.primaryPreferred() : null;
|
||||
}
|
||||
|
||||
return this.readPreference;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configures the query to use the given {@link Document hint} when being executed.
|
||||
*
|
||||
|
||||
@@ -27,6 +27,7 @@ import org.springframework.data.domain.Sort.Order;
|
||||
import org.springframework.data.mongodb.core.aggregation.Aggregation;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationStage;
|
||||
import org.springframework.data.mongodb.core.convert.MongoConverter;
|
||||
import org.springframework.data.mongodb.core.query.Collation;
|
||||
import org.springframework.data.mongodb.core.query.Meta;
|
||||
@@ -109,14 +110,14 @@ abstract class AggregationUtils {
|
||||
* @param accessor
|
||||
* @param targetType
|
||||
*/
|
||||
static void appendSortIfPresent(List<AggregationOperation> aggregationPipeline, ConvertingParameterAccessor accessor,
|
||||
static void appendSortIfPresent(List<? extends AggregationStage> aggregationPipeline, ConvertingParameterAccessor accessor,
|
||||
Class<?> targetType) {
|
||||
|
||||
if (accessor.getSort().isUnsorted()) {
|
||||
return;
|
||||
}
|
||||
|
||||
aggregationPipeline.add(ctx -> {
|
||||
((List<AggregationStage>) aggregationPipeline).add(ctx -> {
|
||||
|
||||
Document sort = new Document();
|
||||
for (Order order : accessor.getSort()) {
|
||||
@@ -134,7 +135,7 @@ abstract class AggregationUtils {
|
||||
* @param aggregationPipeline
|
||||
* @param accessor
|
||||
*/
|
||||
static void appendLimitAndOffsetIfPresent(List<AggregationOperation> aggregationPipeline,
|
||||
static void appendLimitAndOffsetIfPresent(List<? extends AggregationStage> aggregationPipeline,
|
||||
ConvertingParameterAccessor accessor) {
|
||||
appendLimitAndOffsetIfPresent(aggregationPipeline, accessor, LongUnaryOperator.identity(),
|
||||
IntUnaryOperator.identity());
|
||||
@@ -150,7 +151,7 @@ abstract class AggregationUtils {
|
||||
* @param limitOperator
|
||||
* @since 3.3
|
||||
*/
|
||||
static void appendLimitAndOffsetIfPresent(List<AggregationOperation> aggregationPipeline,
|
||||
static void appendLimitAndOffsetIfPresent(List<? extends AggregationStage> aggregationPipeline,
|
||||
ConvertingParameterAccessor accessor, LongUnaryOperator offsetOperator, IntUnaryOperator limitOperator) {
|
||||
|
||||
Pageable pageable = accessor.getPageable();
|
||||
@@ -159,10 +160,10 @@ abstract class AggregationUtils {
|
||||
}
|
||||
|
||||
if (pageable.getOffset() > 0) {
|
||||
aggregationPipeline.add(Aggregation.skip(offsetOperator.applyAsLong(pageable.getOffset())));
|
||||
((List<AggregationStage>) aggregationPipeline).add(Aggregation.skip(offsetOperator.applyAsLong(pageable.getOffset())));
|
||||
}
|
||||
|
||||
aggregationPipeline.add(Aggregation.limit(limitOperator.applyAsInt(pageable.getPageSize())));
|
||||
((List<AggregationStage>) aggregationPipeline).add(Aggregation.limit(limitOperator.applyAsInt(pageable.getPageSize())));
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -64,6 +64,7 @@ class QueryUtils {
|
||||
combinedSort.putAll((Document) invocation.proceed());
|
||||
return combinedSort;
|
||||
});
|
||||
factory.setInterfaces(new Class[0]);
|
||||
|
||||
return (Query) factory.getProxy(query.getClass().getClassLoader());
|
||||
}
|
||||
@@ -113,7 +114,7 @@ class QueryUtils {
|
||||
if(parameters.isEmpty()) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
int i = 0;
|
||||
for(Class<?> parameterType : parameters) {
|
||||
if(ClassUtils.isAssignable(type, parameterType)) {
|
||||
|
||||
@@ -108,6 +108,7 @@ import org.springframework.util.CollectionUtils;
|
||||
import com.mongodb.MongoClientSettings;
|
||||
import com.mongodb.MongoException;
|
||||
import com.mongodb.MongoNamespace;
|
||||
import com.mongodb.ReadConcern;
|
||||
import com.mongodb.ReadPreference;
|
||||
import com.mongodb.ServerAddress;
|
||||
import com.mongodb.ServerCursor;
|
||||
@@ -120,16 +121,7 @@ import com.mongodb.client.MongoClient;
|
||||
import com.mongodb.client.MongoCollection;
|
||||
import com.mongodb.client.MongoCursor;
|
||||
import com.mongodb.client.MongoDatabase;
|
||||
import com.mongodb.client.model.CountOptions;
|
||||
import com.mongodb.client.model.CreateCollectionOptions;
|
||||
import com.mongodb.client.model.DeleteOptions;
|
||||
import com.mongodb.client.model.FindOneAndDeleteOptions;
|
||||
import com.mongodb.client.model.FindOneAndReplaceOptions;
|
||||
import com.mongodb.client.model.FindOneAndUpdateOptions;
|
||||
import com.mongodb.client.model.MapReduceAction;
|
||||
import com.mongodb.client.model.ReplaceOptions;
|
||||
import com.mongodb.client.model.TimeSeriesGranularity;
|
||||
import com.mongodb.client.model.UpdateOptions;
|
||||
import com.mongodb.client.model.*;
|
||||
import com.mongodb.client.result.DeleteResult;
|
||||
import com.mongodb.client.result.UpdateResult;
|
||||
|
||||
@@ -182,6 +174,7 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
|
||||
when(collection.estimatedDocumentCount(any())).thenReturn(1L);
|
||||
when(collection.getNamespace()).thenReturn(new MongoNamespace("db.mock-collection"));
|
||||
when(collection.aggregate(any(List.class), any())).thenReturn(aggregateIterable);
|
||||
when(collection.withReadConcern(any())).thenReturn(collection);
|
||||
when(collection.withReadPreference(any())).thenReturn(collection);
|
||||
when(collection.replaceOne(any(), any(), any(ReplaceOptions.class))).thenReturn(updateResult);
|
||||
when(collection.withWriteConcern(any())).thenReturn(collectionWithWriteConcern);
|
||||
@@ -478,6 +471,34 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
|
||||
verify(collection, never()).withReadPreference(any());
|
||||
}
|
||||
|
||||
@Test // GH-4277
|
||||
void aggregateShouldHonorOptionsReadConcernWhenSet() {
|
||||
|
||||
AggregationOptions options = AggregationOptions.builder().readConcern(ReadConcern.SNAPSHOT).build();
|
||||
template.aggregate(newAggregation(Aggregation.unwind("foo")).withOptions(options), "collection-1", Wrapper.class);
|
||||
|
||||
verify(collection).withReadConcern(ReadConcern.SNAPSHOT);
|
||||
}
|
||||
|
||||
@Test // GH-4277
|
||||
void aggregateShouldHonorOptionsReadPreferenceWhenSet() {
|
||||
|
||||
AggregationOptions options = AggregationOptions.builder().readPreference(ReadPreference.secondary()).build();
|
||||
template.aggregate(newAggregation(Aggregation.unwind("foo")).withOptions(options), "collection-1", Wrapper.class);
|
||||
|
||||
verify(collection).withReadPreference(ReadPreference.secondary());
|
||||
}
|
||||
|
||||
@Test // GH-4277
|
||||
void aggregateStreamShouldHonorOptionsReadPreferenceWhenSet() {
|
||||
|
||||
AggregationOptions options = AggregationOptions.builder().readPreference(ReadPreference.secondary()).build();
|
||||
template.aggregateStream(newAggregation(Aggregation.unwind("foo")).withOptions(options), "collection-1",
|
||||
Wrapper.class);
|
||||
|
||||
verify(collection).withReadPreference(ReadPreference.secondary());
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-2153
|
||||
void aggregateShouldHonorOptionsComment() {
|
||||
|
||||
@@ -558,6 +579,28 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
|
||||
verify(collection).withReadPreference(eq(ReadPreference.secondary()));
|
||||
}
|
||||
|
||||
@Test // GH-4277
|
||||
void geoNearShouldHonorReadPreferenceFromQuery() {
|
||||
|
||||
NearQuery query = NearQuery.near(new Point(1, 1));
|
||||
query.withReadPreference(ReadPreference.secondary());
|
||||
|
||||
template.geoNear(query, Wrapper.class);
|
||||
|
||||
verify(collection).withReadPreference(eq(ReadPreference.secondary()));
|
||||
}
|
||||
|
||||
@Test // GH-4277
|
||||
void geoNearShouldHonorReadConcernFromQuery() {
|
||||
|
||||
NearQuery query = NearQuery.near(new Point(1, 1));
|
||||
query.withReadConcern(ReadConcern.SNAPSHOT);
|
||||
|
||||
template.geoNear(query, Wrapper.class);
|
||||
|
||||
verify(collection).withReadConcern(eq(ReadConcern.SNAPSHOT));
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-1166, DATAMONGO-2264
|
||||
void geoNearShouldIgnoreReadPreferenceWhenNotSet() {
|
||||
|
||||
@@ -802,6 +845,24 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
|
||||
verify(findIterable).batchSize(1234);
|
||||
}
|
||||
|
||||
@Test // GH-4277
|
||||
void findShouldUseReadConcernWhenPresent() {
|
||||
|
||||
template.find(new BasicQuery("{'foo' : 'bar'}").withReadConcern(ReadConcern.SNAPSHOT),
|
||||
AutogenerateableId.class);
|
||||
|
||||
verify(collection).withReadConcern(ReadConcern.SNAPSHOT);
|
||||
}
|
||||
|
||||
@Test // GH-4277
|
||||
void findShouldUseReadPreferenceWhenPresent() {
|
||||
|
||||
template.find(new BasicQuery("{'foo' : 'bar'}").withReadPreference(ReadPreference.secondary()),
|
||||
AutogenerateableId.class);
|
||||
|
||||
verify(collection).withReadPreference(ReadPreference.secondary());
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-1518
|
||||
void executeQueryShouldUseCollationWhenPresent() {
|
||||
|
||||
@@ -1048,7 +1109,8 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
|
||||
@Test // DATAMONGO-1733
|
||||
void appliesFieldsWhenInterfaceProjectionIsClosedAndQueryDoesNotDefineFields() {
|
||||
|
||||
template.doFind("star-wars", new Document(), new Document(), Person.class, PersonProjection.class,
|
||||
template.doFind(CollectionPreparer.identity(), "star-wars", new Document(), new Document(), Person.class,
|
||||
PersonProjection.class,
|
||||
CursorPreparer.NO_OP_PREPARER);
|
||||
|
||||
verify(findIterable).projection(eq(new Document("firstname", 1)));
|
||||
@@ -1057,7 +1119,8 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
|
||||
@Test // DATAMONGO-1733
|
||||
void doesNotApplyFieldsWhenInterfaceProjectionIsClosedAndQueryDefinesFields() {
|
||||
|
||||
template.doFind("star-wars", new Document(), new Document("bar", 1), Person.class, PersonProjection.class,
|
||||
template.doFind(CollectionPreparer.identity(), "star-wars", new Document(), new Document("bar", 1), Person.class,
|
||||
PersonProjection.class,
|
||||
CursorPreparer.NO_OP_PREPARER);
|
||||
|
||||
verify(findIterable).projection(eq(new Document("bar", 1)));
|
||||
@@ -1066,7 +1129,8 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
|
||||
@Test // DATAMONGO-1733
|
||||
void doesNotApplyFieldsWhenInterfaceProjectionIsOpen() {
|
||||
|
||||
template.doFind("star-wars", new Document(), new Document(), Person.class, PersonSpELProjection.class,
|
||||
template.doFind(CollectionPreparer.identity(), "star-wars", new Document(), new Document(), Person.class,
|
||||
PersonSpELProjection.class,
|
||||
CursorPreparer.NO_OP_PREPARER);
|
||||
|
||||
verify(findIterable).projection(eq(BsonUtils.EMPTY_DOCUMENT));
|
||||
@@ -1075,7 +1139,8 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
|
||||
@Test // DATAMONGO-1733, DATAMONGO-2041
|
||||
void appliesFieldsToDtoProjection() {
|
||||
|
||||
template.doFind("star-wars", new Document(), new Document(), Person.class, Jedi.class,
|
||||
template.doFind(CollectionPreparer.identity(), "star-wars", new Document(), new Document(), Person.class,
|
||||
Jedi.class,
|
||||
CursorPreparer.NO_OP_PREPARER);
|
||||
|
||||
verify(findIterable).projection(eq(new Document("firstname", 1)));
|
||||
@@ -1084,7 +1149,8 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
|
||||
@Test // DATAMONGO-1733
|
||||
void doesNotApplyFieldsToDtoProjectionWhenQueryDefinesFields() {
|
||||
|
||||
template.doFind("star-wars", new Document(), new Document("bar", 1), Person.class, Jedi.class,
|
||||
template.doFind(CollectionPreparer.identity(), "star-wars", new Document(), new Document("bar", 1), Person.class,
|
||||
Jedi.class,
|
||||
CursorPreparer.NO_OP_PREPARER);
|
||||
|
||||
verify(findIterable).projection(eq(new Document("bar", 1)));
|
||||
@@ -1093,7 +1159,8 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
|
||||
@Test // DATAMONGO-1733
|
||||
void doesNotApplyFieldsWhenTargetIsNotAProjection() {
|
||||
|
||||
template.doFind("star-wars", new Document(), new Document(), Person.class, Person.class,
|
||||
template.doFind(CollectionPreparer.identity(), "star-wars", new Document(), new Document(), Person.class,
|
||||
Person.class,
|
||||
CursorPreparer.NO_OP_PREPARER);
|
||||
|
||||
verify(findIterable).projection(eq(BsonUtils.EMPTY_DOCUMENT));
|
||||
@@ -1102,7 +1169,8 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
|
||||
@Test // DATAMONGO-1733
|
||||
void doesNotApplyFieldsWhenTargetExtendsDomainType() {
|
||||
|
||||
template.doFind("star-wars", new Document(), new Document(), Person.class, PersonExtended.class,
|
||||
template.doFind(CollectionPreparer.identity(), "star-wars", new Document(), new Document(), Person.class,
|
||||
PersonExtended.class,
|
||||
CursorPreparer.NO_OP_PREPARER);
|
||||
|
||||
verify(findIterable).projection(eq(BsonUtils.EMPTY_DOCUMENT));
|
||||
|
||||
@@ -23,8 +23,6 @@ import static org.springframework.data.mongodb.test.util.Assertions.assertThat;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.springframework.data.mongodb.core.MongoTemplateUnitTests.Wrapper;
|
||||
import org.springframework.data.mongodb.core.aggregation.Aggregation;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.test.StepVerifier;
|
||||
@@ -56,12 +54,12 @@ import org.mockito.junit.jupiter.MockitoSettings;
|
||||
import org.mockito.quality.Strictness;
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.reactivestreams.Subscriber;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.context.support.StaticApplicationContext;
|
||||
import org.springframework.data.annotation.Id;
|
||||
import org.springframework.data.geo.Point;
|
||||
import org.springframework.data.mapping.MappingException;
|
||||
import org.springframework.data.mapping.callback.ReactiveEntityCallbacks;
|
||||
import org.springframework.data.mapping.context.MappingContext;
|
||||
@@ -100,6 +98,7 @@ import org.springframework.test.util.ReflectionTestUtils;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import com.mongodb.MongoClientSettings;
|
||||
import com.mongodb.ReadConcern;
|
||||
import com.mongodb.ReadPreference;
|
||||
import com.mongodb.client.model.CountOptions;
|
||||
import com.mongodb.client.model.CreateCollectionOptions;
|
||||
@@ -168,6 +167,7 @@ public class ReactiveMongoTemplateUnitTests {
|
||||
when(db.runCommand(any(), any(Class.class))).thenReturn(runCommandPublisher);
|
||||
when(db.createCollection(any(), any(CreateCollectionOptions.class))).thenReturn(runCommandPublisher);
|
||||
when(collection.withReadPreference(any())).thenReturn(collection);
|
||||
when(collection.withReadConcern(any())).thenReturn(collection);
|
||||
when(collection.find(any(Class.class))).thenReturn(findPublisher);
|
||||
when(collection.find(any(Document.class), any(Class.class))).thenReturn(findPublisher);
|
||||
when(collection.aggregate(anyList())).thenReturn(aggregatePublisher);
|
||||
@@ -385,11 +385,33 @@ public class ReactiveMongoTemplateUnitTests {
|
||||
verify(aggregatePublisher).collation(eq(com.mongodb.client.model.Collation.builder().locale("fr").build()));
|
||||
}
|
||||
|
||||
@Test // GH-4277
|
||||
void geoNearShouldHonorReadPreferenceFromQuery() {
|
||||
|
||||
NearQuery query = NearQuery.near(new Point(1, 1));
|
||||
query.withReadPreference(ReadPreference.secondary());
|
||||
|
||||
template.geoNear(query, Wrapper.class).subscribe();
|
||||
|
||||
verify(collection).withReadPreference(eq(ReadPreference.secondary()));
|
||||
}
|
||||
|
||||
@Test // GH-4277
|
||||
void geoNearShouldHonorReadConcernFromQuery() {
|
||||
|
||||
NearQuery query = NearQuery.near(new Point(1, 1));
|
||||
query.withReadConcern(ReadConcern.SNAPSHOT);
|
||||
|
||||
template.geoNear(query, Wrapper.class).subscribe();
|
||||
|
||||
verify(collection).withReadConcern(eq(ReadConcern.SNAPSHOT));
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-1719
|
||||
void appliesFieldsWhenInterfaceProjectionIsClosedAndQueryDoesNotDefineFields() {
|
||||
|
||||
template.doFind("star-wars", new Document(), new Document(), Person.class, PersonProjection.class,
|
||||
FindPublisherPreparer.NO_OP_PREPARER).subscribe();
|
||||
template.doFind("star-wars", CollectionPreparer.identity(), new Document(), new Document(), Person.class,
|
||||
PersonProjection.class, FindPublisherPreparer.NO_OP_PREPARER).subscribe();
|
||||
|
||||
verify(findPublisher).projection(eq(new Document("firstname", 1)));
|
||||
}
|
||||
@@ -397,8 +419,8 @@ public class ReactiveMongoTemplateUnitTests {
|
||||
@Test // DATAMONGO-1719
|
||||
void doesNotApplyFieldsWhenInterfaceProjectionIsClosedAndQueryDefinesFields() {
|
||||
|
||||
template.doFind("star-wars", new Document(), new Document("bar", 1), Person.class, PersonProjection.class,
|
||||
FindPublisherPreparer.NO_OP_PREPARER).subscribe();
|
||||
template.doFind("star-wars", CollectionPreparer.identity(), new Document(), new Document("bar", 1), Person.class,
|
||||
PersonProjection.class, FindPublisherPreparer.NO_OP_PREPARER).subscribe();
|
||||
|
||||
verify(findPublisher).projection(eq(new Document("bar", 1)));
|
||||
}
|
||||
@@ -406,8 +428,8 @@ public class ReactiveMongoTemplateUnitTests {
|
||||
@Test // DATAMONGO-1719
|
||||
void doesNotApplyFieldsWhenInterfaceProjectionIsOpen() {
|
||||
|
||||
template.doFind("star-wars", new Document(), new Document(), Person.class, PersonSpELProjection.class,
|
||||
FindPublisherPreparer.NO_OP_PREPARER).subscribe();
|
||||
template.doFind("star-wars", CollectionPreparer.identity(), new Document(), new Document(), Person.class,
|
||||
PersonSpELProjection.class, FindPublisherPreparer.NO_OP_PREPARER).subscribe();
|
||||
|
||||
verify(findPublisher, never()).projection(any());
|
||||
}
|
||||
@@ -415,8 +437,8 @@ public class ReactiveMongoTemplateUnitTests {
|
||||
@Test // DATAMONGO-1719, DATAMONGO-2041
|
||||
void appliesFieldsToDtoProjection() {
|
||||
|
||||
template.doFind("star-wars", new Document(), new Document(), Person.class, Jedi.class,
|
||||
FindPublisherPreparer.NO_OP_PREPARER).subscribe();
|
||||
template.doFind("star-wars", CollectionPreparer.identity(), new Document(), new Document(), Person.class,
|
||||
Jedi.class, FindPublisherPreparer.NO_OP_PREPARER).subscribe();
|
||||
|
||||
verify(findPublisher).projection(eq(new Document("firstname", 1)));
|
||||
}
|
||||
@@ -424,8 +446,8 @@ public class ReactiveMongoTemplateUnitTests {
|
||||
@Test // DATAMONGO-1719
|
||||
void doesNotApplyFieldsToDtoProjectionWhenQueryDefinesFields() {
|
||||
|
||||
template.doFind("star-wars", new Document(), new Document("bar", 1), Person.class, Jedi.class,
|
||||
FindPublisherPreparer.NO_OP_PREPARER).subscribe();
|
||||
template.doFind("star-wars", CollectionPreparer.identity(), new Document(), new Document("bar", 1), Person.class,
|
||||
Jedi.class, FindPublisherPreparer.NO_OP_PREPARER).subscribe();
|
||||
|
||||
verify(findPublisher).projection(eq(new Document("bar", 1)));
|
||||
}
|
||||
@@ -433,8 +455,8 @@ public class ReactiveMongoTemplateUnitTests {
|
||||
@Test // DATAMONGO-1719
|
||||
void doesNotApplyFieldsWhenTargetIsNotAProjection() {
|
||||
|
||||
template.doFind("star-wars", new Document(), new Document(), Person.class, Person.class,
|
||||
FindPublisherPreparer.NO_OP_PREPARER).subscribe();
|
||||
template.doFind("star-wars", CollectionPreparer.identity(), new Document(), new Document(), Person.class,
|
||||
Person.class, FindPublisherPreparer.NO_OP_PREPARER).subscribe();
|
||||
|
||||
verify(findPublisher, never()).projection(any());
|
||||
}
|
||||
@@ -442,8 +464,8 @@ public class ReactiveMongoTemplateUnitTests {
|
||||
@Test // DATAMONGO-1719
|
||||
void doesNotApplyFieldsWhenTargetExtendsDomainType() {
|
||||
|
||||
template.doFind("star-wars", new Document(), new Document(), Person.class, PersonExtended.class,
|
||||
FindPublisherPreparer.NO_OP_PREPARER).subscribe();
|
||||
template.doFind("star-wars", CollectionPreparer.identity(), new Document(), new Document(), Person.class,
|
||||
PersonExtended.class, FindPublisherPreparer.NO_OP_PREPARER).subscribe();
|
||||
|
||||
verify(findPublisher, never()).projection(any());
|
||||
}
|
||||
@@ -632,6 +654,26 @@ public class ReactiveMongoTemplateUnitTests {
|
||||
verify(aggregatePublisher).collation(eq(com.mongodb.client.model.Collation.builder().locale("de_AT").build()));
|
||||
}
|
||||
|
||||
@Test // GH-4277
|
||||
void aggreateShouldUseReadConcern() {
|
||||
|
||||
AggregationOptions options = AggregationOptions.builder().readConcern(ReadConcern.SNAPSHOT).build();
|
||||
template.aggregate(newAggregation(Sith.class, project("id")).withOptions(options), AutogenerateableId.class,
|
||||
Document.class).subscribe();
|
||||
|
||||
verify(collection).withReadConcern(ReadConcern.SNAPSHOT);
|
||||
}
|
||||
|
||||
@Test // GH-4286
|
||||
void aggreateShouldUseReadReadPreference() {
|
||||
|
||||
AggregationOptions options = AggregationOptions.builder().readPreference(ReadPreference.primaryPreferred()).build();
|
||||
template.aggregate(newAggregation(Sith.class, project("id")).withOptions(options), AutogenerateableId.class,
|
||||
Document.class).subscribe();
|
||||
|
||||
verify(collection).withReadPreference(ReadPreference.primaryPreferred());
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-1854
|
||||
void aggreateShouldUseCollationFromOptionsEvenIfDefaultCollationIsPresent() {
|
||||
|
||||
|
||||
@@ -0,0 +1,49 @@
|
||||
/*
|
||||
* Copyright 2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.data.mongodb.core.aggregation;
|
||||
|
||||
import static org.assertj.core.api.Assertions.*;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.bson.Document;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
/**
|
||||
* @author Christoph Strobl
|
||||
*/
|
||||
class AggregationOperationUnitTests {
|
||||
|
||||
@Test // GH-4306
|
||||
void getOperatorShouldFavorToPipelineStages() {
|
||||
|
||||
AggregationOperation op = new AggregationOperation() {
|
||||
|
||||
@Override
|
||||
public Document toDocument(AggregationOperationContext context) {
|
||||
return new Document();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Document> toPipelineStages(AggregationOperationContext context) {
|
||||
return List.of(new Document("$spring", "data"));
|
||||
}
|
||||
};
|
||||
|
||||
assertThat(op.getOperator()).isEqualTo("$spring");
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,46 @@
|
||||
/*
|
||||
* Copyright 2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.data.mongodb.core.aggregation;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.bson.Document;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
/**
|
||||
* @author Christoph Strobl
|
||||
*/
|
||||
class AggregationPipelineUnitTests {
|
||||
|
||||
@Test // Gh-4306
|
||||
void verifyMustNotFailIfOnlyPipelineStagesUsed() {
|
||||
AggregationPipeline.of(new OverridesPipelineStagesAndThrowsErrorOnToDocument()).verify();
|
||||
}
|
||||
|
||||
static class OverridesPipelineStagesAndThrowsErrorOnToDocument implements AggregationOperation {
|
||||
|
||||
@Override
|
||||
public Document toDocument(AggregationOperationContext context) {
|
||||
throw new IllegalStateException("oh no");
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Document> toPipelineStages(AggregationOperationContext context) {
|
||||
return List.of(Aggregation.project("data").toDocument(context));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -43,7 +43,6 @@ import java.util.stream.Stream;
|
||||
|
||||
import org.assertj.core.data.Offset;
|
||||
import org.bson.Document;
|
||||
import org.bson.types.ObjectId;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -61,6 +60,7 @@ import org.springframework.data.mongodb.core.TestEntities;
|
||||
import org.springframework.data.mongodb.core.Venue;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationTests.CarDescriptor.Entry;
|
||||
import org.springframework.data.mongodb.core.aggregation.BucketAutoOperation.Granularities;
|
||||
import org.springframework.data.mongodb.core.aggregation.VariableOperators.Let;
|
||||
import org.springframework.data.mongodb.core.aggregation.VariableOperators.Let.ExpressionVariable;
|
||||
import org.springframework.data.mongodb.core.geo.GeoJsonPoint;
|
||||
import org.springframework.data.mongodb.core.index.GeoSpatialIndexType;
|
||||
@@ -90,6 +90,7 @@ import com.mongodb.client.MongoCollection;
|
||||
* @author Maninder Singh
|
||||
* @author Sergey Shcherbakov
|
||||
* @author Minsu Kim
|
||||
* @author Sangyong Choi
|
||||
*/
|
||||
@ExtendWith(MongoTemplateExtension.class)
|
||||
public class AggregationTests {
|
||||
@@ -499,7 +500,7 @@ public class AggregationTests {
|
||||
/*
|
||||
//complex mongodb aggregation framework example from
|
||||
https://docs.mongodb.org/manual/tutorial/aggregation-examples/#largest-and-smallest-cities-by-state
|
||||
|
||||
|
||||
db.zipcodes.aggregate(
|
||||
{
|
||||
$group: {
|
||||
@@ -1518,8 +1519,47 @@ public class AggregationTests {
|
||||
assertThat(firstItem).containsEntry("linkedPerson.[0].firstname", "u1");
|
||||
}
|
||||
|
||||
@Test // GH-3322
|
||||
@EnableIfMongoServerVersion(isGreaterThanEqual = "5.0")
|
||||
void shouldLookupPeopleCorrectlyWithPipeline() {
|
||||
createUsersWithReferencedPersons();
|
||||
|
||||
TypedAggregation<User> agg = newAggregation(User.class, //
|
||||
lookup().from("person").localField("_id").foreignField("firstname").pipeline(match(where("firstname").is("u1"))).as("linkedPerson"), //
|
||||
sort(ASC, "id"));
|
||||
|
||||
AggregationResults<Document> results = mongoTemplate.aggregate(agg, User.class, Document.class);
|
||||
|
||||
List<Document> mappedResults = results.getMappedResults();
|
||||
|
||||
Document firstItem = mappedResults.get(0);
|
||||
|
||||
assertThat(firstItem).containsEntry("_id", "u1");
|
||||
assertThat(firstItem).containsEntry("linkedPerson.[0].firstname", "u1");
|
||||
}
|
||||
|
||||
@Test // GH-3322
|
||||
@EnableIfMongoServerVersion(isGreaterThanEqual = "5.0")
|
||||
void shouldLookupPeopleCorrectlyWithPipelineAndLet() {
|
||||
createUsersWithReferencedPersons();
|
||||
|
||||
TypedAggregation<User> agg = newAggregation(User.class, //
|
||||
lookup().from("person").localField("_id").foreignField("firstname").let(Let.ExpressionVariable.newVariable("the_id").forField("_id")).pipeline(
|
||||
match(ctx -> new Document("$expr", new Document("$eq", List.of("$$the_id", "u1"))))).as("linkedPerson"),
|
||||
sort(ASC, "id"));
|
||||
|
||||
AggregationResults<Document> results = mongoTemplate.aggregate(agg, User.class, Document.class);
|
||||
|
||||
List<Document> mappedResults = results.getMappedResults();
|
||||
|
||||
Document firstItem = mappedResults.get(0);
|
||||
|
||||
assertThat(firstItem).containsEntry("_id", "u1");
|
||||
assertThat(firstItem).containsEntry("linkedPerson.[0].firstname", "u1");
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-1326
|
||||
void shouldGroupByAndLookupPeopleCorectly() {
|
||||
void shouldGroupByAndLookupPeopleCorrectly() {
|
||||
|
||||
createUsersWithReferencedPersons();
|
||||
|
||||
|
||||
@@ -16,11 +16,16 @@
|
||||
package org.springframework.data.mongodb.core.aggregation;
|
||||
|
||||
import static org.assertj.core.api.Assertions.*;
|
||||
import static org.springframework.data.mongodb.core.aggregation.Aggregation.*;
|
||||
import static org.springframework.data.mongodb.core.aggregation.VariableOperators.Let.ExpressionVariable.*;
|
||||
import static org.springframework.data.mongodb.test.util.Assertions.assertThat;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.bson.Document;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import org.springframework.data.mongodb.core.DocumentTestUtils;
|
||||
import org.springframework.data.mongodb.core.query.Criteria;
|
||||
|
||||
/**
|
||||
* Unit tests for {@link LookupOperation}.
|
||||
@@ -62,7 +67,7 @@ public class LookupOperationUnitTests {
|
||||
|
||||
Document lookupClause = extractDocumentFromLookupOperation(lookupOperation);
|
||||
|
||||
assertThat(lookupClause).containsEntry("from", "a") //
|
||||
org.assertj.core.api.Assertions.assertThat(lookupClause).containsEntry("from", "a") //
|
||||
.containsEntry("localField", "b") //
|
||||
.containsEntry("foreignField", "c") //
|
||||
.containsEntry("as", "d");
|
||||
@@ -114,7 +119,7 @@ public class LookupOperationUnitTests {
|
||||
|
||||
Document lookupClause = extractDocumentFromLookupOperation(lookupOperation);
|
||||
|
||||
assertThat(lookupClause).containsEntry("from", "a") //
|
||||
org.assertj.core.api.Assertions.assertThat(lookupClause).containsEntry("from", "a") //
|
||||
.containsEntry("localField", "b") //
|
||||
.containsEntry("foreignField", "c") //
|
||||
.containsEntry("as", "d");
|
||||
@@ -129,4 +134,86 @@ public class LookupOperationUnitTests {
|
||||
assertThat(lookupOperation.getFields().exposesSingleFieldOnly()).isTrue();
|
||||
assertThat(lookupOperation.getFields().getField("d")).isNotNull();
|
||||
}
|
||||
|
||||
@Test // GH-3322
|
||||
void buildsLookupWithLetAndPipeline() {
|
||||
|
||||
LookupOperation lookupOperation = LookupOperation.newLookup().from("warehouses")
|
||||
.let(newVariable("order_item").forField("item"), newVariable("order_qty").forField("ordered"))
|
||||
.pipeline(match(ctx -> new Document("$expr",
|
||||
new Document("$and", List.of(Document.parse("{ $eq: [ \"$stock_item\", \"$$order_item\" ] }"),
|
||||
Document.parse("{ $gte: [ \"$instock\", \"$$order_qty\" ] }"))))))
|
||||
.as("stockdata");
|
||||
|
||||
assertThat(lookupOperation.toDocument(Aggregation.DEFAULT_CONTEXT)).isEqualTo("""
|
||||
{ $lookup: {
|
||||
from: "warehouses",
|
||||
let: { order_item: "$item", order_qty: "$ordered" },
|
||||
pipeline: [
|
||||
{ $match:
|
||||
{ $expr:
|
||||
{ $and:
|
||||
[
|
||||
{ $eq: [ "$stock_item", "$$order_item" ] },
|
||||
{ $gte: [ "$instock", "$$order_qty" ] }
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
as: "stockdata"
|
||||
}}
|
||||
""");
|
||||
}
|
||||
|
||||
@Test // GH-3322
|
||||
void buildsLookupWithJustPipeline() {
|
||||
|
||||
LookupOperation lookupOperation = LookupOperation.newLookup().from("holidays") //
|
||||
.pipeline( //
|
||||
match(Criteria.where("year").is(2018)), //
|
||||
project().andExclude("_id").and(ctx -> new Document("name", "$name").append("date", "$date")).as("date"), //
|
||||
Aggregation.replaceRoot("date") //
|
||||
).as("holidays");
|
||||
|
||||
assertThat(lookupOperation.toDocument(Aggregation.DEFAULT_CONTEXT)).isEqualTo("""
|
||||
{ $lookup:
|
||||
{
|
||||
from: "holidays",
|
||||
pipeline: [
|
||||
{ $match: { year: 2018 } },
|
||||
{ $project: { _id: 0, date: { name: "$name", date: "$date" } } },
|
||||
{ $replaceRoot: { newRoot: "$date" } }
|
||||
],
|
||||
as: "holidays"
|
||||
}
|
||||
}}
|
||||
""");
|
||||
}
|
||||
|
||||
@Test // GH-3322
|
||||
void buildsLookupWithLocalAndForeignFieldAsWellAsLetAndPipeline() {
|
||||
|
||||
LookupOperation lookupOperation = Aggregation.lookup().from("restaurants") //
|
||||
.localField("restaurant_name")
|
||||
.foreignField("name")
|
||||
.let(newVariable("orders_drink").forField("drink")) //
|
||||
.pipeline(match(ctx -> new Document("$expr", new Document("$in", List.of("$$orders_drink", "$beverages")))))
|
||||
.as("matches");
|
||||
|
||||
assertThat(lookupOperation.toDocument(Aggregation.DEFAULT_CONTEXT)).isEqualTo("""
|
||||
{ $lookup: {
|
||||
from: "restaurants",
|
||||
localField: "restaurant_name",
|
||||
foreignField: "name",
|
||||
let: { orders_drink: "$drink" },
|
||||
pipeline: [{
|
||||
$match: {
|
||||
$expr: { $in: [ "$$orders_drink", "$beverages" ] }
|
||||
}
|
||||
}],
|
||||
as: "matches"
|
||||
}}
|
||||
""");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,68 @@
|
||||
/*
|
||||
* Copyright 2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.data.mongodb.core.aggregation;
|
||||
|
||||
import static org.springframework.data.mongodb.test.util.Assertions.*;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.bson.Document;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
/**
|
||||
* @author Christoph Strobl
|
||||
*/
|
||||
class MultiOperationAggregationStageUnitTests {
|
||||
|
||||
@Test // GH-4306
|
||||
void toDocumentRendersSingleOperation() {
|
||||
|
||||
MultiOperationAggregationStage stage = (ctx) -> List.of(Document.parse("{ $text: { $search: 'operating' } }"));
|
||||
|
||||
assertThat(stage.toDocument(Aggregation.DEFAULT_CONTEXT)).isEqualTo("{ $text: { $search: 'operating' } }");
|
||||
}
|
||||
|
||||
@Test // GH-4306
|
||||
void toDocumentRendersMultiOperation() {
|
||||
|
||||
MultiOperationAggregationStage stage = (ctx) -> List.of(Document.parse("{ $text: { $search: 'operating' } }"),
|
||||
Document.parse("{ $sort: { score: { $meta: 'textScore' } } }"));
|
||||
|
||||
assertThat(stage.toDocument(Aggregation.DEFAULT_CONTEXT)).isEqualTo("""
|
||||
{
|
||||
$text: { $search: 'operating' },
|
||||
$sort: { score: { $meta: 'textScore' } }
|
||||
}
|
||||
""");
|
||||
}
|
||||
|
||||
@Test // GH-4306
|
||||
void toDocumentCollectsDuplicateOperation() {
|
||||
|
||||
MultiOperationAggregationStage stage = (ctx) -> List.of(Document.parse("{ $text: { $search: 'operating' } }"),
|
||||
Document.parse("{ $sort: { score: { $meta: 'textScore' } } }"), Document.parse("{ $sort: { posts: -1 } }"));
|
||||
|
||||
assertThat(stage.toDocument(Aggregation.DEFAULT_CONTEXT)).isEqualTo("""
|
||||
{
|
||||
$text: { $search: 'operating' },
|
||||
$sort: [
|
||||
{ score: { $meta: 'textScore' } },
|
||||
{ posts: -1 }
|
||||
]
|
||||
}
|
||||
""");
|
||||
}
|
||||
}
|
||||
@@ -21,9 +21,6 @@ import static org.springframework.data.mongodb.core.messaging.SubscriptionUtils.
|
||||
import static org.springframework.data.mongodb.core.query.Criteria.*;
|
||||
import static org.springframework.data.mongodb.core.query.Query.*;
|
||||
|
||||
import com.mongodb.client.model.ChangeStreamPreAndPostImagesOptions;
|
||||
import com.mongodb.client.model.CreateCollectionOptions;
|
||||
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
@@ -42,8 +39,10 @@ import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.junitpioneer.jupiter.RepeatFailedTest;
|
||||
import org.springframework.data.annotation.Id;
|
||||
import org.springframework.data.mongodb.core.ChangeStreamOptions;
|
||||
import org.springframework.data.mongodb.core.CollectionOptions;
|
||||
@@ -62,7 +61,7 @@ import org.springframework.data.mongodb.test.util.Template;
|
||||
|
||||
import com.mongodb.client.model.changestream.ChangeStreamDocument;
|
||||
import com.mongodb.client.model.changestream.FullDocument;
|
||||
import org.junitpioneer.jupiter.RepeatFailedTest;
|
||||
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
|
||||
|
||||
/**
|
||||
* Integration test for subscribing to a {@link com.mongodb.operation.ChangeStreamBatchCursor} inside the
|
||||
@@ -702,7 +701,7 @@ class ChangeStreamTests {
|
||||
}
|
||||
|
||||
@Test // GH-4187
|
||||
@EnableIfMongoServerVersion(isLessThan = "6.0")
|
||||
@Disabled("Flakey test failing occasionally due to timing issues")
|
||||
void readsFullDocumentBeforeChangeWhenOptionDeclaredRequiredAndMongoVersionIsLessThan6() throws InterruptedException {
|
||||
|
||||
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener = new CollectingMessageListener<>();
|
||||
|
||||
@@ -29,6 +29,10 @@ import org.springframework.data.geo.Metrics;
|
||||
import org.springframework.data.geo.Point;
|
||||
import org.springframework.data.mongodb.core.DocumentTestUtils;
|
||||
import org.springframework.data.mongodb.core.geo.GeoJsonPoint;
|
||||
import org.springframework.test.util.ReflectionTestUtils;
|
||||
|
||||
import com.mongodb.ReadConcern;
|
||||
import com.mongodb.ReadPreference;
|
||||
|
||||
/**
|
||||
* Unit tests for {@link NearQuery}.
|
||||
@@ -229,4 +233,58 @@ public class NearQueryUnitTests {
|
||||
|
||||
assertThat(query.toDocument()).containsEntry("maxDistance", 1000D).containsEntry("distanceMultiplier", 0.00062137D);
|
||||
}
|
||||
|
||||
@Test // GH-4277
|
||||
void fetchesReadPreferenceFromUnderlyingQueryObject() {
|
||||
|
||||
NearQuery nearQuery = NearQuery.near(new Point(0, 0))
|
||||
.query(new Query().withReadPreference(ReadPreference.nearest()));
|
||||
|
||||
assertThat(nearQuery.getReadPreference()).isEqualTo(ReadPreference.nearest());
|
||||
}
|
||||
|
||||
@Test // GH-4277
|
||||
void fetchesReadConcernFromUnderlyingQueryObject() {
|
||||
|
||||
NearQuery nearQuery = NearQuery.near(new Point(0, 0)).query(new Query().withReadConcern(ReadConcern.SNAPSHOT));
|
||||
|
||||
assertThat(nearQuery.getReadConcern()).isEqualTo(ReadConcern.SNAPSHOT);
|
||||
}
|
||||
|
||||
@Test // GH-4277
|
||||
void usesReadPreferenceFromNearQueryIfUnderlyingQueryDoesNotDefineAny() {
|
||||
|
||||
NearQuery nearQuery = NearQuery.near(new Point(0, 0)).withReadPreference(ReadPreference.nearest())
|
||||
.query(new Query());
|
||||
|
||||
assertThat(((Query) ReflectionTestUtils.getField(nearQuery, "query")).getReadPreference()).isNull();
|
||||
assertThat(nearQuery.getReadPreference()).isEqualTo(ReadPreference.nearest());
|
||||
}
|
||||
|
||||
@Test // GH-4277
|
||||
void usesReadConcernFromNearQueryIfUnderlyingQueryDoesNotDefineAny() {
|
||||
|
||||
NearQuery nearQuery = NearQuery.near(new Point(0, 0)).withReadConcern(ReadConcern.SNAPSHOT).query(new Query());
|
||||
|
||||
assertThat(((Query) ReflectionTestUtils.getField(nearQuery, "query")).getReadConcern()).isNull();
|
||||
assertThat(nearQuery.getReadConcern()).isEqualTo(ReadConcern.SNAPSHOT);
|
||||
}
|
||||
|
||||
@Test // GH-4277
|
||||
void readPreferenceFromUnderlyingQueryOverridesNearQueryOne() {
|
||||
|
||||
NearQuery nearQuery = NearQuery.near(new Point(0, 0)).withReadPreference(ReadPreference.nearest())
|
||||
.query(new Query().withReadPreference(ReadPreference.primary()));
|
||||
|
||||
assertThat(nearQuery.getReadPreference()).isEqualTo(ReadPreference.primary());
|
||||
}
|
||||
|
||||
@Test // GH-4277
|
||||
void readConcernFromUnderlyingQueryOverridesNearQueryOne() {
|
||||
|
||||
NearQuery nearQuery = NearQuery.near(new Point(0, 0)).withReadConcern(ReadConcern.SNAPSHOT)
|
||||
.query(new Query().withReadConcern(ReadConcern.MAJORITY));
|
||||
|
||||
assertThat(nearQuery.getReadConcern()).isEqualTo(ReadConcern.MAJORITY);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -342,7 +342,10 @@ class QueryTests {
|
||||
source.limit(10);
|
||||
source.setSortObject(new Document("_id", 1));
|
||||
|
||||
Query target = Query.of((Query) new ProxyFactory(source).getProxy());
|
||||
ProxyFactory proxyFactory = new ProxyFactory(source);
|
||||
proxyFactory.setInterfaces(new Class[0]);
|
||||
|
||||
Query target = Query.of((Query) proxyFactory.getProxy());
|
||||
|
||||
compareQueries(target, source);
|
||||
}
|
||||
|
||||
@@ -28,7 +28,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import org.springframework.data.domain.Example;
|
||||
import org.springframework.data.domain.Sort;
|
||||
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
|
||||
@@ -120,7 +119,11 @@ class SimpleReactiveMongoRepositoryUnitTests {
|
||||
@Test // DATAMONGO-1854
|
||||
void shouldAddDefaultCollationToFindOneForExampleIfPresent() {
|
||||
|
||||
when(mongoOperations.find(any(), any(), any())).thenReturn(flux);
|
||||
when(entityInformation.getCollectionName()).thenReturn("testdummy");
|
||||
doReturn(flux).when(mongoOperations).find(any(Query.class), eq(TestDummy.class), eq("testdummy"));
|
||||
when(flux.buffer(anyInt())).thenReturn(flux);
|
||||
when(flux.map(any())).thenReturn(flux);
|
||||
when(flux.next()).thenReturn(mono);
|
||||
|
||||
Collation collation = Collation.of("en_US");
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
Spring Data MongoDB 4.0 GA (2022.0.0)
|
||||
Spring Data MongoDB 4.1 M2 (2023.0.0)
|
||||
Copyright (c) [2010-2019] Pivotal Software, Inc.
|
||||
|
||||
This product is licensed to you under the Apache License, Version 2.0 (the "License").
|
||||
@@ -39,6 +39,8 @@ conditions of the subcomponent's license, as noted in the LICENSE file.
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user