Compare commits

...

23 Commits

Author SHA1 Message Date
Christoph Strobl
38e406867a Introduce AggregationStage API
With the introduction of AggregationStage we move the API closer to MongoDB terminology, removing cognitive overhead.
The change also allows us to switch back and forth between the default implementations of toDocument and toDocuments, which lets us remove the deprecation warnings by having dedicated interfaces that indicate what to implement in order to comply with the usage pattern.
2023-03-01 10:13:47 +01:00
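For orientation, a hedged sketch of what a custom stage could look like; the exact AggregationStage contract is not part of this excerpt, so the single toDocument(AggregationOperationContext) method assumed below is carried over from the existing AggregationOperation API and is an assumption, not the confirmed interface.

import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationStage;

// Hypothetical custom stage emitting a raw $sample stage document (assumes a
// single-abstract-method contract taking an AggregationOperationContext).
AggregationStage sampleStage = context -> new Document("$sample", new Document("size", 5));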
Christoph Strobl
5305c9542b Favor pipelineStages over toDocument in AggregationOperation.
This commit makes sure to use available pipeline stages to figure out the operation.
2023-02-27 10:27:47 +01:00
Christoph Strobl
c02968d00a Prepare issue branch. 2023-02-27 09:41:43 +01:00
Mark Paluch
3ab78fc1ed Upgrade to Maven Wrapper 3.9.0.
See #4297
2023-02-20 11:58:01 +01:00
Christoph Strobl
fa0f026410 After release cleanups.
See #4294
2023-02-17 14:25:48 +01:00
Christoph Strobl
9c96a2b2c3 Prepare next development iteration.
See #4294
2023-02-17 14:25:46 +01:00
Christoph Strobl
0986210221 Release version 4.1 M2 (2023.0.0).
See #4294
2023-02-17 14:22:30 +01:00
Christoph Strobl
7d5372f049 Prepare 4.1 M2 (2023.0.0).
See #4294
2023-02-17 14:22:15 +01:00
Christoph Strobl
a5022e9bc4 After release cleanups.
See #4235
2023-02-17 13:31:54 +01:00
Christoph Strobl
aff8fbd62a Prepare next development iteration.
See #4235
2023-02-17 13:31:52 +01:00
Christoph Strobl
633fbceb5a Release version 4.1 M1 (2023.0.0).
See #4235
2023-02-17 13:27:49 +01:00
Christoph Strobl
fb9a0d8482 Prepare 4.1 M1 (2023.0.0).
See #4235
2023-02-17 13:27:08 +01:00
Christoph Strobl
d73807df1b Support ReadConcern and ReadPreference via NearQuery.
Implement ReadConcernAware and ReadPreferenceAware for NearQuery and make sure those get applied when working with the template API.

Original Pull Request: #4288
2023-02-16 14:28:11 +01:00
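A hedged usage sketch: the withReadPreference/withReadConcern setters on NearQuery, the Restaurant domain type, and the mongoTemplate variable are illustrative assumptions; only the ReadConcernAware/ReadPreferenceAware contracts are stated by this commit.

import org.springframework.data.geo.GeoResults;
import org.springframework.data.geo.Metrics;
import org.springframework.data.geo.Point;
import org.springframework.data.mongodb.core.query.NearQuery;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;

NearQuery nearQuery = NearQuery.near(new Point(-73.99, 40.73), Metrics.KILOMETERS)
        .maxDistance(2)
        .withReadPreference(ReadPreference.secondaryPreferred()) // assumed setter
        .withReadConcern(ReadConcern.LOCAL);                     // assumed setter

// The template applies both settings when executing the $geoNear aggregation.
GeoResults<Restaurant> nearby = mongoTemplate.geoNear(nearQuery, Restaurant.class);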
Mark Paluch
e56f6ce87f Polishing.
Documentation, refine parameter ordering.

Original Pull Request: #4288
2023-02-16 14:28:11 +01:00
Mark Paluch
c5c6fc107c Support ReadConcern & ReadPreference via the Query and Aggregation API.
Add support for setting the ReadConcern and ReadPreference via the Query and Aggregation API.

Closes: #4277, #4286
Original Pull Request: #4288
2023-02-16 14:28:10 +01:00
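A hedged usage sketch: Query.withReadPreference appears further down in this diff, while withReadConcern, the Person type, and the mongoTemplate variable are illustrative assumptions.

import java.util.List;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;

Query query = new Query(Criteria.where("status").is("ACTIVE"))
        .withReadPreference(ReadPreference.secondaryPreferred())
        .withReadConcern(ReadConcern.MAJORITY); // assumed to mirror withReadPreference

List<Person> active = mongoTemplate.find(query, Person.class);

// For aggregations, the options builder gains readConcern/readPreference
// (see the optionsBuilder usage in the MongoTemplate diff below).
AggregationOptions options = AggregationOptions.builder()
        .readConcern(ReadConcern.MAJORITY)
        .readPreference(ReadPreference.secondaryPreferred())
        .build();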
Christoph Strobl
368c644922 Guard tests for $lookup with let & pipeline
Add a guard to skip tests on server versions prior to 5.0.

Related to: #3322
2023-02-16 09:33:37 +01:00
Christoph Strobl
4d050f5021 Polishing.
Reuse Let from VariableOperators.
Limit API exposure and favor builders.
Update nullability constraints and assertions.
Update integration tests.
Add unit tests.

Original Pull Request: #4272
2023-02-16 08:33:07 +01:00
sangyongchoi
83923e0e2a Add support for 'let' and 'pipeline' in $lookup
This commit introduces let and pipeline to the Lookup aggregation stage.

Closes: #3322
Original Pull Request: #4272
2023-02-16 08:30:21 +01:00
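A hedged sketch of the resulting builder usage; the let(...) and pipeline(...) method names and the ExpressionVariable wiring are assumptions derived from the commit description and from the polishing commit's reference to VariableOperators.Let, not from the diff itself.

import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.LookupOperation;
import org.springframework.data.mongodb.core.aggregation.VariableOperators;
import org.springframework.data.mongodb.core.query.Criteria;

LookupOperation lookup = LookupOperation.newLookup()
        .from("warehouses")
        .let(VariableOperators.Let.ExpressionVariable.newVariable("order_item").forField("item")) // assumed
        .pipeline(Aggregation.match(Criteria.where("instock").gte(0)))                            // assumed
        .as("stockData");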
Mark Paluch
25588850dd Disable flakey test.
See #4290
2023-02-14 11:25:06 +01:00
Mark Paluch
55c81f4f54 Adapt to Mockito 5.1 changes.
Closes #4290
2023-02-14 10:50:30 +01:00
Christoph Strobl
ac7551e47f Upgrade to MongoDB driver 4.9.0
Closes: #4289
2023-02-14 07:51:11 +01:00
Mark Paluch
6d3043de9a Update CI properties.
See #4235
2023-01-30 10:49:50 +01:00
Mark Paluch
1a94b6e4ee Upgrade to Maven Wrapper 3.8.7.
See #4281
2023-01-30 10:48:12 +01:00
44 changed files with 1958 additions and 364 deletions

View File

@@ -1,2 +1,2 @@
#Fri Jun 03 09:32:40 CEST 2022
distributionUrl=https\://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.8.5/apache-maven-3.8.5-bin.zip
#Mon Feb 20 11:58:01 CET 2023
distributionUrl=https\://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.0/apache-maven-3.9.0-bin.zip

View File

@@ -1,16 +1,16 @@
# Java versions
java.main.tag=17.0.5_8-jdk-focal
java.main.tag=17.0.6_10-jdk-focal
# Docker container images - standard
docker.java.main.image=harbor-repo.vmware.com/dockerhub-proxy-cache/library/eclipse-temurin:${java.main.tag}
# Supported versions of MongoDB
docker.mongodb.4.4.version=4.4.17
docker.mongodb.5.0.version=5.0.13
docker.mongodb.6.0.version=6.0.2
docker.mongodb.4.4.version=4.4.18
docker.mongodb.5.0.version=5.0.14
docker.mongodb.6.0.version=6.0.4
# Supported versions of Redis
docker.redis.6.version=6.2.6
docker.redis.6.version=6.2.10
# Supported versions of Cassandra
docker.cassandra.3.version=3.11.14

View File

@@ -5,7 +5,7 @@
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>4.1.0-SNAPSHOT</version>
<version>4.1.x-GH-4306-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Spring Data MongoDB</name>
@@ -27,7 +27,7 @@
<project.type>multi</project.type>
<dist.id>spring-data-mongodb</dist.id>
<springdata.commons>3.1.0-SNAPSHOT</springdata.commons>
<mongo>4.8.2</mongo>
<mongo>4.9.0</mongo>
<mongo.reactivestreams>${mongo}</mongo.reactivestreams>
<jmh.version>1.19</jmh.version>
</properties>

View File

@@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>4.1.0-SNAPSHOT</version>
<version>4.1.x-GH-4306-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -15,7 +15,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>4.1.0-SNAPSHOT</version>
<version>4.1.x-GH-4306-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -13,7 +13,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>4.1.0-SNAPSHOT</version>
<version>4.1.x-GH-4306-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -0,0 +1,61 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core;
import org.springframework.util.Assert;
import com.mongodb.client.MongoCollection;
/**
* Interface for functional preparation of a {@link MongoCollection}.
*
* @author Mark Paluch
* @since 4.1
*/
public interface CollectionPreparer<T> {
/**
* Returns a preparer that always returns its input collection.
*
* @return a preparer that always returns its input collection.
*/
static <T> CollectionPreparer<T> identity() {
return it -> it;
}
/**
* Prepare the {@code collection}.
*
* @param collection the collection to prepare.
* @return the prepared collection.
*/
T prepare(T collection);
/**
* Returns a composed {@code CollectionPreparer} that first applies this preparer to the collection, and then applies
* the {@code after} preparer to the result. If evaluation of either function throws an exception, it is relayed to
* the caller of the composed function.
*
* @param after the collection preparer to apply after this function is applied.
* @return a composed {@code CollectionPreparer} that first applies this preparer and then applies the {@code after}
* preparer.
*/
default CollectionPreparer<T> andThen(CollectionPreparer<T> after) {
Assert.notNull(after, "After CollectionPreparer must not be null");
return c -> after.prepare(prepare(c));
}
}
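For illustration, a minimal sketch of composing preparers with identity() and andThen(...); the two lambdas below are hypothetical and not part of this change, and collection stands in for an existing MongoCollection<Document>.

import org.bson.Document;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.client.MongoCollection;

CollectionPreparer<MongoCollection<Document>> withConcern =
        col -> col.withReadConcern(ReadConcern.MAJORITY);
CollectionPreparer<MongoCollection<Document>> withPreference =
        col -> col.withReadPreference(ReadPreference.secondaryPreferred());

// identity() is a no-op starting point; andThen(...) chains preparers left to right.
MongoCollection<Document> prepared = CollectionPreparer.<MongoCollection<Document>> identity()
        .andThen(withConcern)
        .andThen(withPreference)
        .prepare(collection);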

View File

@@ -0,0 +1,182 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.bson.Document;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.client.MongoCollection;
/**
* Support class for delegate implementations to apply {@link ReadConcern} and {@link ReadPreference} settings upon
* {@link CollectionPreparer preparing a collection}.
*
* @author Mark Paluch
* @since 4.1
*/
class CollectionPreparerSupport implements ReadConcernAware, ReadPreferenceAware {
private final List<Object> sources;
private CollectionPreparerSupport(List<Object> sources) {
this.sources = sources;
}
<T> T doPrepare(T collection, Function<T, ReadConcern> concernAccessor, BiFunction<T, ReadConcern, T> concernFunction,
Function<T, ReadPreference> preferenceAccessor, BiFunction<T, ReadPreference, T> preferenceFunction) {
T collectionToUse = collection;
for (Object source : sources) {
if (source instanceof ReadConcernAware rca && rca.hasReadConcern()) {
ReadConcern concern = rca.getReadConcern();
if (concernAccessor.apply(collectionToUse) != concern) {
collectionToUse = concernFunction.apply(collectionToUse, concern);
}
break;
}
}
for (Object source : sources) {
if (source instanceof ReadPreferenceAware rpa && rpa.hasReadPreference()) {
ReadPreference preference = rpa.getReadPreference();
if (preferenceAccessor.apply(collectionToUse) != preference) {
collectionToUse = preferenceFunction.apply(collectionToUse, preference);
}
break;
}
}
return collectionToUse;
}
@Override
public boolean hasReadConcern() {
for (Object aware : sources) {
if (aware instanceof ReadConcernAware rca && rca.hasReadConcern()) {
return true;
}
}
return false;
}
@Override
public ReadConcern getReadConcern() {
for (Object aware : sources) {
if (aware instanceof ReadConcernAware rca && rca.hasReadConcern()) {
return rca.getReadConcern();
}
}
return null;
}
@Override
public boolean hasReadPreference() {
for (Object aware : sources) {
if (aware instanceof ReadPreferenceAware rpa && rpa.hasReadPreference()) {
return true;
}
}
return false;
}
@Override
public ReadPreference getReadPreference() {
for (Object aware : sources) {
if (aware instanceof ReadPreferenceAware rpa && rpa.hasReadPreference()) {
return rpa.getReadPreference();
}
}
return null;
}
static class CollectionPreparerDelegate extends CollectionPreparerSupport
implements CollectionPreparer<MongoCollection<Document>> {
private CollectionPreparerDelegate(List<Object> sources) {
super(sources);
}
public static CollectionPreparerDelegate of(ReadPreferenceAware... awares) {
return of((Object[]) awares);
}
public static CollectionPreparerDelegate of(Object... mixedAwares) {
if (mixedAwares.length == 1 && mixedAwares[0] instanceof CollectionPreparerDelegate) {
return (CollectionPreparerDelegate) mixedAwares[0];
}
return new CollectionPreparerDelegate(Arrays.asList(mixedAwares));
}
@Override
public MongoCollection<Document> prepare(MongoCollection<Document> collection) {
return doPrepare(collection, MongoCollection::getReadConcern, MongoCollection::withReadConcern,
MongoCollection::getReadPreference, MongoCollection::withReadPreference);
}
}
static class ReactiveCollectionPreparerDelegate extends CollectionPreparerSupport
implements CollectionPreparer<com.mongodb.reactivestreams.client.MongoCollection<Document>> {
private ReactiveCollectionPreparerDelegate(List<Object> sources) {
super(sources);
}
public static ReactiveCollectionPreparerDelegate of(ReadPreferenceAware... awares) {
return of((Object[]) awares);
}
public static ReactiveCollectionPreparerDelegate of(Object... mixedAwares) {
if (mixedAwares.length == 1 && mixedAwares[0] instanceof CollectionPreparerDelegate) {
return (ReactiveCollectionPreparerDelegate) mixedAwares[0];
}
return new ReactiveCollectionPreparerDelegate(Arrays.asList(mixedAwares));
}
@Override
public com.mongodb.reactivestreams.client.MongoCollection<Document> prepare(
com.mongodb.reactivestreams.client.MongoCollection<Document> collection) {
return doPrepare(collection, //
com.mongodb.reactivestreams.client.MongoCollection::getReadConcern,
com.mongodb.reactivestreams.client.MongoCollection::withReadConcern,
com.mongodb.reactivestreams.client.MongoCollection::getReadPreference,
com.mongodb.reactivestreams.client.MongoCollection::withReadPreference);
}
}
}
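As a usage sketch, the template-side code builds a delegate from whatever awares it has at hand and applies it right before the driver call; query and options below stand in for a Query and an AggregationOptions instance carrying the Aware contracts introduced in this change set.

CollectionPreparerDelegate delegate = CollectionPreparerDelegate.of(query, options);
MongoCollection<Document> prepared = delegate.prepare(collection);
// The first source declaring a ReadConcern and the first declaring a ReadPreference win,
// and each is applied only if it differs from the collection's current setting.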

View File

@@ -20,7 +20,6 @@ import java.util.Optional;
import java.util.stream.Stream;
import org.bson.Document;
import org.springframework.dao.IncorrectResultSizeDataAccessException;
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
@@ -168,7 +167,8 @@ class ExecutableFindOperationSupport implements ExecutableFindOperation {
Document queryObject = query.getQueryObject();
Document fieldsObject = query.getFieldsObject();
return template.doFind(getCollectionName(), queryObject, fieldsObject, domainType, returnType,
return template.doFind(template.createDelegate(query), getCollectionName(), queryObject, fieldsObject, domainType,
returnType,
getCursorPreparer(query, preparer));
}

View File

@@ -30,6 +30,7 @@ import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.aggregation.AggregationStage;
import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
@@ -296,6 +297,19 @@ public interface MongoOperations extends FluentMongoOperations {
return createView(name, source, AggregationPipeline.of(stages));
}
/**
* Create a view with the provided name. The view content is defined by the {@link AggregationStage pipeline stages}
* on another collection or view identified by the given {@link #getCollectionName(Class) source type}.
*
* @param name the name of the view to create.
* @param source the type defining the view's source collection.
* @param stages the {@link AggregationStage aggregation pipeline stages} defining the view content.
* @since 4.1
*/
default MongoCollection<Document> createView(String name, Class<?> source, AggregationStage... stages) {
return createView(name, source, AggregationPipeline.of(stages));
}
/**
* Create a view with the provided name. The view content is defined by the {@link AggregationPipeline pipeline} on
* another collection or view identified by the given {@link #getCollectionName(Class) source type}.

View File

@@ -55,6 +55,7 @@ import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.MongoDatabaseUtils;
import org.springframework.data.mongodb.SessionSynchronization;
import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
import org.springframework.data.mongodb.core.CollectionPreparerSupport.CollectionPreparerDelegate;
import org.springframework.data.mongodb.core.DefaultBulkOperations.BulkOperationContext;
import org.springframework.data.mongodb.core.EntityOperations.AdaptibleEntity;
import org.springframework.data.mongodb.core.QueryOperations.AggregationDefinition;
@@ -66,6 +67,7 @@ import org.springframework.data.mongodb.core.QueryOperations.UpdateContext;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions.Builder;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
@@ -83,7 +85,6 @@ import org.springframework.data.mongodb.core.mapreduce.MapReduceResults;
import org.springframework.data.mongodb.core.query.BasicQuery;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.Meta;
import org.springframework.data.mongodb.core.query.Meta.CursorOption;
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.UpdateDefinition;
@@ -112,7 +113,23 @@ import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
/**
* Primary implementation of {@link MongoOperations}.
* Primary implementation of {@link MongoOperations}. It simplifies the use of imperative MongoDB usage and helps to
* avoid common errors. It executes core MongoDB workflow, leaving application code to provide {@link Document} and
* extract results. This class executes BSON queries or updates, initiating iteration over {@link FindIterable} and
* catching MongoDB exceptions and translating them to the generic, more informative exception hierarchy defined in the
* org.springframework.dao package. Can be used within a service implementation via direct instantiation with a
* {@link MongoDatabaseFactory} reference, or get prepared in an application context and given to services as bean
* reference.
* <p>
* Note: The {@link MongoDatabaseFactory} should always be configured as a bean in the application context, in the first
* case given to the service directly, in the second case to the prepared template.
* <h3>{@link ReadPreference} and {@link com.mongodb.ReadConcern}</h3>
* <p>
* {@code ReadPreference} and {@code ReadConcern} are generally considered from {@link Query} and
* {@link AggregationOptions} objects for the action to be executed on a particular {@link MongoCollection}.
* <p>
* You can also set the default {@link #setReadPreference(ReadPreference) ReadPreference} on the template level to
* generally apply a {@link ReadPreference}.
*
* @author Thomas Risberg
* @author Graeme Rocher
@@ -141,7 +158,8 @@ import com.mongodb.client.result.UpdateResult;
* @author Bartłomiej Mazur
* @author Michael Krog
*/
public class MongoTemplate implements MongoOperations, ApplicationContextAware, IndexOperationsProvider {
public class MongoTemplate
implements MongoOperations, ApplicationContextAware, IndexOperationsProvider, ReadPreferenceAware {
private static final Log LOGGER = LogFactory.getLog(MongoTemplate.class);
private static final WriteResultChecking DEFAULT_WRITE_RESULT_CHECKING = WriteResultChecking.NONE;
@@ -293,6 +311,16 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
this.readPreference = readPreference;
}
@Override
public boolean hasReadPreference() {
return this.readPreference != null;
}
@Override
public ReadPreference getReadPreference() {
return this.readPreference;
}
/**
* Configure whether lifecycle events such as {@link AfterLoadEvent}, {@link BeforeSaveEvent}, etc. should be
* published or whether emission should be suppressed. Enabled by default.
@@ -363,10 +391,10 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
if (enabled) {
this.countExecution = (collectionName, filter, options) -> {
this.countExecution = (collectionPreparer, collectionName, filter, options) -> {
if (!estimationFilter.test(filter, options)) {
return doExactCount(collectionName, filter, options);
return doExactCount(collectionPreparer, collectionName, filter, options);
}
EstimatedDocumentCountOptions estimatedDocumentCountOptions = new EstimatedDocumentCountOptions();
@@ -374,7 +402,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
estimatedDocumentCountOptions.maxTime(options.getMaxTime(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
}
return doEstimatedCount(collectionName, estimatedDocumentCountOptions);
return doEstimatedCount(collectionPreparer, collectionName, estimatedDocumentCountOptions);
};
} else {
this.countExecution = this::doExactCount;
@@ -443,8 +471,9 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
Document mappedQuery = queryContext.getMappedQuery(persistentEntity);
Document mappedFields = queryContext.getMappedFields(persistentEntity, projection);
CollectionPreparerDelegate readPreference = createDelegate(query);
FindIterable<Document> cursor = new QueryCursorPreparer(query, entityType).initiateFind(collection,
col -> col.find(mappedQuery, Document.class).projection(mappedFields));
col -> readPreference.prepare(col).find(mappedQuery, Document.class).projection(mappedFields));
return new CloseableIterableCursorAdapter<>(cursor, exceptionTranslator,
new ProjectingReadCallback<>(mongoConverter, projection, collectionName)).stream();
@@ -517,7 +546,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
serializeToJsonSafely(queryObject), sortObject, fieldsObject, collectionName));
}
this.executeQueryInternal(new FindCallback(queryObject, fieldsObject, null),
this.executeQueryInternal(new FindCallback(createDelegate(query), queryObject, fieldsObject, null),
preparer != null ? preparer : CursorPreparer.NO_OP_PREPARER, documentCallbackHandler, collectionName);
}
@@ -619,7 +648,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
@Nullable ViewOptions options) {
return createView(name, getCollectionName(source),
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getOperations()), source),
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getStages()), source),
options);
}
@@ -628,7 +657,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
@Nullable ViewOptions options) {
return createView(name, source,
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getOperations()), (Class<?>) null),
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getStages()), (Class<?>) null),
options);
}
@@ -765,7 +794,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
if (ObjectUtils.isEmpty(query.getSortObject())) {
return doFindOne(collectionName, query.getQueryObject(), query.getFieldsObject(),
return doFindOne(collectionName, createDelegate(query), query.getQueryObject(), query.getFieldsObject(),
new QueryCursorPreparer(query, entityClass), entityClass);
} else {
query.limit(1);
@@ -797,7 +826,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
Document mappedQuery = queryContext.getMappedQuery(entityClass, this::getPersistentEntity);
return execute(collectionName,
new ExistsCallback(mappedQuery, queryContext.getCollation(entityClass).orElse(null)));
new ExistsCallback(createDelegate(query), mappedQuery, queryContext.getCollation(entityClass).orElse(null)));
}
// Find methods that take a Query to express the query and that return a List of objects.
@@ -814,7 +843,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
Assert.notNull(collectionName, "CollectionName must not be null");
Assert.notNull(entityClass, "EntityClass must not be null");
return doFind(collectionName, query.getQueryObject(), query.getFieldsObject(), entityClass,
return doFind(collectionName, createDelegate(query), query.getQueryObject(), query.getFieldsObject(), entityClass,
new QueryCursorPreparer(query, entityClass));
}
@@ -834,7 +863,8 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
String idKey = operations.getIdPropertyName(entityClass);
return doFindOne(collectionName, new Document(idKey, id), new Document(), entityClass);
return doFindOne(collectionName, CollectionPreparer.identity(), new Document(idKey, id), new Document(),
entityClass);
}
@Override
@@ -867,10 +897,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
serializeToJsonSafely(mappedQuery), field, collectionName));
}
QueryCursorPreparer preparer = new QueryCursorPreparer(query, entityClass);
if (preparer.hasReadPreference()) {
collection = collection.withReadPreference(preparer.getReadPreference());
}
collection = createDelegate(query).prepare(collection);
DistinctIterable<T> iterable = collection.distinct(mappedFieldName, mappedQuery, mongoDriverCompatibleType);
distinctQueryContext.applyCollation(entityClass, iterable::collation);
@@ -920,8 +947,18 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
String collection = StringUtils.hasText(collectionName) ? collectionName : getCollectionName(domainType);
String distanceField = operations.nearQueryDistanceFieldName(domainType);
Builder optionsBuilder = AggregationOptions.builder().collation(near.getCollation());
if (near.hasReadPreference()) {
optionsBuilder.readPreference(near.getReadPreference());
}
if(near.hasReadConcern()) {
optionsBuilder.readConcern(near.getReadConcern());
}
Aggregation $geoNear = TypedAggregation.newAggregation(domainType, Aggregation.geoNear(near, distanceField))
.withOptions(AggregationOptions.builder().collation(near.getCollation()).build());
.withOptions(optionsBuilder.build());
AggregationResults<Document> results = aggregate($geoNear, collection, Document.class);
EntityProjection<T, ?> projection = operations.introspectProjection(returnType, domainType);
@@ -986,7 +1023,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
operations.forType(entityClass).getCollation(query).ifPresent(optionsToUse::collation);
}
return doFindAndModify(collectionName, query.getQueryObject(), query.getFieldsObject(),
return doFindAndModify(createDelegate(query), collectionName, query.getQueryObject(), query.getFieldsObject(),
getMappedSortObject(query, entityClass), entityClass, update, optionsToUse);
}
@@ -1008,6 +1045,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
QueryContext queryContext = queryOperations.createQueryContext(query);
EntityProjection<T, S> projection = operations.introspectProjection(resultType, entityType);
CollectionPreparerDelegate collectionPreparer = createDelegate(query);
Document mappedQuery = queryContext.getMappedQuery(entity);
Document mappedFields = queryContext.getMappedFields(entity, projection);
Document mappedSort = queryContext.getMappedSort(entity);
@@ -1018,7 +1056,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
maybeEmitEvent(new BeforeSaveEvent<>(replacement, mappedReplacement, collectionName));
maybeCallBeforeSave(replacement, mappedReplacement, collectionName);
T saved = doFindAndReplace(collectionName, mappedQuery, mappedFields, mappedSort,
T saved = doFindAndReplace(collectionPreparer, collectionName, mappedQuery, mappedFields, mappedSort,
queryContext.getCollation(entityType).orElse(null), entityType, mappedReplacement, options, projection);
if (saved != null) {
@@ -1046,7 +1084,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
Assert.notNull(entityClass, "EntityClass must not be null");
Assert.notNull(collectionName, "CollectionName must not be null");
return doFindAndRemove(collectionName, query.getQueryObject(), query.getFieldsObject(),
return doFindAndRemove(createDelegate(query), collectionName, query.getQueryObject(), query.getFieldsObject(),
getMappedSortObject(query, entityClass), operations.forType(entityClass).getCollation(query).orElse(null),
entityClass);
}
@@ -1078,17 +1116,19 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
CountOptions options = countContext.getCountOptions(entityClass);
Document mappedQuery = countContext.getMappedQuery(entityClass, mappingContext::getPersistentEntity);
return doCount(collectionName, mappedQuery, options);
CollectionPreparerDelegate readPreference = createDelegate(query);
return doCount(readPreference, collectionName, mappedQuery, options);
}
protected long doCount(String collectionName, Document filter, CountOptions options) {
protected long doCount(CollectionPreparer collectionPreparer, String collectionName, Document filter,
CountOptions options) {
if (LOGGER.isDebugEnabled()) {
LOGGER
.debug(String.format("Executing count: %s in collection: %s", serializeToJsonSafely(filter), collectionName));
}
return countExecution.countDocuments(collectionName, filter, options);
return countExecution.countDocuments(collectionPreparer, collectionName, filter, options);
}
/*
@@ -1097,11 +1137,13 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
*/
@Override
public long estimatedCount(String collectionName) {
return doEstimatedCount(collectionName, new EstimatedDocumentCountOptions());
return doEstimatedCount(CollectionPreparerDelegate.of(this), collectionName, new EstimatedDocumentCountOptions());
}
protected long doEstimatedCount(String collectionName, EstimatedDocumentCountOptions options) {
return execute(collectionName, collection -> collection.estimatedDocumentCount(options));
protected long doEstimatedCount(CollectionPreparer<MongoCollection<Document>> collectionPreparer,
String collectionName, EstimatedDocumentCountOptions options) {
return execute(collectionName,
collection -> collectionPreparer.prepare(collection).estimatedDocumentCount(options));
}
@Override
@@ -1112,12 +1154,13 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
CountOptions options = countContext.getCountOptions(entityClass);
Document mappedQuery = countContext.getMappedQuery(entityClass, mappingContext::getPersistentEntity);
return doExactCount(collectionName, mappedQuery, options);
return doExactCount(createDelegate(query), collectionName, mappedQuery, options);
}
protected long doExactCount(String collectionName, Document filter, CountOptions options) {
return execute(collectionName,
collection -> collection.countDocuments(CountQuery.of(filter).toQueryDocument(), options));
protected long doExactCount(CollectionPreparer<MongoCollection<Document>> collectionPreparer, String collectionName,
Document filter, CountOptions options) {
return execute(collectionName, collection -> collectionPreparer.prepare(collection)
.countDocuments(CountQuery.of(filter).toQueryDocument(), options));
}
protected boolean countCanBeEstimated(Document filter, CountOptions options) {
@@ -1177,8 +1220,8 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
*/
protected MongoCollection<Document> prepareCollection(MongoCollection<Document> collection) {
if (this.readPreference != null) {
collection = collection.withReadPreference(readPreference);
if (this.readPreference != null && this.readPreference != collection.getReadPreference()) {
return collection.withReadPreference(readPreference);
}
return collection;
@@ -1754,7 +1797,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
@Override
public <T> List<T> findAll(Class<T> entityClass, String collectionName) {
return executeFindMultiInternal(
new FindCallback(new Document(), new Document(),
new FindCallback(CollectionPreparer.identity(), new Document(), new Document(),
operations.forType(entityClass).getCollation().map(Collation::toMongoCollation).orElse(null)),
CursorPreparer.NO_OP_PREPARER, new ReadDocumentCallback<>(mongoConverter, entityClass, collectionName),
collectionName);
@@ -1812,7 +1855,9 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
String mapFunc = replaceWithResourceIfNecessary(mapFunction);
String reduceFunc = replaceWithResourceIfNecessary(reduceFunction);
MongoCollection<Document> inputCollection = getAndPrepareCollection(doGetDatabase(), inputCollectionName);
CollectionPreparerDelegate readPreference = createDelegate(query);
MongoCollection<Document> inputCollection = readPreference
.prepare(getAndPrepareCollection(doGetDatabase(), inputCollectionName));
// MapReduceOp
MapReduceIterable<Document> mapReduce = inputCollection.mapReduce(mapFunc, reduceFunc, Document.class);
@@ -1977,6 +2022,9 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
if (!CollectionUtils.isEmpty(result)) {
Query byIdInQuery = operations.getByIdInQuery(result);
if (query.hasReadPreference()) {
byIdInQuery.withReadPreference(query.getReadPreference());
}
remove(byIdInQuery, entityClass, collectionName);
}
@@ -2032,7 +2080,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
return execute(collectionName, collection -> {
List<Document> rawResult = new ArrayList<>();
CollectionPreparerDelegate delegate = CollectionPreparerDelegate.of(options);
Class<?> domainType = aggregation instanceof TypedAggregation ? ((TypedAggregation<?>) aggregation).getInputType()
: null;
@@ -2040,7 +2088,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
() -> operations.forType(domainType) //
.getCollation());
AggregateIterable<Document> aggregateIterable = collection.aggregate(pipeline, Document.class) //
AggregateIterable<Document> aggregateIterable = delegate.prepare(collection).aggregate(pipeline, Document.class) //
.collation(collation.map(Collation::toMongoCollation).orElse(null)) //
.allowDiskUse(options.isAllowDiskUse());
@@ -2103,7 +2151,9 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
return execute(collectionName, (CollectionCallback<Stream<O>>) collection -> {
AggregateIterable<Document> cursor = collection.aggregate(pipeline, Document.class) //
CollectionPreparerDelegate delegate = CollectionPreparerDelegate.of(options);
AggregateIterable<Document> cursor = delegate.prepare(collection).aggregate(pipeline, Document.class) //
.allowDiskUse(options.isAllowDiskUse());
if (options.getCursorBatchSize() != null) {
@@ -2344,14 +2394,16 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
* The query document is specified as a standard {@link Document} and so is the fields specification.
*
* @param collectionName name of the collection to retrieve the objects from.
* @param collectionPreparer the preparer to prepare the collection for the actual use.
* @param query the query document that specifies the criteria used to find a record.
* @param fields the document that specifies the fields to be returned.
* @param entityClass the parameterized type of the returned list.
* @return the converted object or {@literal null} if none exists.
*/
@Nullable
protected <T> T doFindOne(String collectionName, Document query, Document fields, Class<T> entityClass) {
return doFindOne(collectionName, query, fields, CursorPreparer.NO_OP_PREPARER, entityClass);
protected <T> T doFindOne(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
Document query, Document fields, Class<T> entityClass) {
return doFindOne(collectionName, collectionPreparer, query, fields, CursorPreparer.NO_OP_PREPARER, entityClass);
}
/**
@@ -2359,17 +2411,18 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
* The query document is specified as a standard {@link Document} and so is the fields specification.
*
* @param collectionName name of the collection to retrieve the objects from.
* @param collectionPreparer the preparer to prepare the collection for the actual use.
* @param query the query document that specifies the criteria used to find a record.
* @param fields the document that specifies the fields to be returned.
* @param entityClass the parameterized type of the returned list.
* @param preparer the preparer used to modify the cursor on execution.
* @param entityClass the parameterized type of the returned list.
* @return the converted object or {@literal null} if none exists.
* @since 2.2
*/
@Nullable
@SuppressWarnings("ConstantConditions")
protected <T> T doFindOne(String collectionName, Document query, Document fields, CursorPreparer preparer,
Class<T> entityClass) {
protected <T> T doFindOne(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
Document query, Document fields, CursorPreparer preparer, Class<T> entityClass) {
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
@@ -2382,7 +2435,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
serializeToJsonSafely(query), mappedFields, entityClass, collectionName));
}
return executeFindOneInternal(new FindOneCallback(mappedQuery, mappedFields, preparer),
return executeFindOneInternal(new FindOneCallback(collectionPreparer, mappedQuery, mappedFields, preparer),
new ReadDocumentCallback<>(this.mongoConverter, entityClass, collectionName), collectionName);
}
@@ -2391,13 +2444,15 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
* query document is specified as a standard Document and so is the fields specification.
*
* @param collectionName name of the collection to retrieve the objects from
* @param collectionPreparer the preparer to prepare the collection for the actual use.
* @param query the query document that specifies the criteria used to find a record
* @param fields the document that specifies the fields to be returned
* @param entityClass the parameterized type of the returned list.
* @return the List of converted objects.
*/
protected <T> List<T> doFind(String collectionName, Document query, Document fields, Class<T> entityClass) {
return doFind(collectionName, query, fields, entityClass, null,
protected <T> List<T> doFind(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
Document query, Document fields, Class<T> entityClass) {
return doFind(collectionName, collectionPreparer, query, fields, entityClass, null,
new ReadDocumentCallback<>(this.mongoConverter, entityClass, collectionName));
}
@@ -2407,6 +2462,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
* specified as a standard Document and so is the fields specification.
*
* @param collectionName name of the collection to retrieve the objects from.
* @param collectionPreparer the preparer to prepare the collection for the actual use.
* @param query the query document that specifies the criteria used to find a record.
* @param fields the document that specifies the fields to be returned.
* @param entityClass the parameterized type of the returned list.
@@ -2414,14 +2470,15 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
* (apply limits, skips and so on).
* @return the {@link List} of converted objects.
*/
protected <T> List<T> doFind(String collectionName, Document query, Document fields, Class<T> entityClass,
CursorPreparer preparer) {
return doFind(collectionName, query, fields, entityClass, preparer,
protected <T> List<T> doFind(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
Document query, Document fields, Class<T> entityClass, CursorPreparer preparer) {
return doFind(collectionName, collectionPreparer, query, fields, entityClass, preparer,
new ReadDocumentCallback<>(mongoConverter, entityClass, collectionName));
}
protected <S, T> List<T> doFind(String collectionName, Document query, Document fields, Class<S> entityClass,
@Nullable CursorPreparer preparer, DocumentCallback<T> objectCallback) {
protected <S, T> List<T> doFind(String collectionName,
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, Document fields,
Class<S> entityClass, @Nullable CursorPreparer preparer, DocumentCallback<T> objectCallback) {
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
@@ -2434,7 +2491,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
serializeToJsonSafely(mappedQuery), mappedFields, entityClass, collectionName));
}
return executeFindMultiInternal(new FindCallback(mappedQuery, mappedFields, null),
return executeFindMultiInternal(new FindCallback(collectionPreparer, mappedQuery, mappedFields, null),
preparer != null ? preparer : CursorPreparer.NO_OP_PREPARER, objectCallback, collectionName);
}
@@ -2444,8 +2501,8 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
*
* @since 2.0
*/
<S, T> List<T> doFind(String collectionName, Document query, Document fields, Class<S> sourceClass,
Class<T> targetClass, CursorPreparer preparer) {
<S, T> List<T> doFind(CollectionPreparer<MongoCollection<Document>> collectionPreparer, String collectionName,
Document query, Document fields, Class<S> sourceClass, Class<T> targetClass, CursorPreparer preparer) {
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(sourceClass);
EntityProjection<T, S> projection = operations.introspectProjection(targetClass, sourceClass);
@@ -2459,7 +2516,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
serializeToJsonSafely(mappedQuery), mappedFields, sourceClass, collectionName));
}
return executeFindMultiInternal(new FindCallback(mappedQuery, mappedFields, null), preparer,
return executeFindMultiInternal(new FindCallback(collectionPreparer, mappedQuery, mappedFields, null), preparer,
new ProjectingReadCallback<>(mongoConverter, projection, collectionName), collectionName);
}
@@ -2533,8 +2590,8 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
* @return the List of converted objects.
*/
@SuppressWarnings("ConstantConditions")
protected <T> T doFindAndRemove(String collectionName, Document query, Document fields, Document sort,
@Nullable Collation collation, Class<T> entityClass) {
protected <T> T doFindAndRemove(CollectionPreparer collectionPreparer, String collectionName, Document query,
Document fields, Document sort, @Nullable Collation collation, Class<T> entityClass) {
EntityReader<? super T, Bson> readerToUse = this.mongoConverter;
@@ -2545,14 +2602,15 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
return executeFindOneInternal(
new FindAndRemoveCallback(queryMapper.getMappedObject(query, entity), fields, sort, collation),
return executeFindOneInternal(new FindAndRemoveCallback(collectionPreparer,
queryMapper.getMappedObject(query, entity), fields, sort, collation),
new ReadDocumentCallback<>(readerToUse, entityClass, collectionName), collectionName);
}
@SuppressWarnings("ConstantConditions")
protected <T> T doFindAndModify(String collectionName, Document query, Document fields, Document sort,
Class<T> entityClass, UpdateDefinition update, @Nullable FindAndModifyOptions options) {
protected <T> T doFindAndModify(CollectionPreparer collectionPreparer, String collectionName, Document query,
Document fields, Document sort, Class<T> entityClass, UpdateDefinition update,
@Nullable FindAndModifyOptions options) {
EntityReader<? super T, Bson> readerToUse = this.mongoConverter;
@@ -2577,7 +2635,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
}
return executeFindOneInternal(
new FindAndModifyCallback(mappedQuery, fields, sort, mappedUpdate,
new FindAndModifyCallback(collectionPreparer, mappedQuery, fields, sort, mappedUpdate,
update.getArrayFilters().stream().map(ArrayFilter::asDocument).collect(Collectors.toList()), options),
new ReadDocumentCallback<>(readerToUse, entityClass, collectionName), collectionName);
}
@@ -2598,14 +2656,18 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
* {@literal false} and {@link FindAndReplaceOptions#isUpsert() upsert} is {@literal false}.
*/
@Nullable
protected <T> T doFindAndReplace(String collectionName, Document mappedQuery, Document mappedFields,
Document mappedSort, @Nullable com.mongodb.client.model.Collation collation, Class<?> entityType,
Document replacement, FindAndReplaceOptions options, Class<T> resultType) {
protected <T> T doFindAndReplace(CollectionPreparer collectionPreparer, String collectionName, Document mappedQuery,
Document mappedFields, Document mappedSort, @Nullable com.mongodb.client.model.Collation collation,
Class<?> entityType, Document replacement, FindAndReplaceOptions options, Class<T> resultType) {
EntityProjection<T, ?> projection = operations.introspectProjection(resultType, entityType);
return doFindAndReplace(collectionName, mappedQuery, mappedFields, mappedSort, collation, entityType, replacement,
options, projection);
return doFindAndReplace(collectionPreparer, collectionName, mappedQuery, mappedFields, mappedSort, collation,
entityType, replacement, options, projection);
}
CollectionPreparerDelegate createDelegate(Query query) {
return CollectionPreparerDelegate.of(query);
}
/**
@@ -2625,9 +2687,9 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
* @since 3.4
*/
@Nullable
private <T> T doFindAndReplace(String collectionName, Document mappedQuery, Document mappedFields,
Document mappedSort, @Nullable com.mongodb.client.model.Collation collation, Class<?> entityType,
Document replacement, FindAndReplaceOptions options, EntityProjection<T, ?> projection) {
private <T> T doFindAndReplace(CollectionPreparer collectionPreparer, String collectionName, Document mappedQuery,
Document mappedFields, Document mappedSort, @Nullable com.mongodb.client.model.Collation collation,
Class<?> entityType, Document replacement, FindAndReplaceOptions options, EntityProjection<T, ?> projection) {
if (LOGGER.isDebugEnabled()) {
LOGGER
@@ -2638,9 +2700,9 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
serializeToJsonSafely(mappedSort), entityType, serializeToJsonSafely(replacement), collectionName));
}
return executeFindOneInternal(
new FindAndReplaceCallback(mappedQuery, mappedFields, mappedSort, replacement, collation, options),
new ProjectingReadCallback<>(mongoConverter, projection, collectionName), collectionName);
return executeFindOneInternal(new FindAndReplaceCallback(collectionPreparer, mappedQuery, mappedFields, mappedSort,
replacement, collation, options), new ProjectingReadCallback<>(mongoConverter, projection, collectionName),
collectionName);
}
/**
@@ -2810,12 +2872,15 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
*/
private static class FindOneCallback implements CollectionCallback<Document> {
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
private final Document query;
private final Optional<Document> fields;
private final CursorPreparer cursorPreparer;
FindOneCallback(Document query, Document fields, CursorPreparer preparer) {
FindOneCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, Document fields,
CursorPreparer preparer) {
this.collectionPreparer = collectionPreparer;
this.query = query;
this.fields = Optional.of(fields).filter(it -> !ObjectUtils.isEmpty(fields));
this.cursorPreparer = preparer;
@@ -2824,7 +2889,8 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
@Override
public Document doInCollection(MongoCollection<Document> collection) throws MongoException, DataAccessException {
FindIterable<Document> iterable = cursorPreparer.initiateFind(collection, col -> col.find(query, Document.class));
FindIterable<Document> iterable = cursorPreparer.initiateFind(collection,
col -> collectionPreparer.prepare(col).find(query, Document.class));
if (LOGGER.isDebugEnabled()) {
@@ -2851,15 +2917,18 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
*/
private static class FindCallback implements CollectionCallback<FindIterable<Document>> {
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
private final Document query;
private final Document fields;
private final @Nullable com.mongodb.client.model.Collation collation;
public FindCallback(Document query, Document fields, @Nullable com.mongodb.client.model.Collation collation) {
public FindCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
Document fields, @Nullable com.mongodb.client.model.Collation collation) {
Assert.notNull(query, "Query must not be null");
Assert.notNull(fields, "Fields must not be null");
this.collectionPreparer = collectionPreparer;
this.query = query;
this.fields = fields;
this.collation = collation;
@@ -2869,7 +2938,8 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
public FindIterable<Document> doInCollection(MongoCollection<Document> collection)
throws MongoException, DataAccessException {
FindIterable<Document> findIterable = collection.find(query, Document.class).projection(fields);
FindIterable<Document> findIterable = collectionPreparer.prepare(collection).find(query, Document.class)
.projection(fields);
if (collation != null) {
findIterable = findIterable.collation(collation);
@@ -2887,11 +2957,14 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
*/
private class ExistsCallback implements CollectionCallback<Boolean> {
private final CollectionPreparer collectionPreparer;
private final Document mappedQuery;
private final com.mongodb.client.model.Collation collation;
ExistsCallback(Document mappedQuery, com.mongodb.client.model.Collation collation) {
ExistsCallback(CollectionPreparer collectionPreparer, Document mappedQuery,
com.mongodb.client.model.Collation collation) {
this.collectionPreparer = collectionPreparer;
this.mappedQuery = mappedQuery;
this.collation = collation;
}
@@ -2899,7 +2972,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
@Override
public Boolean doInCollection(MongoCollection<Document> collection) throws MongoException, DataAccessException {
return doCount(collection.getNamespace().getCollectionName(), mappedQuery,
return doCount(collectionPreparer, collection.getNamespace().getCollectionName(), mappedQuery,
new CountOptions().limit(1).collation(collation)) > 0;
}
}
@@ -2912,12 +2985,15 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
*/
private static class FindAndRemoveCallback implements CollectionCallback<Document> {
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
private final Document query;
private final Document fields;
private final Document sort;
private final Optional<Collation> collation;
FindAndRemoveCallback(Document query, Document fields, Document sort, @Nullable Collation collation) {
FindAndRemoveCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
Document fields, Document sort, @Nullable Collation collation) {
this.collectionPreparer = collectionPreparer;
this.query = query;
this.fields = fields;
@@ -2931,12 +3007,13 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
FindOneAndDeleteOptions opts = new FindOneAndDeleteOptions().sort(sort).projection(fields);
collation.map(Collation::toMongoCollation).ifPresent(opts::collation);
return collection.findOneAndDelete(query, opts);
return collectionPreparer.prepare(collection).findOneAndDelete(query, opts);
}
}
private static class FindAndModifyCallback implements CollectionCallback<Document> {
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
private final Document query;
private final Document fields;
private final Document sort;
@@ -2944,9 +3021,10 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
private final List<Document> arrayFilters;
private final FindAndModifyOptions options;
FindAndModifyCallback(Document query, Document fields, Document sort, Object update, List<Document> arrayFilters,
FindAndModifyOptions options) {
FindAndModifyCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
Document fields, Document sort, Object update, List<Document> arrayFilters, FindAndModifyOptions options) {
this.collectionPreparer = collectionPreparer;
this.query = query;
this.fields = fields;
this.sort = sort;
@@ -2975,9 +3053,9 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
}
if (update instanceof Document) {
return collection.findOneAndUpdate(query, (Document) update, opts);
return collectionPreparer.prepare(collection).findOneAndUpdate(query, (Document) update, opts);
} else if (update instanceof List) {
return collection.findOneAndUpdate(query, (List<Document>) update, opts);
return collectionPreparer.prepare(collection).findOneAndUpdate(query, (List<Document>) update, opts);
}
throw new IllegalArgumentException(String.format("Using %s is not supported in findOneAndUpdate", update));
@@ -2993,6 +3071,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
*/
private static class FindAndReplaceCallback implements CollectionCallback<Document> {
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
private final Document query;
private final Document fields;
private final Document sort;
@@ -3000,9 +3079,10 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
private final @Nullable com.mongodb.client.model.Collation collation;
private final FindAndReplaceOptions options;
FindAndReplaceCallback(Document query, Document fields, Document sort, Document update,
@Nullable com.mongodb.client.model.Collation collation, FindAndReplaceOptions options) {
FindAndReplaceCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
Document fields, Document sort, Document update, @Nullable com.mongodb.client.model.Collation collation,
FindAndReplaceOptions options) {
this.collectionPreparer = collectionPreparer;
this.query = query;
this.fields = fields;
this.sort = sort;
@@ -3027,7 +3107,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
opts.returnDocument(ReturnDocument.AFTER);
}
return collection.findOneAndReplace(query, update, opts);
return collectionPreparer.prepare(collection).findOneAndReplace(query, update, opts);
}
}
@@ -3209,11 +3289,6 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
return cursorToUse;
}
@Override
public ReadPreference getReadPreference() {
return query.getMeta().getFlags().contains(CursorOption.SECONDARY_READS) ? ReadPreference.primaryPreferred()
: null;
}
}
/**
@@ -3399,6 +3474,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
@FunctionalInterface
interface CountExecution {
long countDocuments(String collection, Document filter, CountOptions options);
long countDocuments(CollectionPreparer collectionPreparer, String collection, Document filter,
CountOptions options);
}
}
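Since MongoTemplate now implements ReadPreferenceAware, a template-level default can be configured once and is consulted, for example, via CollectionPreparerDelegate.of(this) in estimatedCount; a brief sketch follows (constructor arguments are illustrative).

MongoTemplate template = new MongoTemplate(mongoDatabaseFactory, mongoConverter);
template.setReadPreference(ReadPreference.secondaryPreferred()); // template-wide default
// Per-operation settings carried by a Query or AggregationOptions still override this
// default, because the collection is re-prepared from those sources on each call.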

View File

@@ -20,6 +20,7 @@ import reactor.core.publisher.Mono;
import org.bson.Document;
import org.springframework.dao.IncorrectResultSizeDataAccessException;
import org.springframework.data.mongodb.core.CollectionPreparerSupport.ReactiveCollectionPreparerDelegate;
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.SerializationUtils;
@@ -67,8 +68,8 @@ class ReactiveFindOperationSupport implements ReactiveFindOperation {
private final String collection;
private final Query query;
ReactiveFindSupport(ReactiveMongoTemplate template, Class<?> domainType, Class<T> returnType,
String collection, Query query) {
ReactiveFindSupport(ReactiveMongoTemplate template, Class<?> domainType, Class<T> returnType, String collection,
Query query) {
this.template = template;
this.domainType = domainType;
@@ -169,8 +170,8 @@ class ReactiveFindOperationSupport implements ReactiveFindOperation {
Document queryObject = query.getQueryObject();
Document fieldsObject = query.getFieldsObject();
return template.doFind(getCollectionName(), queryObject, fieldsObject, domainType, returnType,
preparer != null ? preparer : getCursorPreparer(query));
return template.doFind(getCollectionName(), ReactiveCollectionPreparerDelegate.of(query), queryObject,
fieldsObject, domainType, returnType, preparer != null ? preparer : getCursorPreparer(query));
}
@SuppressWarnings("unchecked")

View File

@@ -25,13 +25,13 @@ import java.util.function.Supplier;
import org.bson.Document;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.springframework.data.geo.GeoResult;
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.AggregationStage;
import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
@@ -256,6 +256,19 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations {
return createView(name, source, AggregationPipeline.of(stages));
}
/**
* Create a view with the provided name. The view content is defined by the {@link AggregationStage pipeline stages}
* on another collection or view identified by the given {@link #getCollectionName(Class) source type}.
*
* @param name the name of the view to create.
* @param source the type defining the views source collection.
* @param stages the {@link AggregationStage aggregation pipeline stages} defining the view content.
* @since 4.1
*/
default Mono<MongoCollection<Document>> createView(String name, Class<?> source, AggregationStage... stages) {
return createView(name, source, AggregationPipeline.of(stages));
}
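A usage sketch of this overload, assuming an injected ReactiveMongoOperations named operations and a hypothetical Order domain class; the raw stage relies on AggregationStage being usable as a lambda, as its Javadoc further down in this change set shows:

import static org.springframework.data.mongodb.core.query.Criteria.where;

import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationStage;

// View over the Order collection: keep shipped orders, newest first via a raw $sort stage.
AggregationStage sortByDate = ctx -> Document.parse("{ $sort : { orderDate : -1 } }");

operations.createView("shipped-orders", Order.class,
        Aggregation.match(where("status").is("SHIPPED")),
        sortByDate)
    .subscribe();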
/**
* Create a view with the provided name. The view content is defined by the {@link AggregationPipeline pipeline} on
* another collection or view identified by the given {@link #getCollectionName(Class) source type}.

View File

@@ -70,6 +70,7 @@ import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.ReactiveMongoDatabaseUtils;
import org.springframework.data.mongodb.SessionSynchronization;
import org.springframework.data.mongodb.core.CollectionPreparerSupport.ReactiveCollectionPreparerDelegate;
import org.springframework.data.mongodb.core.EntityOperations.AdaptibleEntity;
import org.springframework.data.mongodb.core.QueryOperations.AggregationDefinition;
import org.springframework.data.mongodb.core.QueryOperations.CountContext;
@@ -80,6 +81,7 @@ import org.springframework.data.mongodb.core.QueryOperations.UpdateContext;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions.Builder;
import org.springframework.data.mongodb.core.aggregation.AggregationPipeline;
import org.springframework.data.mongodb.core.aggregation.PrefixingDelegatingAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.RelaxedTypeBasedAggregationOperationContext;
@@ -105,7 +107,6 @@ import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions;
import org.springframework.data.mongodb.core.query.BasicQuery;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.Meta;
import org.springframework.data.mongodb.core.query.Meta.CursorOption;
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.UpdateDefinition;
@@ -147,9 +148,18 @@ import com.mongodb.reactivestreams.client.MongoDatabase;
* extract results. This class executes BSON queries or updates, initiating iteration over {@link FindPublisher} and
* catching MongoDB exceptions and translating them to the generic, more informative exception hierarchy defined in the
* org.springframework.dao package. Can be used within a service implementation via direct instantiation with a
* {@link SimpleReactiveMongoDatabaseFactory} reference, or get prepared in an application context and given to services
* as bean reference. Note: The {@link SimpleReactiveMongoDatabaseFactory} should always be configured as a bean in the
* application context, in the first case given to the service directly, in the second case to the prepared template.
* {@link ReactiveMongoDatabaseFactory} reference, or get prepared in an application context and given to services as
* bean reference.
* <p>
* Note: The {@link ReactiveMongoDatabaseFactory} should always be configured as a bean in the application context, in
* the first case given to the service directly, in the second case to the prepared template.
* <h3>{@link ReadPreference} and {@link com.mongodb.ReadConcern}</h3>
* <p>
* {@code ReadPreference} and {@code ReadConcern} are generally considered from {@link Query} and
* {@link AggregationOptions} objects for the action to be executed on a particular {@link MongoCollection}.
* <p>
* You can also set the default {@link #setReadPreference(ReadPreference) ReadPreference} on the template level to
* generally apply a {@link ReadPreference}.
*
* @author Mark Paluch
* @author Christoph Strobl
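A hedged sketch of the two levels described here, assuming a configured ReactiveMongoTemplate named template and a placeholder "orders" collection; the AggregationOptions read settings are the builder additions introduced later in this change set:

import org.bson.Document;

import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;

// Template-wide default, used when nothing more specific is configured per operation.
template.setReadPreference(ReadPreference.secondaryPreferred());

// Per-operation settings travel with the AggregationOptions of a single pipeline execution.
AggregationOptions options = AggregationOptions.builder()
        .readConcern(ReadConcern.MAJORITY)
        .readPreference(ReadPreference.primary())
        .build();

template.aggregate(
        Aggregation.newAggregation(Aggregation.count().as("total")).withOptions(options),
        "orders", Document.class)
    .subscribe();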
@@ -666,7 +676,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
@Nullable ViewOptions options) {
return createView(name, getCollectionName(source),
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getOperations()), source),
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getStages()), source),
options);
}
@@ -675,7 +685,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
@Nullable ViewOptions options) {
return createView(name, source,
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getOperations()), (Class<?>) null),
queryOperations.createAggregation(Aggregation.newAggregation(pipeline.getStages()), (Class<?>) null),
options);
}
@@ -756,8 +766,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
public <T> Mono<T> findOne(Query query, Class<T> entityClass, String collectionName) {
if (ObjectUtils.isEmpty(query.getSortObject())) {
return doFindOne(collectionName, query.getQueryObject(), query.getFieldsObject(), entityClass,
new QueryFindPublisherPreparer(query, entityClass));
return doFindOne(collectionName, ReactiveCollectionPreparerDelegate.of(query), query.getQueryObject(),
query.getFieldsObject(), entityClass, new QueryFindPublisherPreparer(query, entityClass));
}
query.limit(1);
@@ -783,10 +793,11 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
return createFlux(collectionName, collection -> {
ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(query);
QueryContext queryContext = queryOperations.createQueryContext(query);
Document filter = queryContext.getMappedQuery(entityClass, this::getPersistentEntity);
FindPublisher<Document> findPublisher = collection.find(filter, Document.class)
FindPublisher<Document> findPublisher = collectionPreparer.prepare(collection).find(filter, Document.class)
.projection(new Document("_id", 1));
if (LOGGER.isDebugEnabled()) {
@@ -811,8 +822,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
return findAll(entityClass, collectionName);
}
return doFind(collectionName, query.getQueryObject(), query.getFieldsObject(), entityClass,
new QueryFindPublisherPreparer(query, entityClass));
return doFind(collectionName, ReactiveCollectionPreparerDelegate.of(query), query.getQueryObject(),
query.getFieldsObject(), entityClass, new QueryFindPublisherPreparer(query, entityClass));
}
@Override
@@ -825,7 +836,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
String idKey = operations.getIdPropertyName(entityClass);
return doFindOne(collectionName, new Document(idKey, id), null, entityClass, (Collation) null);
return doFindOne(collectionName, CollectionPreparer.identity(), new Document(idKey, id), null, entityClass,
(Collation) null);
}
@Override
@@ -850,6 +862,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
Document mappedQuery = distinctQueryContext.getMappedQuery(entity);
String mappedFieldName = distinctQueryContext.getMappedFieldName(entity);
Class<T> mongoDriverCompatibleType = distinctQueryContext.getDriverCompatibleClass(resultClass);
ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(query);
Flux<?> result = execute(collectionName, collection -> {
@@ -859,11 +872,9 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
}
FindPublisherPreparer preparer = new QueryFindPublisherPreparer(query, entityClass);
if (preparer.hasReadPreference()) {
collection = collection.withReadPreference(preparer.getReadPreference());
}
DistinctPublisher<T> publisher = collection.distinct(mappedFieldName, mappedQuery, mongoDriverCompatibleType);
DistinctPublisher<T> publisher = collectionPreparer.prepare(collection).distinct(mappedFieldName, mappedQuery,
mongoDriverCompatibleType);
distinctQueryContext.applyCollation(entityClass, publisher::collation);
return publisher;
});
@@ -929,7 +940,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
boolean isOutOrMerge, AggregationOptions options, ReadDocumentCallback<O> readCallback,
@Nullable Class<?> inputType) {
AggregatePublisher<Document> cursor = collection.aggregate(pipeline, Document.class)
ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(options);
AggregatePublisher<Document> cursor = collectionPreparer.prepare(collection).aggregate(pipeline, Document.class)
.allowDiskUse(options.isAllowDiskUse());
if (options.getCursorBatchSize() != null) {
@@ -987,8 +999,19 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
GeoNearResultDocumentCallback<T> callback = new GeoNearResultDocumentCallback<>(distanceField,
new ProjectingReadCallback<>(mongoConverter, projection, collection), near.getMetric());
Builder optionsBuilder = AggregationOptions.builder();
if (near.hasReadPreference()) {
optionsBuilder.readPreference(near.getReadPreference());
}
if (near.hasReadConcern()) {
optionsBuilder.readConcern(near.getReadConcern());
}
optionsBuilder.collation(near.getCollation());
Aggregation $geoNear = TypedAggregation.newAggregation(entityClass, Aggregation.geoNear(near, distanceField))
.withOptions(AggregationOptions.builder().collation(near.getCollation()).build());
.withOptions(optionsBuilder.build());
return aggregate($geoNear, collection, Document.class) //
.concatMap(callback::doWith);
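The $geoNear path above now copies read settings from the NearQuery into the derived AggregationOptions. A usage sketch under the assumption that NearQuery exposes withReadPreference/withReadConcern mutators matching the Aware contracts it implements (the setters themselves are not part of this excerpt); Venue is a placeholder domain type:

import org.springframework.data.geo.Metrics;
import org.springframework.data.geo.Point;
import org.springframework.data.mongodb.core.query.NearQuery;

import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;

// withReadPreference/withReadConcern are assumed mutator names for the new Aware contracts.
NearQuery near = NearQuery.near(new Point(-73.99, 40.73), Metrics.KILOMETERS)
        .maxDistance(5)
        .withReadPreference(ReadPreference.secondaryPreferred())
        .withReadConcern(ReadConcern.MAJORITY);

// geoNear(..) then runs the underlying $geoNear aggregation with those settings applied.
template.geoNear(near, Venue.class).subscribe();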
@@ -1028,8 +1051,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
operations.forType(entityClass).getCollation(query).ifPresent(optionsToUse::collation);
}
return doFindAndModify(collectionName, query.getQueryObject(), query.getFieldsObject(),
getMappedSortObject(query, entityClass), entityClass, update, optionsToUse);
return doFindAndModify(collectionName, ReactiveCollectionPreparerDelegate.of(query), query.getQueryObject(),
query.getFieldsObject(), getMappedSortObject(query, entityClass), entityClass, update, optionsToUse);
}
@Override
@@ -1053,6 +1076,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
Document mappedQuery = queryContext.getMappedQuery(entity);
Document mappedFields = queryContext.getMappedFields(entity, projection);
Document mappedSort = queryContext.getMappedSort(entity);
ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(query);
return Mono.defer(() -> {
@@ -1070,8 +1094,9 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
mapped.getCollection()));
}).flatMap(it -> {
Mono<T> afterFindAndReplace = doFindAndReplace(it.getCollection(), mappedQuery, mappedFields, mappedSort,
queryContext.getCollation(entityType).orElse(null), entityType, it.getTarget(), options, projection);
Mono<T> afterFindAndReplace = doFindAndReplace(it.getCollection(), collectionPreparer, mappedQuery,
mappedFields, mappedSort, queryContext.getCollation(entityType).orElse(null), entityType, it.getTarget(),
options, projection);
return afterFindAndReplace.flatMap(saved -> {
maybeEmitEvent(new AfterSaveEvent<>(saved, it.getTarget(), it.getCollection()));
return maybeCallAfterSave(saved, it.getTarget(), it.getCollection());
@@ -1089,9 +1114,9 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
public <T> Mono<T> findAndRemove(Query query, Class<T> entityClass, String collectionName) {
operations.forType(entityClass).getCollation(query);
return doFindAndRemove(collectionName, query.getQueryObject(), query.getFieldsObject(),
getMappedSortObject(query, entityClass), operations.forType(entityClass).getCollation(query).orElse(null),
entityClass);
return doFindAndRemove(collectionName, ReactiveCollectionPreparerDelegate.of(query), query.getQueryObject(),
query.getFieldsObject(), getMappedSortObject(query, entityClass),
operations.forType(entityClass).getCollation(query).orElse(null), entityClass);
}
/*
@@ -1799,12 +1824,14 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
MongoAction mongoAction = new MongoAction(writeConcern, MongoActionOperation.REMOVE, collectionName, entityClass,
null, removeQuery);
WriteConcern writeConcernToUse = prepareWriteConcern(mongoAction);
ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(query);
return execute(collectionName, collection -> {
maybeEmitEvent(new BeforeDeleteEvent<>(removeQuery, entityClass, collectionName));
MongoCollection<Document> collectionToUse = prepareCollection(collection, writeConcernToUse);
MongoCollection<Document> collectionToUse = collectionPreparer
.prepare(prepareCollection(collection, writeConcernToUse));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("Remove using query: %s in collection: %s.", serializeToJsonSafely(removeQuery),
@@ -1839,8 +1866,9 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
@Override
public <T> Flux<T> findAll(Class<T> entityClass, String collectionName) {
return executeFindMultiInternal(new FindCallback(null), FindPublisherPreparer.NO_OP_PREPARER,
new ReadDocumentCallback<>(mongoConverter, entityClass, collectionName), collectionName);
return executeFindMultiInternal(new FindCallback(CollectionPreparer.identity(), null),
FindPublisherPreparer.NO_OP_PREPARER, new ReadDocumentCallback<>(mongoConverter, entityClass, collectionName),
collectionName);
}
@Override
@@ -1867,17 +1895,19 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
@Override
public <T> Flux<T> tail(@Nullable Query query, Class<T> entityClass, String collectionName) {
ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(query);
if (query == null) {
LOGGER.debug(String.format("Tail for class: %s in collection: %s", entityClass, collectionName));
return executeFindMultiInternal(
collection -> new FindCallback(null).doInCollection(collection).cursorType(CursorType.TailableAwait),
collection -> new FindCallback(collectionPreparer, null).doInCollection(collection)
.cursorType(CursorType.TailableAwait),
FindPublisherPreparer.NO_OP_PREPARER, new ReadDocumentCallback<>(mongoConverter, entityClass, collectionName),
collectionName);
}
return doFind(collectionName, query.getQueryObject(), query.getFieldsObject(), entityClass,
return doFind(collectionName, collectionPreparer, query.getQueryObject(), query.getFieldsObject(), entityClass,
new TailingQueryFindPublisherPreparer(query, entityClass));
}
@@ -1961,12 +1991,14 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
assertLocalFunctionNames(mapFunction, reduceFunction);
ReactiveCollectionPreparerDelegate collectionPreparer = ReactiveCollectionPreparerDelegate.of(filterQuery);
return createFlux(inputCollectionName, collection -> {
Document mappedQuery = queryMapper.getMappedObject(filterQuery.getQueryObject(),
mappingContext.getPersistentEntity(domainType));
MapReducePublisher<Document> publisher = collection.mapReduce(mapFunction, reduceFunction, Document.class);
MapReducePublisher<Document> publisher = collectionPreparer.prepare(collection).mapReduce(mapFunction,
reduceFunction, Document.class);
publisher.filter(mappedQuery);
@@ -2133,16 +2165,18 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
* The query document is specified as a standard {@link Document} and so is the fields specification.
*
* @param collectionName name of the collection to retrieve the objects from.
* @param collectionPreparer the preparer to prepare the collection for the actual use.
* @param query the query document that specifies the criteria used to find a record.
* @param fields the document that specifies the fields to be returned.
* @param entityClass the parameterized type of the returned list.
* @param collation can be {@literal null}.
* @return the {@link List} of converted objects.
*/
protected <T> Mono<T> doFindOne(String collectionName, Document query, @Nullable Document fields,
protected <T> Mono<T> doFindOne(String collectionName,
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, @Nullable Document fields,
Class<T> entityClass, @Nullable Collation collation) {
return doFindOne(collectionName, query, fields, entityClass,
return doFindOne(collectionName, collectionPreparer, query, fields, entityClass,
findPublisher -> collation != null ? findPublisher.collation(collation.toMongoCollation()) : findPublisher);
}
@@ -2151,6 +2185,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
* The query document is specified as a standard {@link Document} and so is the fields specification.
*
* @param collectionName name of the collection to retrieve the objects from.
* @param collectionPreparer the preparer to prepare the collection for the actual use.
* @param query the query document that specifies the criteria used to find a record.
* @param fields the document that specifies the fields to be returned.
* @param entityClass the parameterized type of the returned list.
@@ -2158,7 +2193,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
* @return the {@link List} of converted objects.
* @since 2.2
*/
protected <T> Mono<T> doFindOne(String collectionName, Document query, @Nullable Document fields,
protected <T> Mono<T> doFindOne(String collectionName,
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, @Nullable Document fields,
Class<T> entityClass, FindPublisherPreparer preparer) {
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
@@ -2173,7 +2209,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
serializeToJsonSafely(query), mappedFields, entityClass, collectionName));
}
return executeFindOneInternal(new FindOneCallback(mappedQuery, mappedFields, preparer),
return executeFindOneInternal(new FindOneCallback(collectionPreparer, mappedQuery, mappedFields, preparer),
new ReadDocumentCallback<>(this.mongoConverter, entityClass, collectionName), collectionName);
}
@@ -2182,13 +2218,15 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
* query document is specified as a standard Document and so is the fields specification.
*
* @param collectionName name of the collection to retrieve the objects from
* @param collectionPreparer the preparer to prepare the collection for the actual use.
* @param query the query document that specifies the criteria used to find a record
* @param fields the document that specifies the fields to be returned
* @param entityClass the parameterized type of the returned list.
* @return the List of converted objects.
*/
protected <T> Flux<T> doFind(String collectionName, Document query, Document fields, Class<T> entityClass) {
return doFind(collectionName, query, fields, entityClass, null,
protected <T> Flux<T> doFind(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
Document query, Document fields, Class<T> entityClass) {
return doFind(collectionName, collectionPreparer, query, fields, entityClass, null,
new ReadDocumentCallback<>(this.mongoConverter, entityClass, collectionName));
}
@@ -2198,6 +2236,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
* specified as a standard Document and so is the fields specification.
*
* @param collectionName name of the collection to retrieve the objects from.
* @param collectionPreparer the preparer to prepare the collection for the actual use.
* @param query the query document that specifies the criteria used to find a record.
* @param fields the document that specifies the fields to be returned.
* @param entityClass the parameterized type of the returned list.
@@ -2205,14 +2244,15 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
* the result set, (apply limits, skips and so on).
* @return the {@link List} of converted objects.
*/
protected <T> Flux<T> doFind(String collectionName, Document query, Document fields, Class<T> entityClass,
FindPublisherPreparer preparer) {
return doFind(collectionName, query, fields, entityClass, preparer,
protected <T> Flux<T> doFind(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
Document query, Document fields, Class<T> entityClass, FindPublisherPreparer preparer) {
return doFind(collectionName, collectionPreparer, query, fields, entityClass, preparer,
new ReadDocumentCallback<>(mongoConverter, entityClass, collectionName));
}
protected <S, T> Flux<T> doFind(String collectionName, Document query, Document fields, Class<S> entityClass,
@Nullable FindPublisherPreparer preparer, DocumentCallback<T> objectCallback) {
protected <S, T> Flux<T> doFind(String collectionName,
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, Document fields,
Class<S> entityClass, @Nullable FindPublisherPreparer preparer, DocumentCallback<T> objectCallback) {
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
@@ -2225,8 +2265,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
serializeToJsonSafely(mappedQuery), mappedFields, entityClass, collectionName));
}
return executeFindMultiInternal(new FindCallback(mappedQuery, mappedFields), preparer, objectCallback,
collectionName);
return executeFindMultiInternal(new FindCallback(collectionPreparer, mappedQuery, mappedFields), preparer,
objectCallback, collectionName);
}
/**
@@ -2235,8 +2275,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
*
* @since 2.0
*/
<S, T> Flux<T> doFind(String collectionName, Document query, Document fields, Class<S> sourceClass,
Class<T> targetClass, FindPublisherPreparer preparer) {
<S, T> Flux<T> doFind(String collectionName, CollectionPreparer<MongoCollection<Document>> collectionPreparer,
Document query, Document fields, Class<S> sourceClass, Class<T> targetClass, FindPublisherPreparer preparer) {
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(sourceClass);
EntityProjection<T, S> projection = operations.introspectProjection(targetClass, sourceClass);
@@ -2250,7 +2290,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
serializeToJsonSafely(mappedQuery), mappedFields, sourceClass, collectionName));
}
return executeFindMultiInternal(new FindCallback(mappedQuery, mappedFields), preparer,
return executeFindMultiInternal(new FindCallback(collectionPreparer, mappedQuery, mappedFields), preparer,
new ProjectingReadCallback<>(mongoConverter, projection, collectionName), collectionName);
}
@@ -2268,13 +2308,15 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
* The first document that matches the query is returned and also removed from the collection in the database. <br />
* The query document is specified as a standard Document and so is the fields specification.
*
* @param collectionName name of the collection to retrieve the objects from
* @param query the query document that specifies the criteria used to find a record
* @param collation collation
* @param collectionName name of the collection to retrieve the objects from.
* @param collectionPreparer the preparer to prepare the collection for the actual use.
* @param query the query document that specifies the criteria used to find a record.
* @param collation collation.
* @param entityClass the parameterized type of the returned list.
* @return the List of converted objects.
*/
protected <T> Mono<T> doFindAndRemove(String collectionName, Document query, Document fields, Document sort,
protected <T> Mono<T> doFindAndRemove(String collectionName,
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, Document fields, Document sort,
@Nullable Collation collation, Class<T> entityClass) {
if (LOGGER.isDebugEnabled()) {
@@ -2284,12 +2326,13 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
return executeFindOneInternal(
new FindAndRemoveCallback(queryMapper.getMappedObject(query, entity), fields, sort, collation),
return executeFindOneInternal(new FindAndRemoveCallback(collectionPreparer,
queryMapper.getMappedObject(query, entity), fields, sort, collation),
new ReadDocumentCallback<>(this.mongoConverter, entityClass, collectionName), collectionName);
}
protected <T> Mono<T> doFindAndModify(String collectionName, Document query, Document fields, Document sort,
protected <T> Mono<T> doFindAndModify(String collectionName,
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, Document fields, Document sort,
Class<T> entityClass, UpdateDefinition update, FindAndModifyOptions options) {
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
@@ -2310,7 +2353,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
}
return executeFindOneInternal(
new FindAndModifyCallback(mappedQuery, fields, sort, mappedUpdate,
new FindAndModifyCallback(collectionPreparer, mappedQuery, fields, sort, mappedUpdate,
update.getArrayFilters().stream().map(ArrayFilter::asDocument).collect(Collectors.toList()), options),
new ReadDocumentCallback<>(this.mongoConverter, entityClass, collectionName), collectionName);
});
@@ -2320,6 +2363,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
* Customize this part for findAndReplace.
*
* @param collectionName The name of the collection to perform the operation in.
* @param collectionPreparer the preparer to prepare the collection for the actual use.
* @param mappedQuery the query to look up documents.
* @param mappedFields the fields to project the result to.
* @param mappedSort the sort to be applied when executing the query.
@@ -2332,20 +2376,22 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
* {@literal false} and {@link FindAndReplaceOptions#isUpsert() upsert} is {@literal false}.
* @since 2.1
*/
protected <T> Mono<T> doFindAndReplace(String collectionName, Document mappedQuery, Document mappedFields,
protected <T> Mono<T> doFindAndReplace(String collectionName,
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document mappedQuery, Document mappedFields,
Document mappedSort, com.mongodb.client.model.Collation collation, Class<?> entityType, Document replacement,
FindAndReplaceOptions options, Class<T> resultType) {
EntityProjection<T, ?> projection = operations.introspectProjection(resultType, entityType);
return doFindAndReplace(collectionName, mappedQuery, mappedFields, mappedSort, collation, entityType, replacement,
options, projection);
return doFindAndReplace(collectionName, collectionPreparer, mappedQuery, mappedFields, mappedSort, collation,
entityType, replacement, options, projection);
}
/**
* Customize this part for findAndReplace.
*
* @param collectionName The name of the collection to perform the operation in.
* @param collectionPreparer the preparer to prepare the collection for the actual use.
* @param mappedQuery the query to look up documents.
* @param mappedFields the fields to project the result to.
* @param mappedSort the sort to be applied when executing the query.
@@ -2358,7 +2404,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
* {@literal false} and {@link FindAndReplaceOptions#isUpsert() upsert} is {@literal false}.
* @since 3.4
*/
private <T> Mono<T> doFindAndReplace(String collectionName, Document mappedQuery, Document mappedFields,
private <T> Mono<T> doFindAndReplace(String collectionName,
CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document mappedQuery, Document mappedFields,
Document mappedSort, com.mongodb.client.model.Collation collation, Class<?> entityType, Document replacement,
FindAndReplaceOptions options, EntityProjection<T, ?> projection) {
@@ -2372,8 +2419,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
serializeToJsonSafely(replacement), collectionName));
}
return executeFindOneInternal(
new FindAndReplaceCallback(mappedQuery, mappedFields, mappedSort, replacement, collation, options),
return executeFindOneInternal(new FindAndReplaceCallback(collectionPreparer, mappedQuery, mappedFields,
mappedSort, replacement, collation, options),
new ProjectingReadCallback<>(this.mongoConverter, projection, collectionName), collectionName);
});
@@ -2451,7 +2498,12 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
* @param collection
*/
protected MongoCollection<Document> prepareCollection(MongoCollection<Document> collection) {
return this.readPreference != null ? collection.withReadPreference(readPreference) : collection;
if (this.readPreference != null && this.readPreference != collection.getReadPreference()) {
return collection.withReadPreference(readPreference);
}
return collection;
}
/**
@@ -2621,11 +2673,14 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
*/
private static class FindOneCallback implements ReactiveCollectionCallback<Document> {
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
private final Document query;
private final Optional<Document> fields;
private final FindPublisherPreparer preparer;
FindOneCallback(Document query, @Nullable Document fields, FindPublisherPreparer preparer) {
FindOneCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
@Nullable Document fields, FindPublisherPreparer preparer) {
this.collectionPreparer = collectionPreparer;
this.query = query;
this.fields = Optional.ofNullable(fields);
this.preparer = preparer;
@@ -2642,7 +2697,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
serializeToJsonSafely(fields.orElseGet(Document::new)), collection.getNamespace().getFullName()));
}
FindPublisher<Document> publisher = preparer.initiateFind(collection, col -> col.find(query, Document.class));
FindPublisher<Document> publisher = preparer.initiateFind(collectionPreparer.prepare(collection),
col -> col.find(query, Document.class));
if (fields.isPresent()) {
publisher = publisher.projection(fields.get());
@@ -2660,15 +2716,17 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
*/
private static class FindCallback implements ReactiveCollectionQueryCallback<Document> {
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
private final @Nullable Document query;
private final @Nullable Document fields;
FindCallback(@Nullable Document query) {
this(query, null);
FindCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, @Nullable Document query) {
this(collectionPreparer, query, null);
}
FindCallback(Document query, Document fields) {
FindCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query, Document fields) {
this.collectionPreparer = collectionPreparer;
this.query = query;
this.fields = fields;
}
@@ -2676,11 +2734,12 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
@Override
public FindPublisher<Document> doInCollection(MongoCollection<Document> collection) {
MongoCollection<Document> collectionToUse = collectionPreparer.prepare(collection);
FindPublisher<Document> findPublisher;
if (ObjectUtils.isEmpty(query)) {
findPublisher = collection.find(Document.class);
findPublisher = collectionToUse.find(Document.class);
} else {
findPublisher = collection.find(query, Document.class);
findPublisher = collectionToUse.find(query, Document.class);
}
if (ObjectUtils.isEmpty(fields)) {
@@ -2699,13 +2758,15 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
*/
private static class FindAndRemoveCallback implements ReactiveCollectionCallback<Document> {
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
private final Document query;
private final Document fields;
private final Document sort;
private final Optional<Collation> collation;
FindAndRemoveCallback(Document query, Document fields, Document sort, @Nullable Collation collation) {
FindAndRemoveCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
Document fields, Document sort, @Nullable Collation collation) {
this.collectionPreparer = collectionPreparer;
this.query = query;
this.fields = fields;
this.sort = sort;
@@ -2719,7 +2780,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
FindOneAndDeleteOptions findOneAndDeleteOptions = convertToFindOneAndDeleteOptions(fields, sort);
collation.map(Collation::toMongoCollation).ifPresent(findOneAndDeleteOptions::collation);
return collection.findOneAndDelete(query, findOneAndDeleteOptions);
return collectionPreparer.prepare(collection).findOneAndDelete(query, findOneAndDeleteOptions);
}
}
@@ -2728,6 +2789,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
*/
private static class FindAndModifyCallback implements ReactiveCollectionCallback<Document> {
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
private final Document query;
private final Document fields;
private final Document sort;
@@ -2735,9 +2797,10 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
private final List<Document> arrayFilters;
private final FindAndModifyOptions options;
FindAndModifyCallback(Document query, Document fields, Document sort, Object update, List<Document> arrayFilters,
FindAndModifyOptions options) {
FindAndModifyCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
Document fields, Document sort, Object update, List<Document> arrayFilters, FindAndModifyOptions options) {
this.collectionPreparer = collectionPreparer;
this.query = query;
this.fields = fields;
this.sort = sort;
@@ -2750,21 +2813,22 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
public Publisher<Document> doInCollection(MongoCollection<Document> collection)
throws MongoException, DataAccessException {
MongoCollection<Document> collectionToUse = collectionPreparer.prepare(collection);
if (options.isRemove()) {
FindOneAndDeleteOptions findOneAndDeleteOptions = convertToFindOneAndDeleteOptions(fields, sort);
findOneAndDeleteOptions = options.getCollation().map(Collation::toMongoCollation)
.map(findOneAndDeleteOptions::collation).orElse(findOneAndDeleteOptions);
return collection.findOneAndDelete(query, findOneAndDeleteOptions);
return collectionToUse.findOneAndDelete(query, findOneAndDeleteOptions);
}
FindOneAndUpdateOptions findOneAndUpdateOptions = convertToFindOneAndUpdateOptions(options, fields, sort,
arrayFilters);
if (update instanceof Document) {
return collection.findOneAndUpdate(query, (Document) update, findOneAndUpdateOptions);
return collectionToUse.findOneAndUpdate(query, (Document) update, findOneAndUpdateOptions);
} else if (update instanceof List) {
return collection.findOneAndUpdate(query, (List<Document>) update, findOneAndUpdateOptions);
return collectionToUse.findOneAndUpdate(query, (List<Document>) update, findOneAndUpdateOptions);
}
return Flux
@@ -2803,6 +2867,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
*/
private static class FindAndReplaceCallback implements ReactiveCollectionCallback<Document> {
private final CollectionPreparer<MongoCollection<Document>> collectionPreparer;
private final Document query;
private final Document fields;
private final Document sort;
@@ -2810,9 +2875,10 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
private final @Nullable com.mongodb.client.model.Collation collation;
private final FindAndReplaceOptions options;
FindAndReplaceCallback(Document query, Document fields, Document sort, Document update,
com.mongodb.client.model.Collation collation, FindAndReplaceOptions options) {
FindAndReplaceCallback(CollectionPreparer<MongoCollection<Document>> collectionPreparer, Document query,
Document fields, Document sort, Document update, com.mongodb.client.model.Collation collation,
FindAndReplaceOptions options) {
this.collectionPreparer = collectionPreparer;
this.query = query;
this.fields = fields;
this.sort = sort;
@@ -2826,7 +2892,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
throws MongoException, DataAccessException {
FindOneAndReplaceOptions findOneAndReplaceOptions = convertToFindOneAndReplaceOptions(options, fields, sort);
return collection.findOneAndReplace(query, update, findOneAndReplaceOptions);
return collectionPreparer.prepare(collection).findOneAndReplace(query, update, findOneAndReplaceOptions);
}
private FindOneAndReplaceOptions convertToFindOneAndReplaceOptions(FindAndReplaceOptions options, Document fields,
@@ -3092,11 +3158,6 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
return findPublisherToUse;
}
@Override
public ReadPreference getReadPreference() {
return query.getMeta().getFlags().contains(CursorOption.SECONDARY_READS) ? ReadPreference.primaryPreferred()
: null;
}
}
class TailingQueryFindPublisherPreparer extends QueryFindPublisherPreparer {

View File

@@ -0,0 +1,46 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core;
import org.springframework.lang.Nullable;
import com.mongodb.ReadConcern;
/**
* Interface to be implemented by any object that wishes to expose the {@link ReadConcern}.
* <p>
* Typically implemented by cursor or query preparer objects.
*
* @author Mark Paluch
* @since 4.1
* @see org.springframework.data.mongodb.core.query.Query
* @see org.springframework.data.mongodb.core.aggregation.AggregationOptions
*/
public interface ReadConcernAware {
/**
* @return {@literal true} if a {@link ReadConcern} is set.
*/
default boolean hasReadConcern() {
return getReadConcern() != null;
}
/**
* @return the {@link ReadConcern} to apply or {@literal null} if none set.
*/
@Nullable
ReadConcern getReadConcern();
}
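For illustration only, a minimal implementor of the new contract (the class name is made up); callers typically guard with the default hasReadConcern() before applying the concern to a collection:

import org.springframework.lang.Nullable;

import com.mongodb.ReadConcern;

// Hypothetical options carrier exposing a fixed ReadConcern through the new contract.
class SnapshotReadOptions implements ReadConcernAware {

    @Nullable
    @Override
    public ReadConcern getReadConcern() {
        return ReadConcern.SNAPSHOT;
    }
}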

View File

@@ -27,6 +27,8 @@ import com.mongodb.ReadPreference;
* @author Christoph Strobl
* @author Mark Paluch
* @since 2.2
* @see org.springframework.data.mongodb.core.query.Query
* @see org.springframework.data.mongodb.core.aggregation.AggregationOptions
*/
public interface ReadPreferenceAware {

View File

@@ -28,6 +28,7 @@ import org.springframework.data.mongodb.core.aggregation.AddFieldsOperation.AddF
import org.springframework.data.mongodb.core.aggregation.CountOperation.CountOperationBuilder;
import org.springframework.data.mongodb.core.aggregation.FacetOperation.FacetOperationBuilder;
import org.springframework.data.mongodb.core.aggregation.GraphLookupOperation.StartWithBuilder;
import org.springframework.data.mongodb.core.aggregation.LookupOperation.LookupOperationBuilder;
import org.springframework.data.mongodb.core.aggregation.MergeOperation.MergeOperationBuilder;
import org.springframework.data.mongodb.core.aggregation.ReplaceRootOperation.ReplaceRootDocumentOperationBuilder;
import org.springframework.data.mongodb.core.aggregation.ReplaceRootOperation.ReplaceRootOperationBuilder;
@@ -50,6 +51,7 @@ import org.springframework.util.Assert;
* @author Nikolay Bogdanov
* @author Gustavo de Geus
* @author Jérôme Guyon
* @author Sangyong Choi
* @since 1.3
*/
public class Aggregation {
@@ -100,12 +102,12 @@ public class Aggregation {
private final AggregationOptions options;
/**
* Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
* Creates a new {@link Aggregation} from the given {@link AggregationStage}s.
*
* @param operations must not be {@literal null} or empty.
*/
public static Aggregation newAggregation(List<? extends AggregationOperation> operations) {
return newAggregation(operations.toArray(new AggregationOperation[operations.size()]));
public static Aggregation newAggregation(List<? extends AggregationStage> operations) {
return newAggregation(operations.toArray(AggregationStage[]::new));
}
/**
@@ -117,6 +119,16 @@ public class Aggregation {
return new Aggregation(operations);
}
/**
* Creates a new {@link Aggregation} from the given {@link AggregationStage}s.
*
* @param stages must not be {@literal null} or empty.
* @since 4.1
*/
public static Aggregation newAggregation(AggregationStage... stages) {
return new Aggregation(stages);
}
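A brief sketch of the new overload mixing a typed operation with a raw stage (field names are illustrative); the cast gives the lambda its AggregationStage target type:

import static org.springframework.data.mongodb.core.query.Criteria.where;

import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationStage;

// $match rendered from the typed API, $sort contributed as a raw document stage.
Aggregation aggregation = Aggregation.newAggregation(
        Aggregation.match(where("borough").is("Queens")),
        (AggregationStage) ctx -> Document.parse("{ $sort : { name : 1 } }"));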
/**
* Creates a new {@link AggregationUpdate} from the given {@link AggregationOperation}s.
*
@@ -128,6 +140,17 @@ public class Aggregation {
return AggregationUpdate.from(Arrays.asList(operations));
}
/**
* Creates a new {@link AggregationUpdate} from the given {@link AggregationStage}s.
*
* @param operations can be {@literal empty} but must not be {@literal null}.
* @return new instance of {@link AggregationUpdate}.
* @since 4.1
*/
public static AggregationUpdate newUpdate(AggregationStage... operations) {
return AggregationUpdate.updateFrom(Arrays.asList(operations));
}
/**
* Returns a copy of this {@link Aggregation} with the given {@link AggregationOptions} set. Note that options are
* supported in MongoDB version 2.6+.
@@ -139,7 +162,7 @@ public class Aggregation {
public Aggregation withOptions(AggregationOptions options) {
Assert.notNull(options, "AggregationOptions must not be null");
return new Aggregation(this.pipeline.getOperations(), options);
return new Aggregation(this.pipeline.getStages(), options);
}
/**
@@ -148,8 +171,8 @@ public class Aggregation {
* @param type must not be {@literal null}.
* @param operations must not be {@literal null} or empty.
*/
public static <T> TypedAggregation<T> newAggregation(Class<T> type, List<? extends AggregationOperation> operations) {
return newAggregation(type, operations.toArray(new AggregationOperation[operations.size()]));
public static <T> TypedAggregation<T> newAggregation(Class<T> type, List<? extends AggregationStage> operations) {
return newAggregation(type, operations.toArray(AggregationStage[]::new));
}
/**
@@ -162,6 +185,17 @@ public class Aggregation {
return new TypedAggregation<T>(type, operations);
}
/**
* Creates a new {@link TypedAggregation} for the given type and {@link AggregationStage}s.
*
* @param type must not be {@literal null}.
* @param stages must not be {@literal null} or empty.
* @since 4.1
*/
public static <T> TypedAggregation<T> newAggregation(Class<T> type, AggregationStage... stages) {
return new TypedAggregation<>(type, stages);
}
/**
* Creates a new {@link Aggregation} from the given {@link AggregationOperation}s.
*
@@ -171,6 +205,15 @@ public class Aggregation {
this(asAggregationList(aggregationOperations));
}
/**
* Creates a new {@link Aggregation} from the given {@link AggregationStage}s.
*
* @param aggregationOperations must not be {@literal null} or empty.
*/
protected Aggregation(AggregationStage... aggregationOperations) {
this(Arrays.asList(aggregationOperations));
}
/**
* @param aggregationOperations must not be {@literal null} or empty.
* @return
@@ -187,7 +230,7 @@ public class Aggregation {
*
* @param aggregationOperations must not be {@literal null} or empty.
*/
protected Aggregation(List<AggregationOperation> aggregationOperations) {
protected Aggregation(List<? extends AggregationStage> aggregationOperations) {
this(aggregationOperations, DEFAULT_OPTIONS);
}
@@ -197,7 +240,7 @@ public class Aggregation {
* @param aggregationOperations must not be {@literal null}.
* @param options must not be {@literal null} or empty.
*/
protected Aggregation(List<AggregationOperation> aggregationOperations, AggregationOptions options) {
protected Aggregation(List<? extends AggregationStage> aggregationOperations, AggregationOptions options) {
Assert.notNull(aggregationOperations, "AggregationOperations must not be null");
Assert.notNull(options, "AggregationOptions must not be null");
@@ -636,6 +679,17 @@ public class Aggregation {
return facet().and(aggregationOperations);
}
/**
* Creates a new {@link FacetOperationBuilder} given {@link Aggregation}.
*
* @param stages the sub-pipeline, must not be {@literal null}.
* @return new instance of {@link FacetOperation}.
* @since 4.1
*/
public static FacetOperationBuilder facet(AggregationStage... stages) {
return facet().and(stages);
}
/**
* Creates a new {@link LookupOperation}.
*
@@ -664,6 +718,23 @@ public class Aggregation {
return new LookupOperation(from, localField, foreignField, as);
}
/**
* Entrypoint for creating {@link LookupOperation $lookup} using a fluent builder API.
*
* <pre class="code">
* Aggregation.lookup().from("restaurants").localField("restaurant_name").foreignField("name")
* .let(newVariable("orders_drink").forField("drink"))
* .pipeline(match(ctx -> new Document("$expr", new Document("$in", List.of("$$orders_drink", "$beverages")))))
* .as("matches")
* </pre>
*
* @return new instance of {@link LookupOperationBuilder}.
* @since 4.1
*/
public static LookupOperationBuilder lookup() {
return new LookupOperationBuilder();
}
/**
* Creates a new {@link CountOperationBuilder}.
*

View File

@@ -15,10 +15,10 @@
*/
package org.springframework.data.mongodb.core.aggregation;
import java.util.Collections;
import java.util.List;
import org.bson.Document;
import org.springframework.util.CollectionUtils;
/**
* Represents one single operation in an aggregation pipeline.
@@ -29,30 +29,24 @@ import org.bson.Document;
* @author Christoph Strobl
* @since 1.3
*/
public interface AggregationOperation {
public interface AggregationOperation extends MultiOperationAggregationStage {
/**
* Turns the {@link AggregationOperation} into a {@link Document} by using the given
* {@link AggregationOperationContext}.
*
* @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
* @return the Document
* @deprecated since 2.2 in favor of {@link #toPipelineStages(AggregationOperationContext)}.
* @return
*/
@Deprecated
@Override
Document toDocument(AggregationOperationContext context);
/**
* Turns the {@link AggregationOperation} into a list of {@link Document stages} by using the given
* {@link AggregationOperationContext}. This allows a single {@link AggregationOperation} to add additional stages for
* e.g. {@code $sort} or {@code $limit}.
* Returning more than one stage is the exception rather than the default.
*
* @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
* @return the pipeline stages to run through. Never {@literal null}.
* @since 2.2
* @return never {@literal null}.
*/
@Override
default List<Document> toPipelineStages(AggregationOperationContext context) {
return Collections.singletonList(toDocument(context));
return List.of(toDocument(context));
}
/**
@@ -63,6 +57,6 @@ public interface AggregationOperation {
* @since 3.0.2
*/
default String getOperator() {
return toDocument(Aggregation.DEFAULT_CONTEXT).keySet().iterator().next();
return CollectionUtils.lastElement(toPipelineStages(Aggregation.DEFAULT_CONTEXT)).keySet().iterator().next();
}
}
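A quick sketch of the contract after this change: a custom operation only has to implement toDocument(..), while toPipelineStages(..) and getOperator() fall back to the defaults shown above. The operation below is purely illustrative:

import org.bson.Document;

// Renders a single {"$sample": {"size": n}} stage; getOperator() derives "$sample" from it.
class RandomSampleOperation implements AggregationOperation {

    private final int size;

    RandomSampleOperation(int size) {
        this.size = size;
    }

    @Override
    public Document toDocument(AggregationOperationContext context) {
        return new Document("$sample", new Document("size", size));
    }
}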

View File

@@ -45,15 +45,19 @@ class AggregationOperationRenderer {
* @param rootContext must not be {@literal null}.
* @return the {@link List} of {@link Document}.
*/
static List<Document> toDocument(List<AggregationOperation> operations, AggregationOperationContext rootContext) {
static List<Document> toDocument(List<? extends AggregationStage> operations, AggregationOperationContext rootContext) {
List<Document> operationDocuments = new ArrayList<Document>(operations.size());
AggregationOperationContext contextToUse = rootContext;
for (AggregationOperation operation : operations) {
for (AggregationStage operation : operations) {
operationDocuments.addAll(operation.toPipelineStages(contextToUse));
if (operation instanceof MultiOperationAggregationStage mops) {
operationDocuments.addAll(mops.toPipelineStages(contextToUse));
} else {
operationDocuments.add(operation.toDocument(contextToUse));
}
if (operation instanceof FieldsExposingAggregationOperation) {

View File

@@ -19,11 +19,16 @@ import java.time.Duration;
import java.util.Optional;
import org.bson.Document;
import org.springframework.data.mongodb.core.ReadConcernAware;
import org.springframework.data.mongodb.core.ReadPreferenceAware;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.util.BsonUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
/**
* Holds a set of configurable aggregation options that can be used within an aggregation pipeline. A list of support
* aggregation options can be found in the MongoDB reference documentation
@@ -39,7 +44,7 @@ import org.springframework.util.Assert;
* @see TypedAggregation#withOptions(AggregationOptions)
* @since 1.6
*/
public class AggregationOptions {
public class AggregationOptions implements ReadConcernAware, ReadPreferenceAware {
private static final String BATCH_SIZE = "batchSize";
private static final String CURSOR = "cursor";
@@ -56,6 +61,10 @@ public class AggregationOptions {
private final Optional<Collation> collation;
private final Optional<String> comment;
private final Optional<Object> hint;
private Optional<ReadConcern> readConcern;
private Optional<ReadPreference> readPreference;
private Duration maxTime = Duration.ZERO;
private ResultOptions resultOptions = ResultOptions.READ;
private DomainTypeMapping domainTypeMapping = DomainTypeMapping.RELAXED;
@@ -123,6 +132,8 @@ public class AggregationOptions {
this.collation = Optional.ofNullable(collation);
this.comment = Optional.ofNullable(comment);
this.hint = Optional.ofNullable(hint);
this.readConcern = Optional.empty();
this.readPreference = Optional.empty();
}
/**
@@ -268,6 +279,26 @@ public class AggregationOptions {
return hint;
}
@Override
public boolean hasReadConcern() {
return readConcern.isPresent();
}
@Override
public ReadConcern getReadConcern() {
return readConcern.orElse(null);
}
@Override
public boolean hasReadPreference() {
return readPreference.isPresent();
}
@Override
public ReadPreference getReadPreference() {
return readPreference.orElse(null);
}
/**
* @return the time limit for processing. {@link Duration#ZERO} is used for the default unbounded behavior.
* @since 3.0
@@ -385,6 +416,8 @@ public class AggregationOptions {
private @Nullable Collation collation;
private @Nullable String comment;
private @Nullable Object hint;
private @Nullable ReadConcern readConcern;
private @Nullable ReadPreference readPreference;
private @Nullable Duration maxTime;
private @Nullable ResultOptions resultOptions;
private @Nullable DomainTypeMapping domainTypeMapping;
@@ -490,6 +523,32 @@ public class AggregationOptions {
return this;
}
/**
* Define a {@link ReadConcern} to apply to the aggregation.
*
* @param readConcern can be {@literal null}.
* @return this.
* @since 4.1
*/
public Builder readConcern(@Nullable ReadConcern readConcern) {
this.readConcern = readConcern;
return this;
}
/**
* Define a {@link ReadPreference} to apply to the aggregation.
*
* @param readPreference can be {@literal null}.
* @return this.
* @since 4.1
*/
public Builder readPreference(@Nullable ReadPreference readPreference) {
this.readPreference = readPreference;
return this;
}
/**
* Set the time limit for processing.
*
@@ -573,6 +632,12 @@ public class AggregationOptions {
if (domainTypeMapping != null) {
options.domainTypeMapping = domainTypeMapping;
}
if (readConcern != null) {
options.readConcern = Optional.of(readConcern);
}
if (readPreference != null) {
options.readPreference = Optional.of(readPreference);
}
return options;
}
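The accessor behavior that results from the builder above, restricted to what this diff shows; a plain sketch using assertions:

import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;

AggregationOptions defaults = AggregationOptions.builder().build();
// Nothing configured: the Aware accessors report absence and the getters return null.
assert !defaults.hasReadConcern() && defaults.getReadConcern() == null;
assert !defaults.hasReadPreference() && defaults.getReadPreference() == null;

AggregationOptions tuned = AggregationOptions.builder()
        .readConcern(ReadConcern.LOCAL)
        .readPreference(ReadPreference.nearest())
        .build();
assert ReadConcern.LOCAL.equals(tuned.getReadConcern());
assert ReadPreference.nearest().equals(tuned.getReadPreference());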

View File

@@ -23,9 +23,10 @@ import java.util.function.Predicate;
import org.bson.Document;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
/**
* The {@link AggregationPipeline} holds the collection of {@link AggregationOperation aggregation stages}.
* The {@link AggregationPipeline} holds the collection of {@link AggregationStage aggregation stages}.
*
* @author Christoph Strobl
* @author Mark Paluch
@@ -33,12 +34,23 @@ import org.springframework.util.Assert;
*/
public class AggregationPipeline {
private final List<AggregationOperation> pipeline;
private final List<AggregationStage> pipeline;
public static AggregationPipeline of(AggregationOperation... stages) {
return new AggregationPipeline(Arrays.asList(stages));
}
/**
* Create a new {@link AggregationPipeline} out of the given {@link AggregationStage stages}.
*
* @param stages the pipeline stages.
* @return new instance of {@link AggregationPipeline}.
* @since 4.1
*/
public static AggregationPipeline of(AggregationStage... stages) {
return new AggregationPipeline(Arrays.asList(stages));
}
/**
* Create an empty pipeline
*/
@@ -49,12 +61,12 @@ public class AggregationPipeline {
/**
* Create a new pipeline with given {@link AggregationOperation stages}.
*
* @param aggregationOperations must not be {@literal null}.
* @param aggregationStages must not be {@literal null}.
*/
public AggregationPipeline(List<AggregationOperation> aggregationOperations) {
public AggregationPipeline(List<? extends AggregationStage> aggregationStages) {
Assert.notNull(aggregationOperations, "AggregationOperations must not be null");
pipeline = new ArrayList<>(aggregationOperations);
Assert.notNull(aggregationStages, "AggregationStages must not be null");
pipeline = new ArrayList<>(aggregationStages);
}
/**
@@ -64,10 +76,21 @@ public class AggregationPipeline {
* @return this.
*/
public AggregationPipeline add(AggregationOperation aggregationOperation) {
return add((AggregationStage) aggregationOperation);
}
Assert.notNull(aggregationOperation, "AggregationOperation must not be null");
/**
* Append the given {@link AggregationOperation stage} to the pipeline.
*
* @param stage must not be {@literal null}.
* @return this.
* @since 4.1
*/
public AggregationPipeline add(AggregationStage stage) {
pipeline.add(aggregationOperation);
Assert.notNull(stage, "AggregationOperation must not be null");
pipeline.add(stage);
return this;
}
@@ -76,7 +99,17 @@ public class AggregationPipeline {
*
* @return never {@literal null}.
*/
public List<AggregationOperation> getOperations() {
public List<AggregationStage> getOperations() {
return getStages();
}
/**
* Get the list of {@link AggregationOperation aggregation stages}.
*
* @return never {@literal null}.
* @since 4.1
*/
public List<AggregationStage> getStages() {
return Collections.unmodifiableList(pipeline);
}
@@ -95,14 +128,14 @@ public class AggregationPipeline {
return false;
}
AggregationOperation operation = pipeline.get(pipeline.size() - 1);
AggregationStage operation = pipeline.get(pipeline.size() - 1);
return isOut(operation) || isMerge(operation);
}
void verify() {
// check $out/$merge is the last operation if it exists
for (AggregationOperation operation : pipeline) {
for (AggregationStage operation : pipeline) {
if (isOut(operation) && !isLast(operation)) {
throw new IllegalArgumentException("The $out operator must be the last stage in the pipeline");
@@ -134,13 +167,13 @@ public class AggregationPipeline {
return pipeline.isEmpty();
}
private boolean containsOperation(Predicate<AggregationOperation> predicate) {
private boolean containsOperation(Predicate<AggregationStage> predicate) {
if (isEmpty()) {
return false;
}
for (AggregationOperation element : pipeline) {
for (AggregationStage element : pipeline) {
if (predicate.test(element)) {
return true;
}
@@ -149,19 +182,29 @@ public class AggregationPipeline {
return false;
}
private boolean isLast(AggregationOperation aggregationOperation) {
private boolean isLast(AggregationStage aggregationOperation) {
return pipeline.indexOf(aggregationOperation) == pipeline.size() - 1;
}
private static boolean isUnionWith(AggregationOperation operator) {
return operator instanceof UnionWithOperation || operator.getOperator().equals("$unionWith");
private static boolean isUnionWith(AggregationStage stage) {
return isSpecificStage(stage, UnionWithOperation.class, "$unionWith");
}
private static boolean isMerge(AggregationOperation operator) {
return operator instanceof MergeOperation || operator.getOperator().equals("$merge");
private static boolean isMerge(AggregationStage stage) {
return isSpecificStage(stage, MergeOperation.class, "$merge");
}
private static boolean isOut(AggregationOperation operator) {
return operator instanceof OutOperation || operator.getOperator().equals("$out");
private static boolean isOut(AggregationStage stage) {
return isSpecificStage(stage, OutOperation.class, "$out");
}
private static boolean isSpecificStage(AggregationStage stage, Class<?> type, String operator) {
if (ClassUtils.isAssignable(type, stage.getClass())) {
return true;
}
if (stage instanceof AggregationOperation operation) {
return operation.getOperator().equals(operator);
}
return stage.toDocument(Aggregation.DEFAULT_CONTEXT).keySet().iterator().next().equals(operator);
}
}
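As a quick sketch of the relaxed API above, a raw-document stage and a regular operation can now feed the same pipeline; the stage content is illustrative only.

AggregationStage sortByScore = ctx -> Document.parse("{ $sort : { score : -1 } }");

AggregationPipeline pipeline = AggregationPipeline.of(sortByScore)
		.add(Aggregation.limit(5));                    // AggregationOperation still plugs in via add(...)

List<AggregationStage> stages = pipeline.getStages(); // unmodifiable view of the stages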

View File

@@ -0,0 +1,45 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.aggregation;
import org.bson.Document;
/**
* Abstraction for a single
* <a href="https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/#stages">Aggregation Pipeline
* Stage</a> to be used within an {@link AggregationPipeline}.
* <p>
* An {@link AggregationStage} may operate upon domain-specific types but will render to a ready-to-use, store-native
* representation within a given {@link AggregationOperationContext context}. The most straightforward way of writing a
* custom {@link AggregationStage} is just returning the raw document.
*
* <pre class="code">
* AggregationStage stage = (ctx) -> Document.parse("{ $sort : { borough : 1 } }");
* </pre>
*
* @author Christoph Strobl
* @since 4.1
*/
public interface AggregationStage {
/**
* Turns the {@link AggregationStage} into a {@link Document} by using the given {@link AggregationOperationContext}.
*
* @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
* @return the ready to use {@link Document} representing the stage.
*/
Document toDocument(AggregationOperationContext context);
}
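A brief sketch of the lambda-style usage described in the Javadoc; Aggregation.DEFAULT_CONTEXT is the untyped context also used by AggregationPipeline, and the stage content is made up.

AggregationStage stage = ctx -> Document.parse("{ $match : { borough : 'Queens' } }");

// Renders the store-native representation of the stage.
Document rendered = stage.toDocument(Aggregation.DEFAULT_CONTEXT);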

View File

@@ -25,7 +25,6 @@ import java.util.StringJoiner;
import java.util.stream.Collectors;
import org.bson.Document;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.SerializationUtils;
import org.springframework.data.mongodb.core.query.UpdateDefinition;
@@ -71,7 +70,8 @@ import org.springframework.util.Assert;
*
* @author Christoph Strobl
* @author Mark Paluch
* @see <a href="https://docs.mongodb.com/manual/reference/method/db.collection.update/#update-with-aggregation-pipeline">MongoDB
* @see <a href=
* "https://docs.mongodb.com/manual/reference/method/db.collection.update/#update-with-aggregation-pipeline">MongoDB
* Reference Documentation</a>
* @since 3.0
*/
@@ -92,11 +92,11 @@ public class AggregationUpdate extends Aggregation implements UpdateDefinition {
*
* @param pipeline must not be {@literal null}.
*/
protected AggregationUpdate(List<AggregationOperation> pipeline) {
protected AggregationUpdate(List<? extends AggregationStage> pipeline) {
super(pipeline);
for (AggregationOperation operation : pipeline) {
for (AggregationStage operation : pipeline) {
if (operation instanceof FieldsExposingAggregationOperation) {
((FieldsExposingAggregationOperation) operation).getFields().forEach(it -> {
keysTouched.add(it.getName());
@@ -123,6 +123,16 @@ public class AggregationUpdate extends Aggregation implements UpdateDefinition {
return new AggregationUpdate(pipeline);
}
/**
* Create a new AggregationUpdate from the given {@link AggregationStage stages}.
*
* @return new instance of {@link AggregationUpdate}.
* @since 4.1
*/
public static AggregationUpdate updateFrom(List<? extends AggregationStage> stages) {
return new AggregationUpdate(stages);
}
/**
* Adds new fields to documents. {@code $set} outputs documents that contain all existing fields from the input
* documents and newly added fields.
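A minimal sketch of updateFrom(...) with pre-assembled stages; the field names and raw documents are illustrative only.

AggregationStage setStatus = ctx -> Document.parse("{ $set : { status : 'active' } }");
AggregationStage unsetLegacy = ctx -> Document.parse("{ $unset : 'legacyFlag' }");

AggregationUpdate update = AggregationUpdate.updateFrom(List.of(setStatus, unsetLegacy));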

View File

@@ -78,6 +78,23 @@ public class FacetOperation implements FieldsExposingAggregationOperation {
return new FacetOperationBuilder(facets, Arrays.asList(operations));
}
/**
* Creates a new {@link FacetOperationBuilder} to append a new facet using the given {@literal stages}. <br />
* {@link FacetOperationBuilder} takes a pipeline of {@link AggregationStage stages} to categorize documents into a
* single facet.
*
* @param stages must not be {@literal null} or empty.
* @return new instance of {@link FacetOperationBuilder}.
* @since 4.1
*/
public FacetOperationBuilder and(AggregationStage... stages) {
Assert.notNull(stages, "Stages must not be null");
Assert.notEmpty(stages, "Stages must not be empty");
return new FacetOperationBuilder(facets, Arrays.asList(stages));
}
@Override
public Document toDocument(AggregationOperationContext context) {
return new Document(getOperator(), facets.toDocument(context));
@@ -102,11 +119,11 @@ public class FacetOperation implements FieldsExposingAggregationOperation {
public static class FacetOperationBuilder {
private final Facets current;
private final List<AggregationOperation> operations;
private final List<AggregationStage> operations;
private FacetOperationBuilder(Facets current, List<AggregationOperation> operations) {
private FacetOperationBuilder(Facets current, List<? extends AggregationStage> operations) {
this.current = current;
this.operations = operations;
this.operations = new ArrayList<>(operations);
}
/**
@@ -176,7 +193,7 @@ public class FacetOperation implements FieldsExposingAggregationOperation {
* @param operations must not be {@literal null}.
* @return the new {@link Facets}.
*/
Facets and(String fieldName, List<AggregationOperation> operations) {
Facets and(String fieldName, List<? extends AggregationStage> operations) {
Assert.hasText(fieldName, "FieldName must not be null or empty");
Assert.notNull(operations, "AggregationOperations must not be null");
@@ -197,21 +214,21 @@ public class FacetOperation implements FieldsExposingAggregationOperation {
private static class Facet {
private final ExposedField exposedField;
private final List<AggregationOperation> operations;
private final List<AggregationStage> stages;
/**
* Creates a new {@link Facet} given {@link ExposedField} and {@link AggregationOperation} pipeline.
*
* @param exposedField must not be {@literal null}.
* @param operations must not be {@literal null}.
* @param stages must not be {@literal null}.
*/
Facet(ExposedField exposedField, List<AggregationOperation> operations) {
Facet(ExposedField exposedField, List<? extends AggregationStage> stages) {
Assert.notNull(exposedField, "ExposedField must not be null");
Assert.notNull(operations, "AggregationOperations must not be null");
Assert.notNull(stages, "Stages must not be null");
this.exposedField = exposedField;
this.operations = operations;
this.stages = new ArrayList<>(stages);
}
ExposedField getExposedField() {
@@ -219,7 +236,7 @@ public class FacetOperation implements FieldsExposingAggregationOperation {
}
protected List<Document> toDocuments(AggregationOperationContext context) {
return AggregationOperationRenderer.toDocument(operations, context);
return AggregationOperationRenderer.toDocument(stages, context);
}
}
}
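For illustration, a sketch combining the new AggregationStage overload with the existing operation-based one; facet and field names are invented, and the usual Sort import is assumed.

AggregationStage bucketByPrice = ctx -> Document.parse(
		"{ $bucket : { groupBy : '$price', boundaries : [0, 50, 100], default : 'other' } }");

FacetOperation facet = Aggregation.facet()
		.and(bucketByPrice).as("priceRanges")
		.and(Aggregation.sort(Sort.Direction.DESC, "price"), Aggregation.limit(3)).as("topPriced");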

View File

@@ -15,28 +15,44 @@
*/
package org.springframework.data.mongodb.core.aggregation;
import java.util.function.Supplier;
import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.ExposedFields.ExposedField;
import org.springframework.data.mongodb.core.aggregation.FieldsExposingAggregationOperation.InheritsFieldsAggregationOperation;
import org.springframework.data.mongodb.core.aggregation.VariableOperators.Let;
import org.springframework.data.mongodb.core.aggregation.VariableOperators.Let.ExpressionVariable;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* Encapsulates the aggregation framework {@code $lookup}-operation. We recommend to use the static factory method
* {@link Aggregation#lookup(String, String, String, String)} instead of creating instances of this class directly.
* Encapsulates the aggregation framework {@code $lookup}-operation. We recommend using the builder provided via
* {@link #newLookup()} instead of creating instances of this class directly.
*
* @author Alessio Fachechi
* @author Christoph Strobl
* @author Mark Paluch
* @author Sangyong Choi
* @since 1.9
* @see <a href="https://docs.mongodb.com/manual/reference/operator/aggregation/lookup/">MongoDB Aggregation Framework:
* $lookup</a>
*/
public class LookupOperation implements FieldsExposingAggregationOperation, InheritsFieldsAggregationOperation {
private final Field from;
private final String from;
@Nullable //
private final Field localField;
@Nullable //
private final Field foreignField;
@Nullable //
private final Let let;
@Nullable //
private final AggregationPipeline pipeline;
private final ExposedField as;
/**
@@ -48,16 +64,55 @@ public class LookupOperation implements FieldsExposingAggregationOperation, Inhe
* @param as must not be {@literal null}.
*/
public LookupOperation(Field from, Field localField, Field foreignField, Field as) {
this(((Supplier<String>) () -> {
Assert.notNull(from, "From must not be null");
return from.getTarget();
}).get(), localField, foreignField, null, null, as);
}
/**
* Creates a new {@link LookupOperation} for the given combination of {@link Field}s and {@link AggregationPipeline
* pipeline}.
*
* @param from must not be {@literal null}.
* @param let can be {@literal null}.
* @param pipeline must not be {@literal null}.
* @param as must not be {@literal null}.
* @since 4.1
*/
public LookupOperation(String from, @Nullable Let let, AggregationPipeline pipeline, Field as) {
this(from, null, null, let, pipeline, as);
}
/**
* Creates a new {@link LookupOperation} for the given combination of {@link Field}s and {@link AggregationPipeline
* pipeline}.
*
* @param from must not be {@literal null}.
* @param localField can be {@literal null} if {@literal pipeline} is present.
* @param foreignField can be {@literal null} if {@literal pipeline} is present.
* @param let can be {@literal null} if {@literal localField} and {@literal foreignField} are present.
* @param as must not be {@literal null}.
* @since 4.1
*/
public LookupOperation(String from, @Nullable Field localField, @Nullable Field foreignField, @Nullable Let let,
@Nullable AggregationPipeline pipeline, Field as) {
Assert.notNull(from, "From must not be null");
Assert.notNull(localField, "LocalField must not be null");
Assert.notNull(foreignField, "ForeignField must not be null");
if (pipeline == null) {
Assert.notNull(localField, "LocalField must not be null");
Assert.notNull(foreignField, "ForeignField must not be null");
} else if (localField == null && foreignField == null) {
Assert.notNull(pipeline, "Pipeline must not be null");
}
Assert.notNull(as, "As must not be null");
this.from = from;
this.localField = localField;
this.foreignField = foreignField;
this.as = new ExposedField(as, true);
this.let = let;
this.pipeline = pipeline;
}
@Override
@@ -70,9 +125,20 @@ public class LookupOperation implements FieldsExposingAggregationOperation, Inhe
Document lookupObject = new Document();
lookupObject.append("from", from.getTarget());
lookupObject.append("localField", localField.getTarget());
lookupObject.append("foreignField", foreignField.getTarget());
lookupObject.append("from", from);
if (localField != null) {
lookupObject.append("localField", localField.getTarget());
}
if (foreignField != null) {
lookupObject.append("foreignField", foreignField.getTarget());
}
if (let != null) {
lookupObject.append("let", let.toDocument(context).get("$let", Document.class).get("vars"));
}
if (pipeline != null) {
lookupObject.append("pipeline", pipeline.toDocuments(context));
}
lookupObject.append("as", as.getTarget());
return new Document(getOperator(), lookupObject);
@@ -101,7 +167,7 @@ public class LookupOperation implements FieldsExposingAggregationOperation, Inhe
LocalFieldBuilder from(String name);
}
public static interface LocalFieldBuilder {
public static interface LocalFieldBuilder extends PipelineBuilder {
/**
* @param name the field from the documents input to the {@code $lookup} stage, must not be {@literal null} or
@@ -120,7 +186,78 @@ public class LookupOperation implements FieldsExposingAggregationOperation, Inhe
AsBuilder foreignField(String name);
}
public static interface AsBuilder {
/**
* @since 4.1
* @author Christoph Strobl
*/
public interface LetBuilder {
/**
* Specifies {@link Let#getVariableNames() variables} that can be used in the
* {@link PipelineBuilder#pipeline(AggregationOperation...) pipeline stages}.
*
* @param let must not be {@literal null}.
* @return never {@literal null}.
* @see PipelineBuilder
*/
PipelineBuilder let(Let let);
/**
* Specifies {@link Let#getVariableNames() variables} that can be used in the
* {@link PipelineBuilder#pipeline(AggregationOperation...) pipeline stages}.
*
* @param variables must not be {@literal null}.
* @return never {@literal null}.
* @see PipelineBuilder
*/
default PipelineBuilder let(ExpressionVariable... variables) {
return let(Let.just(variables));
}
}
/**
* @since 4.1
* @author Christoph Strobl
*/
public interface PipelineBuilder extends LetBuilder {
/**
* Specifies the {@link AggregationPipeline pipeline} that determines the resulting documents.
*
* @param pipeline must not be {@literal null}.
* @return never {@literal null}.
*/
AsBuilder pipeline(AggregationPipeline pipeline);
/**
* Specifies the {@link AggregationPipeline#getStages() stages} that determine the resulting documents.
*
* @param stages must not be {@literal null}, can be empty.
* @return never {@literal null}.
*/
default AsBuilder pipeline(AggregationOperation... stages) {
return pipeline(AggregationPipeline.of(stages));
}
/**
* Specifies the {@link AggregationPipeline#getStages() stages} that determine the resulting documents.
*
* @param stages must not be {@literal null}, can be empty.
* @return never {@literal null}.
* @since 4.1
*/
default AsBuilder pipeline(AggregationStage... stages) {
return pipeline(AggregationPipeline.of(stages));
}
/**
* @param name the name of the new array field to add to the input documents, must not be {@literal null} or empty.
* @return new instance of {@link LookupOperation}.
*/
LookupOperation as(String name);
}
public static interface AsBuilder extends PipelineBuilder {
/**
* @param name the name of the new array field to add to the input documents, must not be {@literal null} or empty.
@@ -138,10 +275,12 @@ public class LookupOperation implements FieldsExposingAggregationOperation, Inhe
public static final class LookupOperationBuilder
implements FromBuilder, LocalFieldBuilder, ForeignFieldBuilder, AsBuilder {
private @Nullable Field from;
private @Nullable String from;
private @Nullable Field localField;
private @Nullable Field foreignField;
private @Nullable ExposedField as;
private @Nullable Let let;
private @Nullable AggregationPipeline pipeline;
/**
* Creates new builder for {@link LookupOperation}.
@@ -156,18 +295,10 @@ public class LookupOperation implements FieldsExposingAggregationOperation, Inhe
public LocalFieldBuilder from(String name) {
Assert.hasText(name, "'From' must not be null or empty");
from = Fields.field(name);
from = name;
return this;
}
@Override
public LookupOperation as(String name) {
Assert.hasText(name, "'As' must not be null or empty");
as = new ExposedField(Fields.field(name), true);
return new LookupOperation(from, localField, foreignField, as);
}
@Override
public AsBuilder foreignField(String name) {
@@ -183,5 +314,29 @@ public class LookupOperation implements FieldsExposingAggregationOperation, Inhe
localField = Fields.field(name);
return this;
}
@Override
public PipelineBuilder let(Let let) {
Assert.notNull(let, "Let must not be null");
this.let = let;
return this;
}
@Override
public AsBuilder pipeline(AggregationPipeline pipeline) {
Assert.notNull(pipeline, "Pipeline must not be null");
this.pipeline = pipeline;
return this;
}
@Override
public LookupOperation as(String name) {
Assert.hasText(name, "'As' must not be null or empty");
as = new ExposedField(Fields.field(name), true);
return new LookupOperation(from, localField, foreignField, let, pipeline, as);
}
}
}
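A sketch of the builder flow with let and pipeline, mirroring the integration test further down; collection, variable and field names are illustrative, and ExpressionVariable refers to VariableOperators.Let.ExpressionVariable.

LookupOperation lookup = LookupOperation.newLookup()
		.from("warehouses")
		.let(ExpressionVariable.newVariable("order_item").forField("item"),
				ExpressionVariable.newVariable("order_qty").forField("ordered"))
		.pipeline(Aggregation.match(ctx -> new Document("$expr",
				new Document("$eq", List.of("$stock_item", "$$order_item")))))
		.as("stockdata");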

View File

@@ -0,0 +1,103 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.aggregation;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.bson.Document;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* An {@link AggregationStage} that may consist of a main operation and potential follow-up stages, e.g. {@code $sort}
* or {@code $limit}.
* <p>
* The {@link MultiOperationAggregationStage} may operate upon domain-specific types but will render to the store-native
* representation within a given {@link AggregationOperationContext context}.
* <p>
* {@link #toDocument(AggregationOperationContext)} will render a synthetic {@link Document} that contains the ordered
* stages. The list returned from {@link #toPipelineStages(AggregationOperationContext)}
*
* <pre class="code">
* [
* { $match: { $text: { $search: "operating" } } },
* { $sort: { score: { $meta: "textScore" }, posts: -1 } }
* ]
* </pre>
*
* will be represented as
*
* <pre class="code">
* {
* $match: { $text: { $search: "operating" } },
* $sort: { score: { $meta: "textScore" }, posts: -1 }
* }
* </pre>
*
* In case stages appear multiple times, the order can no longer be guaranteed when calling
* {@link #toDocument(AggregationOperationContext)}, so consumers of the API should rely on
* {@link #toPipelineStages(AggregationOperationContext)}. Nevertheless, by default the values will be collected into a
* list, rendering to
*
* <pre class="code">
* {
* $match: [{ $text: { $search: "operating" } }, { $text: ... }],
* $sort: { score: { $meta: "textScore" }, posts: -1 }
* }
* </pre>
*
* @author Christoph Strobl
* @since 4.1
*/
public interface MultiOperationAggregationStage extends AggregationStage {
/**
* Returns a synthetic {@link Document stage} that contains the {@link #toPipelineStages(AggregationOperationContext)
* actual stages} by folding them into a single {@link Document}. In case of colliding entries (operators used multiple
* times and therefore sharing the same key), the values are held as a list for the given operator.
*
* @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
* @return never {@literal null}.
*/
@Override
default Document toDocument(AggregationOperationContext context) {
List<Document> documents = toPipelineStages(context);
if (documents.size() == 1) {
return documents.get(0);
}
MultiValueMap<String, Document> stages = new LinkedMultiValueMap<>(documents.size());
for (Document current : documents) {
String key = current.keySet().iterator().next();
stages.add(key, current.get(key, Document.class));
}
return new Document(stages.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, v -> v.getValue().size() == 1 ? v.getValue().get(0) : v.getValue())));
}
/**
* Turns the {@link MultiOperationAggregationStage} into a list of {@link Document stages} by using the given
* {@link AggregationOperationContext}. This allows an {@link AggregationStage} to add follow-up stages, e.g.
* {@code $sort} or {@code $limit}.
*
* @param context the {@link AggregationOperationContext} to operate within. Must not be {@literal null}.
* @return the pipeline stages to run through. Never {@literal null}.
*/
List<Document> toPipelineStages(AggregationOperationContext context);
}
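A compact sketch of the contract: a stage that expands into a $match followed by a $sort, reusing the documents from the Javadoc example above; the folded form comes from the default toDocument implementation.

MultiOperationAggregationStage textSearch = ctx -> List.of(
		Document.parse("{ $match : { $text : { $search : 'operating' } } }"),
		Document.parse("{ $sort : { score : { $meta : 'textScore' }, posts : -1 } }"));

List<Document> stages = textSearch.toPipelineStages(Aggregation.DEFAULT_CONTEXT); // two separate stages
Document folded = textSearch.toDocument(Aggregation.DEFAULT_CONTEXT);             // single synthetic document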

View File

@@ -15,6 +15,7 @@
*/
package org.springframework.data.mongodb.core.aggregation;
import java.util.Arrays;
import java.util.List;
import org.springframework.util.Assert;
@@ -24,6 +25,7 @@ import org.springframework.util.Assert;
*
* @author Thomas Darimont
* @author Oliver Gierke
* @author Christoph Strobl
*/
public class TypedAggregation<I> extends Aggregation {
@@ -39,13 +41,24 @@ public class TypedAggregation<I> extends Aggregation {
this(inputType, asAggregationList(operations));
}
/**
* Creates a new {@link TypedAggregation} from the given {@link AggregationStage stages}.
*
* @param inputType must not be {@literal null}.
* @param stages must not be {@literal null} or empty.
* @since 4.1
*/
public TypedAggregation(Class<I> inputType, AggregationStage... stages) {
this(inputType, Arrays.asList(stages));
}
/**
* Creates a new {@link TypedAggregation} from the given {@link AggregationOperation}s.
*
* @param inputType must not be {@literal null}.
* @param operations must not be {@literal null} or empty.
*/
public TypedAggregation(Class<I> inputType, List<AggregationOperation> operations) {
public TypedAggregation(Class<I> inputType, List<? extends AggregationStage> operations) {
this(inputType, operations, DEFAULT_OPTIONS);
}
@@ -57,7 +70,7 @@ public class TypedAggregation<I> extends Aggregation {
* @param operations must not be {@literal null} or empty.
* @param options must not be {@literal null}.
*/
public TypedAggregation(Class<I> inputType, List<AggregationOperation> operations, AggregationOptions options) {
public TypedAggregation(Class<I> inputType, List<? extends AggregationStage> operations, AggregationOptions options) {
super(operations, options);
@@ -77,6 +90,6 @@ public class TypedAggregation<I> extends Aggregation {
public TypedAggregation<I> withOptions(AggregationOptions options) {
Assert.notNull(options, "AggregationOptions must not be null");
return new TypedAggregation<I>(inputType, pipeline.getOperations(), options);
return new TypedAggregation<>(inputType, pipeline.getStages(), options);
}
}
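A short sketch of the new constructor; User is a placeholder domain type and the raw $sort stage is illustrative.

AggregationStage sortByName = ctx -> Document.parse("{ $sort : { firstname : 1 } }");

TypedAggregation<User> aggregation = new TypedAggregation<>(User.class,
		Aggregation.match(Criteria.where("active").is(true)), sortByName);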

View File

@@ -16,7 +16,6 @@
package org.springframework.data.mongodb.core.aggregation;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -224,28 +223,41 @@ public class VariableOperators {
public static class Let implements AggregationExpression {
private final List<ExpressionVariable> vars;
@Nullable //
private final AggregationExpression expression;
private Let(List<ExpressionVariable> vars, AggregationExpression expression) {
private Let(List<ExpressionVariable> vars, @Nullable AggregationExpression expression) {
this.vars = vars;
this.expression = expression;
}
/**
* Create a new {@link Let} holding just the given {@literal variables}.
*
* @param variables must not be {@literal null}.
* @return new instance of {@link Let}.
* @since 4.1
*/
public static Let just(ExpressionVariable... variables) {
return new Let(List.of(variables), null);
}
/**
* Start creating new {@link Let} by defining the variables for {@code $vars}.
*
* @param variables must not be {@literal null}.
* @return
*/
public static LetBuilder define(final Collection<ExpressionVariable> variables) {
public static LetBuilder define(Collection<ExpressionVariable> variables) {
Assert.notNull(variables, "Variables must not be null");
return new LetBuilder() {
@Override
public Let andApply(final AggregationExpression expression) {
public Let andApply(AggregationExpression expression) {
Assert.notNull(expression, "Expression must not be null");
return new Let(new ArrayList<ExpressionVariable>(variables), expression);
@@ -259,19 +271,10 @@ public class VariableOperators {
* @param variables must not be {@literal null}.
* @return
*/
public static LetBuilder define(final ExpressionVariable... variables) {
public static LetBuilder define(ExpressionVariable... variables) {
Assert.notNull(variables, "Variables must not be null");
return new LetBuilder() {
@Override
public Let andApply(final AggregationExpression expression) {
Assert.notNull(expression, "Expression must not be null");
return new Let(Arrays.asList(variables), expression);
}
};
return define(List.of(variables));
}
public interface LetBuilder {
@@ -283,10 +286,11 @@ public class VariableOperators {
* @return
*/
Let andApply(AggregationExpression expression);
}
@Override
public Document toDocument(final AggregationOperationContext context) {
public Document toDocument(AggregationOperationContext context) {
return toLet(ExposedFields.synthetic(Fields.fields(getVariableNames())), context);
}
@@ -312,16 +316,22 @@ public class VariableOperators {
}
letExpression.put("vars", mappedVars);
letExpression.put("in", getMappedIn(operationContext));
if (expression != null) {
letExpression.put("in", getMappedIn(operationContext));
}
return new Document("$let", letExpression);
}
private Document getMappedVariable(ExpressionVariable var, AggregationOperationContext context) {
return new Document(var.variableName,
var.expression instanceof AggregationExpression ? ((AggregationExpression) var.expression).toDocument(context)
: var.expression);
if (var.expression instanceof AggregationExpression expression) {
return new Document(var.variableName, expression.toDocument(context));
}
if (var.expression instanceof Field field) {
return new Document(var.variableName, context.getReference(field).toString());
}
return new Document(var.variableName, var.expression);
}
private Object getMappedIn(AggregationOperationContext context) {
@@ -373,6 +383,10 @@ public class VariableOperators {
return new ExpressionVariable(variableName, expression);
}
public ExpressionVariable forField(String fieldRef) {
return new ExpressionVariable(variableName, Fields.field(fieldRef));
}
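For illustration, a sketch of defining variables without an in-expression, as used by the $lookup support; variable and field names are invented, and Let refers to VariableOperators.Let.

Let let = Let.just(
		ExpressionVariable.newVariable("order_item").forField("item"),
		ExpressionVariable.newVariable("order_qty").forField("ordered"));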
/**
* Create a new {@link ExpressionVariable} with current name and given {@literal expressionObject}.
*

View File

@@ -24,11 +24,16 @@ import org.springframework.data.geo.Distance;
import org.springframework.data.geo.Metric;
import org.springframework.data.geo.Metrics;
import org.springframework.data.geo.Point;
import org.springframework.data.mongodb.core.ReadConcernAware;
import org.springframework.data.mongodb.core.ReadPreferenceAware;
import org.springframework.data.mongodb.core.geo.GeoJsonPoint;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
/**
* Builder class to build near-queries. <br />
* MongoDB {@code $geoNear} operator allows usage of a {@literal GeoJSON Point} or legacy coordinate pair. Though
@@ -171,7 +176,7 @@ import org.springframework.util.ObjectUtils;
* @author Christoph Strobl
* @author Mark Paluch
*/
public final class NearQuery {
public final class NearQuery implements ReadConcernAware, ReadPreferenceAware {
private final Point point;
private @Nullable Query query;
@@ -181,6 +186,8 @@ public final class NearQuery {
private boolean spherical;
private @Nullable Long limit;
private @Nullable Long skip;
private @Nullable ReadConcern readConcern;
private @Nullable ReadPreference readPreference;
/**
* Creates a new {@link NearQuery}.
@@ -555,6 +562,74 @@ public final class NearQuery {
return query != null ? query.getCollation().orElse(null) : null;
}
/**
* Configures the query to use the given {@link ReadConcern} unless the underlying {@link #query(Query)}
* {@link Query#hasReadConcern() specifies} another one.
*
* @param readConcern must not be {@literal null}.
* @return this.
* @since 4.1
*/
public NearQuery withReadConcern(ReadConcern readConcern) {
Assert.notNull(readConcern, "ReadConcern must not be null");
this.readConcern = readConcern;
return this;
}
/**
* Configures the query to use the given {@link ReadPreference} unless the underlying {@link #query(Query)}
* {@link Query#hasReadPreference() specifies} another one.
*
* @param readPreference must not be {@literal null}.
* @return this.
* @since 4.1
*/
public NearQuery withReadPreference(ReadPreference readPreference) {
Assert.notNull(readPreference, "ReadPreference must not be null");
this.readPreference = readPreference;
return this;
}
/**
* Get the {@link ReadConcern} to use. Will return the underlying {@link #query(Query) queries}
* {@link Query#getReadConcern() ReadConcern} if present or the one defined on the {@link NearQuery#readConcern}
* itself.
*
* @return can be {@literal null} if none set.
* @since 4.1
* @see ReadConcernAware
*/
@Nullable
@Override
public ReadConcern getReadConcern() {
if (query != null && query.hasReadConcern()) {
return query.getReadConcern();
}
return readConcern;
}
/**
* Get the {@link ReadPreference} to use. Will return the underlying {@link #query(Query) queries}
* {@link Query#getReadPreference() ReadPreference} if present or the one defined on the
* {@link NearQuery#readPreference} itself.
*
* @return can be {@literal null} if none set.
* @since 4.1
* @see ReadPreferenceAware
*/
@Nullable
@Override
public ReadPreference getReadPreference() {
if (query != null && query.hasReadPreference()) {
return query.getReadPreference();
}
return readPreference;
}
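A minimal sketch of the new settings on NearQuery; coordinates and distances are arbitrary, and the values only apply if the wrapped query(Query) does not define its own.

NearQuery nearQuery = NearQuery.near(new Point(-73.99, 40.73), Metrics.MILES)
		.maxDistance(2)
		.withReadConcern(ReadConcern.MAJORITY)
		.withReadPreference(ReadPreference.secondaryPreferred());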
/**
* Returns the {@link Document} built by the {@link NearQuery}.
*

View File

@@ -34,10 +34,16 @@ import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.domain.Sort.Order;
import org.springframework.data.mongodb.InvalidMongoDbApiUsageException;
import org.springframework.data.mongodb.core.ReadConcernAware;
import org.springframework.data.mongodb.core.ReadPreferenceAware;
import org.springframework.data.mongodb.core.query.Meta.CursorOption;
import org.springframework.data.mongodb.util.BsonUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
/**
* MongoDB Query object representing criteria, projection, sorting and query hints.
*
@@ -48,7 +54,7 @@ import org.springframework.util.Assert;
* @author Mark Paluch
* @author Anton Barkan
*/
public class Query {
public class Query implements ReadConcernAware, ReadPreferenceAware {
private static final String RESTRICTED_TYPES_KEY = "_$RESTRICTED_TYPES";
@@ -58,6 +64,9 @@ public class Query {
private Sort sort = Sort.unsorted();
private long skip;
private int limit;
private @Nullable ReadConcern readConcern;
private @Nullable ReadPreference readPreference;
private @Nullable String hint;
private Meta meta = new Meta();
@@ -160,6 +169,59 @@ public class Query {
return this;
}
/**
* Configures the query to use the given {@link ReadConcern} when being executed.
*
* @param readConcern must not be {@literal null}.
* @return this.
* @since 4.1
*/
public Query withReadConcern(ReadConcern readConcern) {
Assert.notNull(readConcern, "ReadConcern must not be null");
this.readConcern = readConcern;
return this;
}
/**
* Configures the query to use the given {@link ReadPreference} when being executed.
*
* @param readPreference must not be {@literal null}.
* @return this.
* @since 4.1
*/
public Query withReadPreference(ReadPreference readPreference) {
Assert.notNull(readPreference, "ReadPreference must not be null");
this.readPreference = readPreference;
return this;
}
@Override
public boolean hasReadConcern() {
return this.readConcern != null;
}
@Override
public ReadConcern getReadConcern() {
return this.readConcern;
}
@Override
public boolean hasReadPreference() {
return this.readPreference != null || getMeta().getFlags().contains(CursorOption.SECONDARY_READS);
}
@Override
public ReadPreference getReadPreference() {
if (readPreference == null) {
return getMeta().getFlags().contains(CursorOption.SECONDARY_READS) ? ReadPreference.primaryPreferred() : null;
}
return this.readPreference;
}
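A quick sketch of a per-query override of the template defaults; the criteria values are illustrative.

Query query = new Query(Criteria.where("status").is("open"))
		.withReadConcern(ReadConcern.LOCAL)
		.withReadPreference(ReadPreference.secondaryPreferred());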
/**
* Configures the query to use the given {@link Document hint} when being executed.
*

View File

@@ -27,6 +27,7 @@ import org.springframework.data.domain.Sort.Order;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationStage;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.query.Collation;
import org.springframework.data.mongodb.core.query.Meta;
@@ -109,14 +110,14 @@ abstract class AggregationUtils {
* @param accessor
* @param targetType
*/
static void appendSortIfPresent(List<AggregationOperation> aggregationPipeline, ConvertingParameterAccessor accessor,
static void appendSortIfPresent(List<? extends AggregationStage> aggregationPipeline, ConvertingParameterAccessor accessor,
Class<?> targetType) {
if (accessor.getSort().isUnsorted()) {
return;
}
aggregationPipeline.add(ctx -> {
((List<AggregationStage>) aggregationPipeline).add(ctx -> {
Document sort = new Document();
for (Order order : accessor.getSort()) {
@@ -134,7 +135,7 @@ abstract class AggregationUtils {
* @param aggregationPipeline
* @param accessor
*/
static void appendLimitAndOffsetIfPresent(List<AggregationOperation> aggregationPipeline,
static void appendLimitAndOffsetIfPresent(List<? extends AggregationStage> aggregationPipeline,
ConvertingParameterAccessor accessor) {
appendLimitAndOffsetIfPresent(aggregationPipeline, accessor, LongUnaryOperator.identity(),
IntUnaryOperator.identity());
@@ -150,7 +151,7 @@ abstract class AggregationUtils {
* @param limitOperator
* @since 3.3
*/
static void appendLimitAndOffsetIfPresent(List<AggregationOperation> aggregationPipeline,
static void appendLimitAndOffsetIfPresent(List<? extends AggregationStage> aggregationPipeline,
ConvertingParameterAccessor accessor, LongUnaryOperator offsetOperator, IntUnaryOperator limitOperator) {
Pageable pageable = accessor.getPageable();
@@ -159,10 +160,10 @@ abstract class AggregationUtils {
}
if (pageable.getOffset() > 0) {
aggregationPipeline.add(Aggregation.skip(offsetOperator.applyAsLong(pageable.getOffset())));
((List<AggregationStage>) aggregationPipeline).add(Aggregation.skip(offsetOperator.applyAsLong(pageable.getOffset())));
}
aggregationPipeline.add(Aggregation.limit(limitOperator.applyAsInt(pageable.getPageSize())));
((List<AggregationStage>) aggregationPipeline).add(Aggregation.limit(limitOperator.applyAsInt(pageable.getPageSize())));
}
/**

View File

@@ -64,6 +64,7 @@ class QueryUtils {
combinedSort.putAll((Document) invocation.proceed());
return combinedSort;
});
factory.setInterfaces(new Class[0]);
return (Query) factory.getProxy(query.getClass().getClassLoader());
}
@@ -113,7 +114,7 @@ class QueryUtils {
if(parameters.isEmpty()) {
return -1;
}
int i = 0;
for(Class<?> parameterType : parameters) {
if(ClassUtils.isAssignable(type, parameterType)) {

View File

@@ -108,6 +108,7 @@ import org.springframework.util.CollectionUtils;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoException;
import com.mongodb.MongoNamespace;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
import com.mongodb.ServerCursor;
@@ -120,16 +121,7 @@ import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.CountOptions;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.DeleteOptions;
import com.mongodb.client.model.FindOneAndDeleteOptions;
import com.mongodb.client.model.FindOneAndReplaceOptions;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.MapReduceAction;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.TimeSeriesGranularity;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.*;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
@@ -182,6 +174,7 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
when(collection.estimatedDocumentCount(any())).thenReturn(1L);
when(collection.getNamespace()).thenReturn(new MongoNamespace("db.mock-collection"));
when(collection.aggregate(any(List.class), any())).thenReturn(aggregateIterable);
when(collection.withReadConcern(any())).thenReturn(collection);
when(collection.withReadPreference(any())).thenReturn(collection);
when(collection.replaceOne(any(), any(), any(ReplaceOptions.class))).thenReturn(updateResult);
when(collection.withWriteConcern(any())).thenReturn(collectionWithWriteConcern);
@@ -478,6 +471,34 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
verify(collection, never()).withReadPreference(any());
}
@Test // GH-4277
void aggregateShouldHonorOptionsReadConcernWhenSet() {
AggregationOptions options = AggregationOptions.builder().readConcern(ReadConcern.SNAPSHOT).build();
template.aggregate(newAggregation(Aggregation.unwind("foo")).withOptions(options), "collection-1", Wrapper.class);
verify(collection).withReadConcern(ReadConcern.SNAPSHOT);
}
@Test // GH-4277
void aggregateShouldHonorOptionsReadPreferenceWhenSet() {
AggregationOptions options = AggregationOptions.builder().readPreference(ReadPreference.secondary()).build();
template.aggregate(newAggregation(Aggregation.unwind("foo")).withOptions(options), "collection-1", Wrapper.class);
verify(collection).withReadPreference(ReadPreference.secondary());
}
@Test // GH-4277
void aggregateStreamShouldHonorOptionsReadPreferenceWhenSet() {
AggregationOptions options = AggregationOptions.builder().readPreference(ReadPreference.secondary()).build();
template.aggregateStream(newAggregation(Aggregation.unwind("foo")).withOptions(options), "collection-1",
Wrapper.class);
verify(collection).withReadPreference(ReadPreference.secondary());
}
@Test // DATAMONGO-2153
void aggregateShouldHonorOptionsComment() {
@@ -558,6 +579,28 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
verify(collection).withReadPreference(eq(ReadPreference.secondary()));
}
@Test // GH-4277
void geoNearShouldHonorReadPreferenceFromQuery() {
NearQuery query = NearQuery.near(new Point(1, 1));
query.withReadPreference(ReadPreference.secondary());
template.geoNear(query, Wrapper.class);
verify(collection).withReadPreference(eq(ReadPreference.secondary()));
}
@Test // GH-4277
void geoNearShouldHonorReadConcernFromQuery() {
NearQuery query = NearQuery.near(new Point(1, 1));
query.withReadConcern(ReadConcern.SNAPSHOT);
template.geoNear(query, Wrapper.class);
verify(collection).withReadConcern(eq(ReadConcern.SNAPSHOT));
}
@Test // DATAMONGO-1166, DATAMONGO-2264
void geoNearShouldIgnoreReadPreferenceWhenNotSet() {
@@ -802,6 +845,24 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
verify(findIterable).batchSize(1234);
}
@Test // GH-4277
void findShouldUseReadConcernWhenPresent() {
template.find(new BasicQuery("{'foo' : 'bar'}").withReadConcern(ReadConcern.SNAPSHOT),
AutogenerateableId.class);
verify(collection).withReadConcern(ReadConcern.SNAPSHOT);
}
@Test // GH-4277
void findShouldUseReadPreferenceWhenPresent() {
template.find(new BasicQuery("{'foo' : 'bar'}").withReadPreference(ReadPreference.secondary()),
AutogenerateableId.class);
verify(collection).withReadPreference(ReadPreference.secondary());
}
@Test // DATAMONGO-1518
void executeQueryShouldUseCollationWhenPresent() {
@@ -1048,7 +1109,8 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
@Test // DATAMONGO-1733
void appliesFieldsWhenInterfaceProjectionIsClosedAndQueryDoesNotDefineFields() {
template.doFind("star-wars", new Document(), new Document(), Person.class, PersonProjection.class,
template.doFind(CollectionPreparer.identity(), "star-wars", new Document(), new Document(), Person.class,
PersonProjection.class,
CursorPreparer.NO_OP_PREPARER);
verify(findIterable).projection(eq(new Document("firstname", 1)));
@@ -1057,7 +1119,8 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
@Test // DATAMONGO-1733
void doesNotApplyFieldsWhenInterfaceProjectionIsClosedAndQueryDefinesFields() {
template.doFind("star-wars", new Document(), new Document("bar", 1), Person.class, PersonProjection.class,
template.doFind(CollectionPreparer.identity(), "star-wars", new Document(), new Document("bar", 1), Person.class,
PersonProjection.class,
CursorPreparer.NO_OP_PREPARER);
verify(findIterable).projection(eq(new Document("bar", 1)));
@@ -1066,7 +1129,8 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
@Test // DATAMONGO-1733
void doesNotApplyFieldsWhenInterfaceProjectionIsOpen() {
template.doFind("star-wars", new Document(), new Document(), Person.class, PersonSpELProjection.class,
template.doFind(CollectionPreparer.identity(), "star-wars", new Document(), new Document(), Person.class,
PersonSpELProjection.class,
CursorPreparer.NO_OP_PREPARER);
verify(findIterable).projection(eq(BsonUtils.EMPTY_DOCUMENT));
@@ -1075,7 +1139,8 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
@Test // DATAMONGO-1733, DATAMONGO-2041
void appliesFieldsToDtoProjection() {
template.doFind("star-wars", new Document(), new Document(), Person.class, Jedi.class,
template.doFind(CollectionPreparer.identity(), "star-wars", new Document(), new Document(), Person.class,
Jedi.class,
CursorPreparer.NO_OP_PREPARER);
verify(findIterable).projection(eq(new Document("firstname", 1)));
@@ -1084,7 +1149,8 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
@Test // DATAMONGO-1733
void doesNotApplyFieldsToDtoProjectionWhenQueryDefinesFields() {
template.doFind("star-wars", new Document(), new Document("bar", 1), Person.class, Jedi.class,
template.doFind(CollectionPreparer.identity(), "star-wars", new Document(), new Document("bar", 1), Person.class,
Jedi.class,
CursorPreparer.NO_OP_PREPARER);
verify(findIterable).projection(eq(new Document("bar", 1)));
@@ -1093,7 +1159,8 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
@Test // DATAMONGO-1733
void doesNotApplyFieldsWhenTargetIsNotAProjection() {
template.doFind("star-wars", new Document(), new Document(), Person.class, Person.class,
template.doFind(CollectionPreparer.identity(), "star-wars", new Document(), new Document(), Person.class,
Person.class,
CursorPreparer.NO_OP_PREPARER);
verify(findIterable).projection(eq(BsonUtils.EMPTY_DOCUMENT));
@@ -1102,7 +1169,8 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
@Test // DATAMONGO-1733
void doesNotApplyFieldsWhenTargetExtendsDomainType() {
template.doFind("star-wars", new Document(), new Document(), Person.class, PersonExtended.class,
template.doFind(CollectionPreparer.identity(), "star-wars", new Document(), new Document(), Person.class,
PersonExtended.class,
CursorPreparer.NO_OP_PREPARER);
verify(findIterable).projection(eq(BsonUtils.EMPTY_DOCUMENT));

View File

@@ -23,8 +23,6 @@ import static org.springframework.data.mongodb.test.util.Assertions.assertThat;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.mongodb.core.MongoTemplateUnitTests.Wrapper;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -56,12 +54,12 @@ import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.support.StaticApplicationContext;
import org.springframework.data.annotation.Id;
import org.springframework.data.geo.Point;
import org.springframework.data.mapping.MappingException;
import org.springframework.data.mapping.callback.ReactiveEntityCallbacks;
import org.springframework.data.mapping.context.MappingContext;
@@ -100,6 +98,7 @@ import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.util.CollectionUtils;
import com.mongodb.MongoClientSettings;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.client.model.CountOptions;
import com.mongodb.client.model.CreateCollectionOptions;
@@ -168,6 +167,7 @@ public class ReactiveMongoTemplateUnitTests {
when(db.runCommand(any(), any(Class.class))).thenReturn(runCommandPublisher);
when(db.createCollection(any(), any(CreateCollectionOptions.class))).thenReturn(runCommandPublisher);
when(collection.withReadPreference(any())).thenReturn(collection);
when(collection.withReadConcern(any())).thenReturn(collection);
when(collection.find(any(Class.class))).thenReturn(findPublisher);
when(collection.find(any(Document.class), any(Class.class))).thenReturn(findPublisher);
when(collection.aggregate(anyList())).thenReturn(aggregatePublisher);
@@ -385,11 +385,33 @@ public class ReactiveMongoTemplateUnitTests {
verify(aggregatePublisher).collation(eq(com.mongodb.client.model.Collation.builder().locale("fr").build()));
}
@Test // GH-4277
void geoNearShouldHonorReadPreferenceFromQuery() {
NearQuery query = NearQuery.near(new Point(1, 1));
query.withReadPreference(ReadPreference.secondary());
template.geoNear(query, Wrapper.class).subscribe();
verify(collection).withReadPreference(eq(ReadPreference.secondary()));
}
@Test // GH-4277
void geoNearShouldHonorReadConcernFromQuery() {
NearQuery query = NearQuery.near(new Point(1, 1));
query.withReadConcern(ReadConcern.SNAPSHOT);
template.geoNear(query, Wrapper.class).subscribe();
verify(collection).withReadConcern(eq(ReadConcern.SNAPSHOT));
}
@Test // DATAMONGO-1719
void appliesFieldsWhenInterfaceProjectionIsClosedAndQueryDoesNotDefineFields() {
template.doFind("star-wars", new Document(), new Document(), Person.class, PersonProjection.class,
FindPublisherPreparer.NO_OP_PREPARER).subscribe();
template.doFind("star-wars", CollectionPreparer.identity(), new Document(), new Document(), Person.class,
PersonProjection.class, FindPublisherPreparer.NO_OP_PREPARER).subscribe();
verify(findPublisher).projection(eq(new Document("firstname", 1)));
}
@@ -397,8 +419,8 @@ public class ReactiveMongoTemplateUnitTests {
@Test // DATAMONGO-1719
void doesNotApplyFieldsWhenInterfaceProjectionIsClosedAndQueryDefinesFields() {
template.doFind("star-wars", new Document(), new Document("bar", 1), Person.class, PersonProjection.class,
FindPublisherPreparer.NO_OP_PREPARER).subscribe();
template.doFind("star-wars", CollectionPreparer.identity(), new Document(), new Document("bar", 1), Person.class,
PersonProjection.class, FindPublisherPreparer.NO_OP_PREPARER).subscribe();
verify(findPublisher).projection(eq(new Document("bar", 1)));
}
@@ -406,8 +428,8 @@ public class ReactiveMongoTemplateUnitTests {
@Test // DATAMONGO-1719
void doesNotApplyFieldsWhenInterfaceProjectionIsOpen() {
template.doFind("star-wars", new Document(), new Document(), Person.class, PersonSpELProjection.class,
FindPublisherPreparer.NO_OP_PREPARER).subscribe();
template.doFind("star-wars", CollectionPreparer.identity(), new Document(), new Document(), Person.class,
PersonSpELProjection.class, FindPublisherPreparer.NO_OP_PREPARER).subscribe();
verify(findPublisher, never()).projection(any());
}
@@ -415,8 +437,8 @@ public class ReactiveMongoTemplateUnitTests {
@Test // DATAMONGO-1719, DATAMONGO-2041
void appliesFieldsToDtoProjection() {
template.doFind("star-wars", new Document(), new Document(), Person.class, Jedi.class,
FindPublisherPreparer.NO_OP_PREPARER).subscribe();
template.doFind("star-wars", CollectionPreparer.identity(), new Document(), new Document(), Person.class,
Jedi.class, FindPublisherPreparer.NO_OP_PREPARER).subscribe();
verify(findPublisher).projection(eq(new Document("firstname", 1)));
}
@@ -424,8 +446,8 @@ public class ReactiveMongoTemplateUnitTests {
@Test // DATAMONGO-1719
void doesNotApplyFieldsToDtoProjectionWhenQueryDefinesFields() {
template.doFind("star-wars", new Document(), new Document("bar", 1), Person.class, Jedi.class,
FindPublisherPreparer.NO_OP_PREPARER).subscribe();
template.doFind("star-wars", CollectionPreparer.identity(), new Document(), new Document("bar", 1), Person.class,
Jedi.class, FindPublisherPreparer.NO_OP_PREPARER).subscribe();
verify(findPublisher).projection(eq(new Document("bar", 1)));
}
@@ -433,8 +455,8 @@ public class ReactiveMongoTemplateUnitTests {
@Test // DATAMONGO-1719
void doesNotApplyFieldsWhenTargetIsNotAProjection() {
template.doFind("star-wars", new Document(), new Document(), Person.class, Person.class,
FindPublisherPreparer.NO_OP_PREPARER).subscribe();
template.doFind("star-wars", CollectionPreparer.identity(), new Document(), new Document(), Person.class,
Person.class, FindPublisherPreparer.NO_OP_PREPARER).subscribe();
verify(findPublisher, never()).projection(any());
}
@@ -442,8 +464,8 @@ public class ReactiveMongoTemplateUnitTests {
@Test // DATAMONGO-1719
void doesNotApplyFieldsWhenTargetExtendsDomainType() {
template.doFind("star-wars", new Document(), new Document(), Person.class, PersonExtended.class,
FindPublisherPreparer.NO_OP_PREPARER).subscribe();
template.doFind("star-wars", CollectionPreparer.identity(), new Document(), new Document(), Person.class,
PersonExtended.class, FindPublisherPreparer.NO_OP_PREPARER).subscribe();
verify(findPublisher, never()).projection(any());
}
@@ -632,6 +654,26 @@ public class ReactiveMongoTemplateUnitTests {
verify(aggregatePublisher).collation(eq(com.mongodb.client.model.Collation.builder().locale("de_AT").build()));
}
@Test // GH-4277
void aggreateShouldUseReadConcern() {
AggregationOptions options = AggregationOptions.builder().readConcern(ReadConcern.SNAPSHOT).build();
template.aggregate(newAggregation(Sith.class, project("id")).withOptions(options), AutogenerateableId.class,
Document.class).subscribe();
verify(collection).withReadConcern(ReadConcern.SNAPSHOT);
}
@Test // GH-4286
void aggreateShouldUseReadReadPreference() {
AggregationOptions options = AggregationOptions.builder().readPreference(ReadPreference.primaryPreferred()).build();
template.aggregate(newAggregation(Sith.class, project("id")).withOptions(options), AutogenerateableId.class,
Document.class).subscribe();
verify(collection).withReadPreference(ReadPreference.primaryPreferred());
}
@Test // DATAMONGO-1854
void aggreateShouldUseCollationFromOptionsEvenIfDefaultCollationIsPresent() {

View File

@@ -0,0 +1,49 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.aggregation;
import static org.assertj.core.api.Assertions.*;
import java.util.List;
import org.bson.Document;
import org.junit.jupiter.api.Test;
/**
* @author Christoph Strobl
*/
class AggregationOperationUnitTests {
@Test // GH-4306
void getOperatorShouldFavorToPipelineStages() {
AggregationOperation op = new AggregationOperation() {
@Override
public Document toDocument(AggregationOperationContext context) {
return new Document();
}
@Override
public List<Document> toPipelineStages(AggregationOperationContext context) {
return List.of(new Document("$spring", "data"));
}
};
assertThat(op.getOperator()).isEqualTo("$spring");
}
}

View File

@@ -0,0 +1,46 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.aggregation;
import java.util.List;
import org.bson.Document;
import org.junit.jupiter.api.Test;
/**
* @author Christoph Strobl
*/
class AggregationPipelineUnitTests {
@Test // GH-4306
void verifyMustNotFailIfOnlyPipelineStagesUsed() {
AggregationPipeline.of(new OverridesPipelineStagesAndThrowsErrorOnToDocument()).verify();
}
static class OverridesPipelineStagesAndThrowsErrorOnToDocument implements AggregationOperation {
@Override
public Document toDocument(AggregationOperationContext context) {
throw new IllegalStateException("oh no");
}
@Override
public List<Document> toPipelineStages(AggregationOperationContext context) {
return List.of(Aggregation.project("data").toDocument(context));
}
}
}

View File

@@ -43,7 +43,6 @@ import java.util.stream.Stream;
import org.assertj.core.data.Offset;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -61,6 +60,7 @@ import org.springframework.data.mongodb.core.TestEntities;
import org.springframework.data.mongodb.core.Venue;
import org.springframework.data.mongodb.core.aggregation.AggregationTests.CarDescriptor.Entry;
import org.springframework.data.mongodb.core.aggregation.BucketAutoOperation.Granularities;
import org.springframework.data.mongodb.core.aggregation.VariableOperators.Let;
import org.springframework.data.mongodb.core.aggregation.VariableOperators.Let.ExpressionVariable;
import org.springframework.data.mongodb.core.geo.GeoJsonPoint;
import org.springframework.data.mongodb.core.index.GeoSpatialIndexType;
@@ -90,6 +90,7 @@ import com.mongodb.client.MongoCollection;
* @author Maninder Singh
* @author Sergey Shcherbakov
* @author Minsu Kim
* @author Sangyong Choi
*/
@ExtendWith(MongoTemplateExtension.class)
public class AggregationTests {
@@ -499,7 +500,7 @@ public class AggregationTests {
/*
//complex mongodb aggregation framework example from
https://docs.mongodb.org/manual/tutorial/aggregation-examples/#largest-and-smallest-cities-by-state
db.zipcodes.aggregate(
{
$group: {
@@ -1518,8 +1519,47 @@ public class AggregationTests {
assertThat(firstItem).containsEntry("linkedPerson.[0].firstname", "u1");
}
@Test // GH-3322
@EnableIfMongoServerVersion(isGreaterThanEqual = "5.0")
void shouldLookupPeopleCorrectlyWithPipeline() {
createUsersWithReferencedPersons();
TypedAggregation<User> agg = newAggregation(User.class, //
lookup().from("person").localField("_id").foreignField("firstname").pipeline(match(where("firstname").is("u1"))).as("linkedPerson"), //
sort(ASC, "id"));
AggregationResults<Document> results = mongoTemplate.aggregate(agg, User.class, Document.class);
List<Document> mappedResults = results.getMappedResults();
Document firstItem = mappedResults.get(0);
assertThat(firstItem).containsEntry("_id", "u1");
assertThat(firstItem).containsEntry("linkedPerson.[0].firstname", "u1");
}
@Test // GH-3322
@EnableIfMongoServerVersion(isGreaterThanEqual = "5.0")
void shouldLookupPeopleCorrectlyWithPipelineAndLet() {
createUsersWithReferencedPersons();
TypedAggregation<User> agg = newAggregation(User.class, //
lookup().from("person").localField("_id").foreignField("firstname").let(Let.ExpressionVariable.newVariable("the_id").forField("_id")).pipeline(
match(ctx -> new Document("$expr", new Document("$eq", List.of("$$the_id", "u1"))))).as("linkedPerson"),
sort(ASC, "id"));
AggregationResults<Document> results = mongoTemplate.aggregate(agg, User.class, Document.class);
List<Document> mappedResults = results.getMappedResults();
Document firstItem = mappedResults.get(0);
assertThat(firstItem).containsEntry("_id", "u1");
assertThat(firstItem).containsEntry("linkedPerson.[0].firstname", "u1");
}
@Test // DATAMONGO-1326
void shouldGroupByAndLookupPeopleCorectly() {
void shouldGroupByAndLookupPeopleCorrectly() {
createUsersWithReferencedPersons();

View File

@@ -16,11 +16,16 @@
package org.springframework.data.mongodb.core.aggregation;
import static org.assertj.core.api.Assertions.*;
import static org.springframework.data.mongodb.core.aggregation.Aggregation.*;
import static org.springframework.data.mongodb.core.aggregation.VariableOperators.Let.ExpressionVariable.*;
import static org.springframework.data.mongodb.test.util.Assertions.assertThat;
import java.util.List;
import org.bson.Document;
import org.junit.jupiter.api.Test;
import org.springframework.data.mongodb.core.DocumentTestUtils;
import org.springframework.data.mongodb.core.query.Criteria;
/**
* Unit tests for {@link LookupOperation}.
@@ -62,7 +67,7 @@ public class LookupOperationUnitTests {
Document lookupClause = extractDocumentFromLookupOperation(lookupOperation);
assertThat(lookupClause).containsEntry("from", "a") //
org.assertj.core.api.Assertions.assertThat(lookupClause).containsEntry("from", "a") //
.containsEntry("localField", "b") //
.containsEntry("foreignField", "c") //
.containsEntry("as", "d");
@@ -114,7 +119,7 @@ public class LookupOperationUnitTests {
Document lookupClause = extractDocumentFromLookupOperation(lookupOperation);
assertThat(lookupClause).containsEntry("from", "a") //
org.assertj.core.api.Assertions.assertThat(lookupClause).containsEntry("from", "a") //
.containsEntry("localField", "b") //
.containsEntry("foreignField", "c") //
.containsEntry("as", "d");
@@ -129,4 +134,86 @@ public class LookupOperationUnitTests {
assertThat(lookupOperation.getFields().exposesSingleFieldOnly()).isTrue();
assertThat(lookupOperation.getFields().getField("d")).isNotNull();
}
@Test // GH-3322
void buildsLookupWithLetAndPipeline() {
LookupOperation lookupOperation = LookupOperation.newLookup().from("warehouses")
.let(newVariable("order_item").forField("item"), newVariable("order_qty").forField("ordered"))
.pipeline(match(ctx -> new Document("$expr",
new Document("$and", List.of(Document.parse("{ $eq: [ \"$stock_item\", \"$$order_item\" ] }"),
Document.parse("{ $gte: [ \"$instock\", \"$$order_qty\" ] }"))))))
.as("stockdata");
assertThat(lookupOperation.toDocument(Aggregation.DEFAULT_CONTEXT)).isEqualTo("""
{ $lookup: {
from: "warehouses",
let: { order_item: "$item", order_qty: "$ordered" },
pipeline: [
{ $match:
{ $expr:
{ $and:
[
{ $eq: [ "$stock_item", "$$order_item" ] },
{ $gte: [ "$instock", "$$order_qty" ] }
]
}
}
}
],
as: "stockdata"
}}
""");
}
@Test // GH-3322
void buildsLookupWithJustPipeline() {
LookupOperation lookupOperation = LookupOperation.newLookup().from("holidays") //
.pipeline( //
match(Criteria.where("year").is(2018)), //
project().andExclude("_id").and(ctx -> new Document("name", "$name").append("date", "$date")).as("date"), //
Aggregation.replaceRoot("date") //
).as("holidays");
assertThat(lookupOperation.toDocument(Aggregation.DEFAULT_CONTEXT)).isEqualTo("""
{ $lookup:
{
from: "holidays",
pipeline: [
{ $match: { year: 2018 } },
{ $project: { _id: 0, date: { name: "$name", date: "$date" } } },
{ $replaceRoot: { newRoot: "$date" } }
],
as: "holidays"
}
}
""");
}
@Test // GH-3322
void buildsLookupWithLocalAndForeignFieldAsWellAsLetAndPipeline() {
LookupOperation lookupOperation = Aggregation.lookup().from("restaurants") //
.localField("restaurant_name")
.foreignField("name")
.let(newVariable("orders_drink").forField("drink")) //
.pipeline(match(ctx -> new Document("$expr", new Document("$in", List.of("$$orders_drink", "$beverages")))))
.as("matches");
assertThat(lookupOperation.toDocument(Aggregation.DEFAULT_CONTEXT)).isEqualTo("""
{ $lookup: {
from: "restaurants",
localField: "restaurant_name",
foreignField: "name",
let: { orders_drink: "$drink" },
pipeline: [{
$match: {
$expr: { $in: [ "$$orders_drink", "$beverages" ] }
}
}],
as: "matches"
}}
""");
}
}
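Executing such a correlated $lookup through the template could look roughly like the following sketch; the "orders"/"warehouses" collections and field names mirror the MongoDB documentation example used above and are assumptions (a 5.0+ server is required for let/pipeline combined with localField/foreignField):

import static org.springframework.data.mongodb.core.aggregation.Aggregation.*;
import static org.springframework.data.mongodb.core.aggregation.VariableOperators.Let.ExpressionVariable.newVariable;

import java.util.List;

import org.bson.Document;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.aggregation.Aggregation;

class LookupWithLetSketch {

	// correlate each order with matching warehouse stock documents
	List<Document> correlateStock(MongoOperations operations) {

		Aggregation aggregation = newAggregation(
				lookup().from("warehouses")
						.let(newVariable("order_item").forField("item"), newVariable("order_qty").forField("ordered"))
						.pipeline(match(ctx -> new Document("$expr",
								new Document("$and", List.of(Document.parse("{ $eq: [ \"$stock_item\", \"$$order_item\" ] }"),
										Document.parse("{ $gte: [ \"$instock\", \"$$order_qty\" ] }"))))))
						.as("stockdata"));

		return operations.aggregate(aggregation, "orders", Document.class).getMappedResults();
	}
}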

View File

@@ -0,0 +1,68 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.aggregation;
import static org.springframework.data.mongodb.test.util.Assertions.*;
import java.util.List;
import org.bson.Document;
import org.junit.jupiter.api.Test;
/**
* @author Christoph Strobl
*/
class MultiOperationAggregationStageUnitTests {
@Test // GH-4306
void toDocumentRendersSingleOperation() {
MultiOperationAggregationStage stage = (ctx) -> List.of(Document.parse("{ $text: { $search: 'operating' } }"));
assertThat(stage.toDocument(Aggregation.DEFAULT_CONTEXT)).isEqualTo("{ $text: { $search: 'operating' } }");
}
@Test // GH-4306
void toDocumentRendersMultiOperation() {
MultiOperationAggregationStage stage = (ctx) -> List.of(Document.parse("{ $text: { $search: 'operating' } }"),
Document.parse("{ $sort: { score: { $meta: 'textScore' } } }"));
assertThat(stage.toDocument(Aggregation.DEFAULT_CONTEXT)).isEqualTo("""
{
$text: { $search: 'operating' },
$sort: { score: { $meta: 'textScore' } }
}
""");
}
@Test // GH-4306
void toDocumentCollectsDuplicateOperation() {
MultiOperationAggregationStage stage = (ctx) -> List.of(Document.parse("{ $text: { $search: 'operating' } }"),
Document.parse("{ $sort: { score: { $meta: 'textScore' } } }"), Document.parse("{ $sort: { posts: -1 } }"));
assertThat(stage.toDocument(Aggregation.DEFAULT_CONTEXT)).isEqualTo("""
{
$text: { $search: 'operating' },
$sort: [
{ score: { $meta: 'textScore' } },
{ posts: -1 }
]
}
""");
}
}
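The same kind of stage can be assembled with the typed Document API instead of Document.parse; a small sketch, limited to the rendering behaviour the tests above demonstrate (stage and field names are illustrative):

import java.util.List;

import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.MultiOperationAggregationStage;

class TextSearchStageSketch {

	Document renderTextSearchWithSort() {

		// two operations in one stage definition; duplicate operators would be
		// collected into an array as shown in the unit test above
		MultiOperationAggregationStage stage = ctx -> List.of(
				new Document("$text", new Document("$search", "operating")),
				new Document("$sort", new Document("score", new Document("$meta", "textScore"))));

		return stage.toDocument(Aggregation.DEFAULT_CONTEXT);
	}
}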

View File

@@ -21,9 +21,6 @@ import static org.springframework.data.mongodb.core.messaging.SubscriptionUtils.
import static org.springframework.data.mongodb.core.query.Criteria.*;
import static org.springframework.data.mongodb.core.query.Query.*;
import com.mongodb.client.model.ChangeStreamPreAndPostImagesOptions;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -42,8 +39,10 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junitpioneer.jupiter.RepeatFailedTest;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.CollectionOptions;
@@ -62,7 +61,7 @@ import org.springframework.data.mongodb.test.util.Template;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import org.junitpioneer.jupiter.RepeatFailedTest;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
/**
* Integration test for subscribing to a {@link com.mongodb.operation.ChangeStreamBatchCursor} inside the
@@ -702,7 +701,7 @@ class ChangeStreamTests {
}
@Test // GH-4187
@EnableIfMongoServerVersion(isLessThan = "6.0")
@Disabled("Flakey test failing occasionally due to timing issues")
void readsFullDocumentBeforeChangeWhenOptionDeclaredRequiredAndMongoVersionIsLessThan6() throws InterruptedException {
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener = new CollectingMessageListener<>();

View File

@@ -29,6 +29,10 @@ import org.springframework.data.geo.Metrics;
import org.springframework.data.geo.Point;
import org.springframework.data.mongodb.core.DocumentTestUtils;
import org.springframework.data.mongodb.core.geo.GeoJsonPoint;
import org.springframework.test.util.ReflectionTestUtils;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
/**
* Unit tests for {@link NearQuery}.
@@ -229,4 +233,58 @@ public class NearQueryUnitTests {
assertThat(query.toDocument()).containsEntry("maxDistance", 1000D).containsEntry("distanceMultiplier", 0.00062137D);
}
@Test // GH-4277
void fetchesReadPreferenceFromUnderlyingQueryObject() {
NearQuery nearQuery = NearQuery.near(new Point(0, 0))
.query(new Query().withReadPreference(ReadPreference.nearest()));
assertThat(nearQuery.getReadPreference()).isEqualTo(ReadPreference.nearest());
}
@Test // GH-4277
void fetchesReadConcernFromUnderlyingQueryObject() {
NearQuery nearQuery = NearQuery.near(new Point(0, 0)).query(new Query().withReadConcern(ReadConcern.SNAPSHOT));
assertThat(nearQuery.getReadConcern()).isEqualTo(ReadConcern.SNAPSHOT);
}
@Test // GH-4277
void usesReadPreferenceFromNearQueryIfUnderlyingQueryDoesNotDefineAny() {
NearQuery nearQuery = NearQuery.near(new Point(0, 0)).withReadPreference(ReadPreference.nearest())
.query(new Query());
assertThat(((Query) ReflectionTestUtils.getField(nearQuery, "query")).getReadPreference()).isNull();
assertThat(nearQuery.getReadPreference()).isEqualTo(ReadPreference.nearest());
}
@Test // GH-4277
void usesReadConcernFromNearQueryIfUnderlyingQueryDoesNotDefineAny() {
NearQuery nearQuery = NearQuery.near(new Point(0, 0)).withReadConcern(ReadConcern.SNAPSHOT).query(new Query());
assertThat(((Query) ReflectionTestUtils.getField(nearQuery, "query")).getReadConcern()).isNull();
assertThat(nearQuery.getReadConcern()).isEqualTo(ReadConcern.SNAPSHOT);
}
@Test // GH-4277
void readPreferenceFromUnderlyingQueryOverridesNearQueryOne() {
NearQuery nearQuery = NearQuery.near(new Point(0, 0)).withReadPreference(ReadPreference.nearest())
.query(new Query().withReadPreference(ReadPreference.primary()));
assertThat(nearQuery.getReadPreference()).isEqualTo(ReadPreference.primary());
}
@Test // GH-4277
void readConcernFromUnderlyingQueryOverridesNearQueryOne() {
NearQuery nearQuery = NearQuery.near(new Point(0, 0)).withReadConcern(ReadConcern.SNAPSHOT)
.query(new Query().withReadConcern(ReadConcern.MAJORITY));
assertThat(nearQuery.getReadConcern()).isEqualTo(ReadConcern.MAJORITY);
}
}
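A rough sketch of how the new read settings on NearQuery might be used when running a $geoNear through the template; the Venue type, coordinates and the chosen ReadPreference/ReadConcern are illustrative assumptions:

import org.springframework.data.geo.GeoResults;
import org.springframework.data.geo.Metrics;
import org.springframework.data.geo.Point;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.query.NearQuery;

import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;

class NearQueryReadSettingsSketch {

	// the template is expected to forward the read settings when executing the $geoNear aggregation
	GeoResults<Venue> nearbyVenues(MongoOperations operations) {

		NearQuery nearQuery = NearQuery.near(new Point(-73.99, 40.73), Metrics.KILOMETERS) //
				.maxDistance(5) //
				.withReadPreference(ReadPreference.secondaryPreferred()) //
				.withReadConcern(ReadConcern.LOCAL);

		return operations.geoNear(nearQuery, Venue.class);
	}

	static class Venue {
		String name;
		Point location;
	}
}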

View File

@@ -342,7 +342,10 @@ class QueryTests {
source.limit(10);
source.setSortObject(new Document("_id", 1));
Query target = Query.of((Query) new ProxyFactory(source).getProxy());
ProxyFactory proxyFactory = new ProxyFactory(source);
proxyFactory.setInterfaces(new Class[0]);
Query target = Query.of((Query) proxyFactory.getProxy());
compareQueries(target, source);
}
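For context: clearing the inherited interfaces makes ProxyFactory fall back to a class-based (CGLIB) proxy of Query, so the proxy can still be cast to Query even though Query presumably now exposes interfaces (such as the read-preference/read-concern aware contracts added in this change) that would otherwise produce a JDK interface proxy. A minimal sketch with an assumed query:

import org.springframework.aop.framework.ProxyFactory;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;

class ProxiedQueryCopySketch {

	Query copyFromClassProxy() {

		Query source = new Query(Criteria.where("status").is("OPEN")).limit(10);

		ProxyFactory proxyFactory = new ProxyFactory(source);
		// drop the interfaces picked up from the target so a CGLIB subclass proxy of Query is created
		proxyFactory.setInterfaces(new Class[0]);

		return Query.of((Query) proxyFactory.getProxy());
	}
}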

View File

@@ -28,7 +28,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.data.domain.Example;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
@@ -120,7 +119,11 @@ class SimpleReactiveMongoRepositoryUnitTests {
@Test // DATAMONGO-1854
void shouldAddDefaultCollationToFindOneForExampleIfPresent() {
when(mongoOperations.find(any(), any(), any())).thenReturn(flux);
when(entityInformation.getCollectionName()).thenReturn("testdummy");
doReturn(flux).when(mongoOperations).find(any(Query.class), eq(TestDummy.class), eq("testdummy"));
when(flux.buffer(anyInt())).thenReturn(flux);
when(flux.map(any())).thenReturn(flux);
when(flux.next()).thenReturn(mono);
Collation collation = Collation.of("en_US");

View File

@@ -1,4 +1,4 @@
Spring Data MongoDB 4.0 GA (2022.0.0)
Spring Data MongoDB 4.1 M2 (2023.0.0)
Copyright (c) [2010-2019] Pivotal Software, Inc.
This product is licensed to you under the Apache License, Version 2.0 (the "License").
@@ -39,6 +39,8 @@ conditions of the subcomponent's license, as noted in the LICENSE file.