DATAMONGO-1637 - Polishing.

Move aggregation options conversion to AggregationOptions.getMongoAggregationOptions(). Allow cursor options to control cursor batch size. Add command logging to stream execution. Rearrange method order. Close cursor in tests. Change author name from user name to full name.

Original pull request: #447.
This commit is contained in:
Mark Paluch
2017-03-14 13:01:03 +01:00
parent 1a65828365
commit a84c4b064d
6 changed files with 445 additions and 226 deletions

View File

@@ -1,6 +1,6 @@
/*
* Copyright 2011-2016 the original author or authors.
*
* Copyright 2011-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -23,6 +23,7 @@ import org.bson.Document;
import org.springframework.data.geo.GeoResults;
import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MongoConverter;
@@ -38,7 +39,6 @@ import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.util.CloseableIterator;
import com.mongodb.Cursor;
import com.mongodb.DB;
import com.mongodb.ReadPreference;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.result.DeleteResult;
@@ -48,7 +48,7 @@ import com.mongodb.client.result.UpdateResult;
* Interface that specifies a basic set of MongoDB operations. Implemented by {@link MongoTemplate}. Not often used but
* a useful option for extensibility and testability (as it can be easily mocked, stubbed, or be the target of a JDK
* proxy).
*
*
* @author Thomas Risberg
* @author Mark Pollack
* @author Oliver Gierke
@@ -56,13 +56,13 @@ import com.mongodb.client.result.UpdateResult;
* @author Chuong Ngo
* @author Christoph Strobl
* @author Thomas Darimont
* @author maninder
* @author Maninder Singh
*/
public interface MongoOperations {
/**
* The collection name used for the specified class by this template.
*
*
* @param entityClass must not be {@literal null}.
* @return
*/
@@ -72,7 +72,7 @@ public interface MongoOperations {
* Execute the a MongoDB command expressed as a JSON string. This will call the method JSON.parse that is part of the
* MongoDB driver to convert the JSON string to a Document. Any errors that result from executing this command will be
* converted into Spring's DAO exception hierarchy.
*
*
* @param jsonCommand a MongoDB command expressed as a JSON string.
*/
Document executeCommand(String jsonCommand);
@@ -80,7 +80,7 @@ public interface MongoOperations {
/**
* Execute a MongoDB command. Any errors that result from executing this command will be converted into Spring's DAO
* exception hierarchy.
*
*
* @param command a MongoDB command
*/
Document executeCommand(Document command);
@@ -88,7 +88,7 @@ public interface MongoOperations {
/**
* Execute a MongoDB command. Any errors that result from executing this command will be converted into Spring's data
* access exception hierarchy.
*
*
* @param command a MongoDB command, must not be {@literal null}.
* @param readPreference read preferences to use, can be {@literal null}.
* @return
@@ -98,7 +98,7 @@ public interface MongoOperations {
/**
* Execute a MongoDB query and iterate over the query results on a per-document basis with a DocumentCallbackHandler.
*
*
* @param query the query class that specifies the criteria used to find a record and also an optional fields
* specification
* @param collectionName name of the collection to retrieve the objects from
@@ -110,7 +110,7 @@ public interface MongoOperations {
* Executes a {@link DbCallback} translating any exceptions as necessary.
* <p/>
* Allows for returning a result object, that is a domain object or a collection of domain objects.
*
*
* @param <T> return type
* @param action callback object that specifies the MongoDB actions to perform on the passed in DB instance.
* @return a result object returned by the action or <tt>null</tt>
@@ -121,7 +121,7 @@ public interface MongoOperations {
* Executes the given {@link CollectionCallback} on the entity collection of the specified class.
* <p/>
* Allows for returning a result object, that is a domain object or a collection of domain objects.
*
*
* @param entityClass class that determines the collection to use
* @param <T> return type
* @param action callback object that specifies the MongoDB action
@@ -133,7 +133,7 @@ public interface MongoOperations {
* Executes the given {@link CollectionCallback} on the collection of the given name.
* <p/>
* Allows for returning a result object, that is a domain object or a collection of domain objects.
*
*
* @param <T> return type
* @param collectionName the name of the collection that specifies which DBCollection instance will be passed into
* @param action callback object that specifies the MongoDB action the callback action.
@@ -146,7 +146,7 @@ public interface MongoOperations {
* {@link Cursor}.
* <p>
* Returns a {@link CloseableIterator} that wraps the a Mongo DB {@link Cursor} that needs to be closed.
*
*
* @param <T> element return type
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
@@ -160,7 +160,7 @@ public interface MongoOperations {
* by a Mongo DB {@link Cursor}.
* <p>
* Returns a {@link CloseableIterator} that wraps the a Mongo DB {@link Cursor} that needs to be closed.
*
*
* @param <T> element return type
* @param query must not be {@literal null}.
* @param entityType must not be {@literal null}.
@@ -172,7 +172,7 @@ public interface MongoOperations {
/**
* Create an uncapped collection with a name based on the provided entity class.
*
*
* @param entityClass class that determines the collection to create
* @return the created collection
*/
@@ -180,7 +180,7 @@ public interface MongoOperations {
/**
* Create a collection with a name based on the provided entity class using the options.
*
*
* @param entityClass class that determines the collection to create
* @param collectionOptions options to use when creating the collection.
* @return the created collection
@@ -189,7 +189,7 @@ public interface MongoOperations {
/**
* Create an uncapped collection with the provided name.
*
*
* @param collectionName name of the collection
* @return the created collection
*/
@@ -197,7 +197,7 @@ public interface MongoOperations {
/**
* Create a collection with the provided name and options.
*
*
* @param collectionName name of the collection
* @param collectionOptions options to use when creating the collection.
* @return the created collection
@@ -206,7 +206,7 @@ public interface MongoOperations {
/**
* A set of collection names.
*
*
* @return list of collection names
*/
Set<String> getCollectionNames();
@@ -215,7 +215,7 @@ public interface MongoOperations {
* Get a collection by name, creating it if it doesn't exist.
* <p/>
* Translate any exceptions as necessary.
*
*
* @param collectionName name of the collection
* @return an existing collection or a newly created one.
*/
@@ -225,7 +225,7 @@ public interface MongoOperations {
* Check to see if a collection with a name indicated by the entity class exists.
* <p/>
* Translate any exceptions as necessary.
*
*
* @param entityClass class that determines the name of the collection
* @return true if a collection with the given name is found, false otherwise.
*/
@@ -235,7 +235,7 @@ public interface MongoOperations {
* Check to see if a collection with a given name exists.
* <p/>
* Translate any exceptions as necessary.
*
*
* @param collectionName name of the collection
* @return true if a collection with the given name is found, false otherwise.
*/
@@ -245,7 +245,7 @@ public interface MongoOperations {
* Drop the collection with the name indicated by the entity class.
* <p/>
* Translate any exceptions as necessary.
*
*
* @param entityClass class that determines the collection to drop/delete.
*/
<T> void dropCollection(Class<T> entityClass);
@@ -254,21 +254,21 @@ public interface MongoOperations {
* Drop the collection with the given name.
* <p/>
* Translate any exceptions as necessary.
*
*
* @param collectionName name of the collection to drop/delete.
*/
void dropCollection(String collectionName);
/**
* Returns the operations that can be performed on indexes
*
*
* @return index operations on the named collection
*/
IndexOperations indexOps(String collectionName);
/**
* Returns the operations that can be performed on indexes
*
*
* @return index operations on the named collection associated with the given entity class
*/
IndexOperations indexOps(Class<?> entityClass);
@@ -283,7 +283,7 @@ public interface MongoOperations {
/**
* Returns a new {@link BulkOperations} for the given collection.
*
*
* @param mode the {@link BulkMode} to use for bulk operations, must not be {@literal null}.
* @param collectionName the name of the collection to work on, must not be {@literal null} or empty.
* @return {@link BulkOperations} on the named collection
@@ -292,7 +292,7 @@ public interface MongoOperations {
/**
* Returns a new {@link BulkOperations} for the given entity type.
*
*
* @param mode the {@link BulkMode} to use for bulk operations, must not be {@literal null}.
* @param entityType the name of the entity class, must not be {@literal null}.
* @return {@link BulkOperations} on the named collection associated of the given entity class.
@@ -301,7 +301,7 @@ public interface MongoOperations {
/**
* Returns a new {@link BulkOperations} for the given entity type and collection name.
*
*
* @param mode the {@link BulkMode} to use for bulk operations, must not be {@literal null}.
* @param entityClass the name of the entity class, must not be {@literal null}.
* @param collectionName the name of the collection to work on, must not be {@literal null} or empty.
@@ -317,7 +317,7 @@ public interface MongoOperations {
* <p/>
* If your collection does not contain a homogeneous collection of types, this operation will not be an efficient way
* to map objects since the test for class type is done in the client and not on the server.
*
*
* @param entityClass the parameterized type of the returned list
* @return the converted collection
*/
@@ -331,7 +331,7 @@ public interface MongoOperations {
* <p/>
* If your collection does not contain a homogeneous collection of types, this operation will not be an efficient way
* to map objects since the test for class type is done in the client and not on the server.
*
*
* @param entityClass the parameterized type of the returned list.
* @param collectionName name of the collection to retrieve the objects from
* @return the converted collection
@@ -341,7 +341,7 @@ public interface MongoOperations {
/**
* Execute a group operation over the entire collection. The group operation entity class should match the 'shape' of
* the returned object that takes int account the initial document structure as well as any finalize functions.
*
*
* @param criteria The criteria that restricts the row that are considered for grouping. If not specified all rows are
* considered.
* @param inputCollectionName the collection where the group operation will read from
@@ -356,7 +356,7 @@ public interface MongoOperations {
* Execute a group operation restricting the rows to those which match the provided Criteria. The group operation
* entity class should match the 'shape' of the returned object that takes int account the initial document structure
* as well as any finalize functions.
*
*
* @param criteria The criteria that restricts the row that are considered for grouping. If not specified all rows are
* considered.
* @param inputCollectionName the collection where the group operation will read from
@@ -370,7 +370,7 @@ public interface MongoOperations {
/**
* Execute an aggregation operation. The raw results will be mapped to the given entity class. The name of the
* inputCollection is derived from the inputType of the aggregation.
*
*
* @param aggregation The {@link TypedAggregation} specification holding the aggregation operations, must not be
* {@literal null}.
* @param collectionName The name of the input collection to use for the aggreation.
@@ -380,13 +380,10 @@ public interface MongoOperations {
*/
<O> AggregationResults<O> aggregate(TypedAggregation<?> aggregation, String collectionName, Class<O> outputType);
<O> CloseableIterator<O> aggregateStream(TypedAggregation<?> aggregation, String inputCollectionName,
Class<O> outputType);
/**
* Execute an aggregation operation. The raw results will be mapped to the given entity class. The name of the
* inputCollection is derived from the inputType of the aggregation.
*
*
* @param aggregation The {@link TypedAggregation} specification holding the aggregation operations, must not be
* {@literal null}.
* @param outputType The parameterized type of the returned list, must not be {@literal null}.
@@ -395,22 +392,9 @@ public interface MongoOperations {
*/
<O> AggregationResults<O> aggregate(TypedAggregation<?> aggregation, Class<O> outputType);
/**
* Execute an aggregation operation. The raw results will be mapped to the given entity class and are returned as
* stream. The name of the inputCollection is derived from the inputType of the aggregation.
*
* @param aggregation The {@link TypedAggregation} specification holding the aggregation operations, must not be
* {@literal null}.
* @param outputType The parameterized type of the returned list, must not be {@literal null}.
* @return The results of the aggregation operation.
* @since 1.11.0
*/
<O> CloseableIterator<O> aggregateStream(TypedAggregation<?> aggregation, Class<O> outputType);
/**
* Execute an aggregation operation. The raw results will be mapped to the given entity class.
*
*
* @param aggregation The {@link Aggregation} specification holding the aggregation operations, must not be
* {@literal null}.
* @param inputType the inputType where the aggregation operation will read from, must not be {@literal null} or
@@ -421,11 +405,9 @@ public interface MongoOperations {
*/
<O> AggregationResults<O> aggregate(Aggregation aggregation, Class<?> inputType, Class<O> outputType);
<O> CloseableIterator<O> aggregateStream(Aggregation aggregation, Class<?> inputType, Class<O> outputType);
/**
* Execute an aggregation operation. The raw results will be mapped to the given entity class.
*
*
* @param aggregation The {@link Aggregation} specification holding the aggregation operations, must not be
* {@literal null}.
* @param collectionName the collection where the aggregation operation will read from, must not be {@literal null} or
@@ -436,12 +418,84 @@ public interface MongoOperations {
*/
<O> AggregationResults<O> aggregate(Aggregation aggregation, String collectionName, Class<O> outputType);
<O> CloseableIterator<O> aggregateStream(Aggregation aggregation, String collectionName, Class<O> outputType);
/**
* Execute an aggregation operation backed by a Mongo DB {@link Cursor}.
* <p>
* Returns a {@link CloseableIterator} that wraps the a Mongo DB {@link Cursor} that needs to be closed. The raw
* results will be mapped to the given entity class. The name of the inputCollection is derived from the inputType of
* the aggregation.
* <p>
* Aggregation streaming can't be used with {@link AggregationOptions#isExplain() aggregation explain}. Enabling
* explanation mode will throw an {@link IllegalArgumentException}.
*
* @param aggregation The {@link TypedAggregation} specification holding the aggregation operations, must not be
* {@literal null}.
* @param collectionName The name of the input collection to use for the aggreation.
* @param outputType The parameterized type of the returned list, must not be {@literal null}.
* @return The results of the aggregation operation.
* @since 2.0
*/
<O> CloseableIterator<O> aggregateStream(TypedAggregation<?> aggregation, String collectionName, Class<O> outputType);
/**
* Execute an aggregation operation backed by a Mongo DB {@link Cursor}.
* <p/>
* Returns a {@link CloseableIterator} that wraps the a Mongo DB {@link Cursor} that needs to be closed. The raw
* results will be mapped to the given entity class and are returned as stream. The name of the inputCollection is
* derived from the inputType of the aggregation.
* <p/>
* Aggregation streaming can't be used with {@link AggregationOptions#isExplain() aggregation explain}. Enabling
* explanation mode will throw an {@link IllegalArgumentException}.
*
* @param aggregation The {@link TypedAggregation} specification holding the aggregation operations, must not be
* {@literal null}.
* @param outputType The parameterized type of the returned list, must not be {@literal null}.
* @return The results of the aggregation operation.
* @since 2.0
*/
<O> CloseableIterator<O> aggregateStream(TypedAggregation<?> aggregation, Class<O> outputType);
/**
* Execute an aggregation operation backed by a Mongo DB {@link Cursor}.
* <p/>
* Returns a {@link CloseableIterator} that wraps the a Mongo DB {@link Cursor} that needs to be closed. The raw
* results will be mapped to the given entity class.
* <p/>
* Aggregation streaming can't be used with {@link AggregationOptions#isExplain() aggregation explain}. Enabling
* explanation mode will throw an {@link IllegalArgumentException}.
*
* @param aggregation The {@link Aggregation} specification holding the aggregation operations, must not be
* {@literal null}.
* @param inputType the inputType where the aggregation operation will read from, must not be {@literal null} or
* empty.
* @param outputType The parameterized type of the returned list, must not be {@literal null}.
* @return The results of the aggregation operation.
* @since 2.0
*/
<O> CloseableIterator<O> aggregateStream(Aggregation aggregation, Class<?> inputType, Class<O> outputType);
/**
* Execute an aggregation operation backed by a Mongo DB {@link Cursor}.
* <p/>
* Returns a {@link CloseableIterator} that wraps the a Mongo DB {@link Cursor} that needs to be closed. The raw
* results will be mapped to the given entity class.
* <p/>
* Aggregation streaming can't be used with {@link AggregationOptions#isExplain() aggregation explain}. Enabling
* explanation mode will throw an {@link IllegalArgumentException}.
*
* @param aggregation The {@link Aggregation} specification holding the aggregation operations, must not be
* {@literal null}.
* @param collectionName the collection where the aggregation operation will read from, must not be {@literal null} or
* empty.
* @param outputType The parameterized type of the returned list, must not be {@literal null}.
* @return The results of the aggregation operation.
* @since 2.0
*/
<O> CloseableIterator<O> aggregateStream(Aggregation aggregation, String collectionName, Class<O> outputType);
/**
* Execute a map-reduce operation. The map-reduce operation will be formed with an output type of INLINE
*
*
* @param inputCollectionName the collection where the map-reduce will read from
* @param mapFunction The JavaScript map function
* @param reduceFunction The JavaScript reduce function
@@ -454,7 +508,7 @@ public interface MongoOperations {
/**
* Execute a map-reduce operation that takes additional map-reduce options.
*
*
* @param inputCollectionName the collection where the map-reduce will read from
* @param mapFunction The JavaScript map function
* @param reduceFunction The JavaScript reduce function
@@ -468,7 +522,7 @@ public interface MongoOperations {
/**
* Execute a map-reduce operation that takes a query. The map-reduce operation will be formed with an output type of
* INLINE
*
*
* @param query The query to use to select the data for the map phase
* @param inputCollectionName the collection where the map-reduce will read from
* @param mapFunction The JavaScript map function
@@ -482,7 +536,7 @@ public interface MongoOperations {
/**
* Execute a map-reduce operation that takes a query and additional map-reduce options
*
*
* @param query The query to use to select the data for the map phase
* @param inputCollectionName the collection where the map-reduce will read from
* @param mapFunction The JavaScript map function
@@ -499,7 +553,7 @@ public interface MongoOperations {
* information to determine the collection the query is ran against. Note, that MongoDB limits the number of results
* by default. Make sure to add an explicit limit to the {@link NearQuery} if you expect a particular number of
* results.
*
*
* @param near must not be {@literal null}.
* @param entityClass must not be {@literal null}.
* @return
@@ -510,7 +564,7 @@ public interface MongoOperations {
* Returns {@link GeoResults} for all entities matching the given {@link NearQuery}. Note, that MongoDB limits the
* number of results by default. Make sure to add an explicit limit to the {@link NearQuery} if you expect a
* particular number of results.
*
*
* @param near must not be {@literal null}.
* @param entityClass must not be {@literal null}.
* @param collectionName the collection to trigger the query against. If no collection name is given the entity class
@@ -528,7 +582,7 @@ public interface MongoOperations {
* <p/>
* The query is specified as a {@link Query} which can be created either using the {@link BasicQuery} or the more
* feature rich {@link Query}.
*
*
* @param query the query class that specifies the criteria used to find a record and also an optional fields
* specification
* @param entityClass the parameterized type of the returned list.
@@ -545,7 +599,7 @@ public interface MongoOperations {
* <p/>
* The query is specified as a {@link Query} which can be created either using the {@link BasicQuery} or the more
* feature rich {@link Query}.
*
*
* @param query the query class that specifies the criteria used to find a record and also an optional fields
* specification
* @param entityClass the parameterized type of the returned list.
@@ -556,7 +610,7 @@ public interface MongoOperations {
/**
* Determine result of given {@link Query} contains at least one element.
*
*
* @param query the {@link Query} class that specifies the criteria used to find a record.
* @param collectionName name of the collection to check for objects.
* @return
@@ -565,7 +619,7 @@ public interface MongoOperations {
/**
* Determine result of given {@link Query} contains at least one element.
*
*
* @param query the {@link Query} class that specifies the criteria used to find a record.
* @param entityClass the parameterized type.
* @return
@@ -574,7 +628,7 @@ public interface MongoOperations {
/**
* Determine result of given {@link Query} contains at least one element.
*
*
* @param query the {@link Query} class that specifies the criteria used to find a record.
* @param entityClass the parameterized type.
* @param collectionName name of the collection to check for objects.
@@ -590,7 +644,7 @@ public interface MongoOperations {
* <p/>
* The query is specified as a {@link Query} which can be created either using the {@link BasicQuery} or the more
* feature rich {@link Query}.
*
*
* @param query the query class that specifies the criteria used to find a record and also an optional fields
* specification
* @param entityClass the parameterized type of the returned list.
@@ -606,7 +660,7 @@ public interface MongoOperations {
* <p/>
* The query is specified as a {@link Query} which can be created either using the {@link BasicQuery} or the more
* feature rich {@link Query}.
*
*
* @param query the query class that specifies the criteria used to find a record and also an optional fields
* specification
* @param entityClass the parameterized type of the returned list.
@@ -618,7 +672,7 @@ public interface MongoOperations {
/**
* Returns a document with the given id mapped onto the given class. The collection the query is ran against will be
* derived from the given target class as well.
*
*
* @param <T>
* @param id the id of the document to return.
* @param entityClass the type the document shall be converted into.
@@ -628,7 +682,7 @@ public interface MongoOperations {
/**
* Returns the document with the given id from the given collection mapped onto the given target class.
*
*
* @param id the id of the document to return
* @param entityClass the type to convert the document to
* @param collectionName the collection to query for the document
@@ -638,9 +692,9 @@ public interface MongoOperations {
<T> T findById(Object id, Class<T> entityClass, String collectionName);
/**
* Triggers <a href="http://docs.mongodb.org/manual/reference/method/db.collection.findAndModify/">findAndModify
* <a/> to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query}.
*
* Triggers <a href="http://docs.mongodb.org/manual/reference/method/db.collection.findAndModify/">findAndModify <a/>
* to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query}.
*
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a record and also an optional
* fields specification.
* @param update the {@link Update} to apply on matching documents.
@@ -650,9 +704,9 @@ public interface MongoOperations {
<T> T findAndModify(Query query, Update update, Class<T> entityClass);
/**
* Triggers <a href="http://docs.mongodb.org/manual/reference/method/db.collection.findAndModify/">findAndModify
* <a/> to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query}.
*
* Triggers <a href="http://docs.mongodb.org/manual/reference/method/db.collection.findAndModify/">findAndModify <a/>
* to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query}.
*
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a record and also an optional
* fields specification.
* @param update the {@link Update} to apply on matching documents.
@@ -663,10 +717,10 @@ public interface MongoOperations {
<T> T findAndModify(Query query, Update update, Class<T> entityClass, String collectionName);
/**
* Triggers <a href="http://docs.mongodb.org/manual/reference/method/db.collection.findAndModify/">findAndModify
* <a/> to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query} taking
* Triggers <a href="http://docs.mongodb.org/manual/reference/method/db.collection.findAndModify/">findAndModify <a/>
* to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query} taking
* {@link FindAndModifyOptions} into account.
*
*
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a record and also an optional
* fields specification.
* @param update the {@link Update} to apply on matching documents.
@@ -677,10 +731,10 @@ public interface MongoOperations {
<T> T findAndModify(Query query, Update update, FindAndModifyOptions options, Class<T> entityClass);
/**
* Triggers <a href="http://docs.mongodb.org/manual/reference/method/db.collection.findAndModify/">findAndModify
* <a/> to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query} taking
* Triggers <a href="http://docs.mongodb.org/manual/reference/method/db.collection.findAndModify/">findAndModify <a/>
* to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query} taking
* {@link FindAndModifyOptions} into account.
*
*
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a record and also an optional
* fields specification.
* @param update the {@link Update} to apply on matching documents.
@@ -690,7 +744,7 @@ public interface MongoOperations {
* @return
*/
<T> T findAndModify(Query query, Update update, FindAndModifyOptions options, Class<T> entityClass,
String collectionName);
String collectionName);
/**
* Map the results of an ad-hoc query on the collection for the entity type to a single instance of an object of the
@@ -701,7 +755,7 @@ public interface MongoOperations {
* <p/>
* The query is specified as a {@link Query} which can be created either using the {@link BasicQuery} or the more
* feature rich {@link Query}.
*
*
* @param query the query class that specifies the criteria used to find a record and also an optional fields
* specification
* @param entityClass the parameterized type of the returned list.
@@ -718,7 +772,7 @@ public interface MongoOperations {
* <p/>
* The query is specified as a {@link Query} which can be created either using the {@link BasicQuery} or the more
* feature rich {@link Query}.
*
*
* @param query the query class that specifies the criteria used to find a record and also an optional fields
* specification
* @param entityClass the parameterized type of the returned list.
@@ -729,7 +783,7 @@ public interface MongoOperations {
/**
* Returns the number of documents for the given {@link Query} by querying the collection of the given entity class.
*
*
* @param query
* @param entityClass must not be {@literal null}.
* @return
@@ -740,7 +794,7 @@ public interface MongoOperations {
* Returns the number of documents for the given {@link Query} querying the given collection. The given {@link Query}
* must solely consist of document field references as we lack type information to map potential property references
* onto document fields. TO make sure the query gets mapped, use {@link #count(Query, Class, String)}.
*
*
* @param query
* @param collectionName must not be {@literal null} or empty.
* @return
@@ -751,7 +805,7 @@ public interface MongoOperations {
/**
* Returns the number of documents for the given {@link Query} by querying the given collection using the given entity
* class to map the given {@link Query}.
*
*
* @param query
* @param entityClass must not be {@literal null}.
* @param collectionName must not be {@literal null} or empty.
@@ -772,7 +826,7 @@ public interface MongoOperations {
* <p/>
* <p/>
* Insert is used to initially store the object into the database. To update an existing object use the save method.
*
*
* @param objectToSave the object to store in the collection.
*/
void insert(Object objectToSave);
@@ -784,7 +838,7 @@ public interface MongoOperations {
* configured otherwise, an instance of MappingMongoConverter will be used.
* <p/>
* Insert is used to initially store the object into the database. To update an existing object use the save method.
*
*
* @param objectToSave the object to store in the collection
* @param collectionName name of the collection to store the object in
*/
@@ -792,7 +846,7 @@ public interface MongoOperations {
/**
* Insert a Collection of objects into a collection in a single batch write to the database.
*
*
* @param batchToSave the list of objects to save.
* @param entityClass class that determines the collection to use
*/
@@ -800,7 +854,7 @@ public interface MongoOperations {
/**
* Insert a list of objects into the specified collection in a single batch write to the database.
*
*
* @param batchToSave the list of objects to save.
* @param collectionName name of the collection to store the object in
*/
@@ -809,7 +863,7 @@ public interface MongoOperations {
/**
* Insert a mixed Collection of objects into a database collection determining the collection name to use based on the
* class.
*
*
* @param collectionToSave the list of objects to save.
*/
void insertAll(Collection<? extends Object> objectsToSave);
@@ -826,7 +880,7 @@ public interface MongoOperations {
* property type will be handled by Spring's BeanWrapper class that leverages Type Conversion API. See
* <a href="http://docs.spring.io/spring/docs/current/spring-framework-reference/html/validation.html#core-convert" >
* Spring's Type Conversion"</a> for more details.
*
*
* @param objectToSave the object to store in the collection
*/
void save(Object objectToSave);
@@ -843,7 +897,7 @@ public interface MongoOperations {
* property type will be handled by Spring's BeanWrapper class that leverages Type Cobnversion API. See <a
* http://docs.spring.io/spring/docs/current/spring-framework-reference/html/validation.html#core-convert">Spring's
* Type Conversion"</a> for more details.
*
*
* @param objectToSave the object to store in the collection
* @param collectionName name of the collection to store the object in
*/
@@ -852,7 +906,7 @@ public interface MongoOperations {
/**
* Performs an upsert. If no document is found that matches the query, a new document is created and inserted by
* combining the query document and the update document.
*
*
* @param query the query document that specifies the criteria used to select a record to be upserted
* @param update the update document that contains the updated object or $ operators to manipulate the existing object
* @param entityClass class that determines the collection to use
@@ -863,7 +917,7 @@ public interface MongoOperations {
/**
* Performs an upsert. If no document is found that matches the query, a new document is created and inserted by
* combining the query document and the update document.
*
*
* @param query the query document that specifies the criteria used to select a record to be updated
* @param update the update document that contains the updated object or $ operators to manipulate the existing
* object.
@@ -875,7 +929,7 @@ public interface MongoOperations {
/**
* Performs an upsert. If no document is found that matches the query, a new document is created and inserted by
* combining the query document and the update document.
*
*
* @param query the query document that specifies the criteria used to select a record to be upserted
* @param update the update document that contains the updated object or $ operators to manipulate the existing object
* @param entityClass class of the pojo to be operated on
@@ -887,7 +941,7 @@ public interface MongoOperations {
/**
* Updates the first object that is found in the collection of the entity class that matches the query document with
* the provided update document.
*
*
* @param query the query document that specifies the criteria used to select a record to be updated
* @param update the update document that contains the updated object or $ operators to manipulate the existing
* object.
@@ -899,7 +953,7 @@ public interface MongoOperations {
/**
* Updates the first object that is found in the specified collection that matches the query document criteria with
* the provided updated document.
*
*
* @param query the query document that specifies the criteria used to select a record to be updated
* @param update the update document that contains the updated object or $ operators to manipulate the existing
* object.
@@ -911,7 +965,7 @@ public interface MongoOperations {
/**
* Updates the first object that is found in the specified collection that matches the query document criteria with
* the provided updated document.
*
*
* @param query the query document that specifies the criteria used to select a record to be updated
* @param update the update document that contains the updated object or $ operators to manipulate the existing
* object.
@@ -924,7 +978,7 @@ public interface MongoOperations {
/**
* Updates all objects that are found in the collection for the entity class that matches the query document criteria
* with the provided updated document.
*
*
* @param query the query document that specifies the criteria used to select a record to be updated
* @param update the update document that contains the updated object or $ operators to manipulate the existing
* object.
@@ -936,7 +990,7 @@ public interface MongoOperations {
/**
* Updates all objects that are found in the specified collection that matches the query document criteria with the
* provided updated document.
*
*
* @param query the query document that specifies the criteria used to select a record to be updated
* @param update the update document that contains the updated object or $ operators to manipulate the existing
* object.
@@ -948,7 +1002,7 @@ public interface MongoOperations {
/**
* Updates all objects that are found in the collection for the entity class that matches the query document criteria
* with the provided updated document.
*
*
* @param query the query document that specifies the criteria used to select a record to be updated
* @param update the update document that contains the updated object or $ operators to manipulate the existing
* object.
@@ -960,14 +1014,14 @@ public interface MongoOperations {
/**
* Remove the given object from the collection by id.
*
*
* @param object
*/
DeleteResult remove(Object object);
/**
* Removes the given object from the given collection.
*
*
* @param object
* @param collection must not be {@literal null} or empty.
*/
@@ -976,7 +1030,7 @@ public interface MongoOperations {
/**
* Remove all documents that match the provided query document criteria from the the collection used to store the
* entityClass. The Class parameter is also used to help convert the Id of the object if it is present in the query.
*
*
* @param query
* @param entityClass
*/
@@ -985,7 +1039,7 @@ public interface MongoOperations {
/**
* Remove all documents that match the provided query document criteria from the the collection used to store the
* entityClass. The Class parameter is also used to help convert the Id of the object if it is present in the query.
*
*
* @param query
* @param entityClass
* @param collectionName
@@ -995,7 +1049,7 @@ public interface MongoOperations {
/**
* Remove all documents from the specified collection that match the provided query document criteria. There is no
* conversion/mapping done for any criteria using the id field.
*
*
* @param query the query document that specifies the criteria used to remove a record
* @param collectionName name of the collection where the objects will removed
*/
@@ -1003,7 +1057,7 @@ public interface MongoOperations {
/**
* Returns and removes all documents form the specified collection that match the provided query.
*
*
* @param query
* @param collectionName
* @return
@@ -1013,7 +1067,7 @@ public interface MongoOperations {
/**
* Returns and removes all documents matching the given query form the collection used to store the entityClass.
*
*
* @param query
* @param entityClass
* @return
@@ -1025,7 +1079,7 @@ public interface MongoOperations {
* Returns and removes all documents that match the provided query document criteria from the the collection used to
* store the entityClass. The Class parameter is also used to help convert the Id of the object if it is present in
* the query.
*
*
* @param query
* @param entityClass
* @param collectionName
@@ -1036,7 +1090,7 @@ public interface MongoOperations {
/**
* Returns the underlying {@link MongoConverter}.
*
*
* @return
*/
MongoConverter getConverter();

View File

@@ -15,26 +15,14 @@
*/
package org.springframework.data.mongodb.core;
import static org.springframework.data.mongodb.core.aggregation.AggregationOptions.*;
import static org.springframework.data.mongodb.core.query.Criteria.*;
import static org.springframework.data.mongodb.core.query.SerializationUtils.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.mongodb.*;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
@@ -69,6 +57,7 @@ import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.aggregation.Fields;
import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext;
@@ -112,6 +101,16 @@ import org.springframework.util.ObjectUtils;
import org.springframework.util.ResourceUtils;
import org.springframework.util.StringUtils;
import com.mongodb.CommandResult;
import com.mongodb.Cursor;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.Mongo;
import com.mongodb.MongoException;
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.mongodb.WriteResult;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MapReduceIterable;
import com.mongodb.client.MongoCollection;
@@ -126,7 +125,6 @@ import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.util.JSONParseException;
import com.mongodb.AggregationOptions;
/**
* Primary implementation of {@link MongoOperations}.
@@ -146,7 +144,7 @@ import com.mongodb.AggregationOptions;
* @author Niko Schmuck
* @author Mark Paluch
* @author Laszlo Csontos
* @author maninder
* @author Maninder Singh
*/
@SuppressWarnings("deprecation")
public class MongoTemplate implements MongoOperations, ApplicationContextAware, IndexOperationsProvider {
@@ -365,8 +363,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
FindIterable<Document> cursor = collection.find(mappedQuery).projection(mappedFields);
QueryCursorPreparer cursorPreparer = new QueryCursorPreparer(query, entityType);
ReadDocumentCallback<T> readCallback = new ReadDocumentCallback<T>(mongoConverter, entityType,
collectionName);
ReadDocumentCallback<T> readCallback = new ReadDocumentCallback<T>(mongoConverter, entityType, collectionName);
return new CloseableIterableCursorAdapter<T>(cursorPreparer.prepare(cursor), exceptionTranslator, readCallback);
}
@@ -1527,16 +1524,17 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
return new GroupByResults<T>(mappedResults, commandResult);
}
/* (non-Javadoc)
* @see org.springframework.data.mongodb.core.MongoOperations#aggregate(org.springframework.data.mongodb.core.aggregation.TypedAggregation, java.lang.Class)
*/
@Override
public <O> AggregationResults<O> aggregate(TypedAggregation<?> aggregation, Class<O> outputType) {
return aggregate(aggregation, determineCollectionName(aggregation.getInputType()), outputType);
}
@Override
public <O> CloseableIterator<O> aggregateStream(TypedAggregation<?> aggregation, Class<O> outputType) {
return aggregateStream(aggregation, determineCollectionName(aggregation.getInputType()), outputType);
}
/* (non-Javadoc)
* @see org.springframework.data.mongodb.core.MongoOperations#aggregate(org.springframework.data.mongodb.core.aggregation.TypedAggregation, java.lang.String, java.lang.Class)
*/
@Override
public <O> AggregationResults<O> aggregate(TypedAggregation<?> aggregation, String inputCollectionName,
Class<O> outputType) {
@@ -1548,6 +1546,27 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
return aggregate(aggregation, inputCollectionName, outputType, context);
}
/* (non-Javadoc)
* @see org.springframework.data.mongodb.core.MongoOperations#aggregate(org.springframework.data.mongodb.core.aggregation.Aggregation, java.lang.Class, java.lang.Class)
*/
@Override
public <O> AggregationResults<O> aggregate(Aggregation aggregation, Class<?> inputType, Class<O> outputType) {
return aggregate(aggregation, determineCollectionName(inputType), outputType,
new TypeBasedAggregationOperationContext(inputType, mappingContext, queryMapper));
}
/* (non-Javadoc)
* @see org.springframework.data.mongodb.core.MongoOperations#aggregate(org.springframework.data.mongodb.core.aggregation.Aggregation, java.lang.String, java.lang.Class)
*/
@Override
public <O> AggregationResults<O> aggregate(Aggregation aggregation, String collectionName, Class<O> outputType) {
return aggregate(aggregation, collectionName, outputType, null);
}
/* (non-Javadoc)
* @see org.springframework.data.mongodb.core.MongoOperations#aggregateStream(org.springframework.data.mongodb.core.aggregation.TypedAggregation, java.lang.String, java.lang.Class)
*/
@Override
public <O> CloseableIterator<O> aggregateStream(TypedAggregation<?> aggregation, String inputCollectionName,
Class<O> outputType) {
@@ -1559,13 +1578,17 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
return aggregateStream(aggregation, inputCollectionName, outputType, context);
}
/* (non-Javadoc)
* @see org.springframework.data.mongodb.core.MongoOperations#aggregateStream(org.springframework.data.mongodb.core.aggregation.TypedAggregation, java.lang.Class)
*/
@Override
public <O> AggregationResults<O> aggregate(Aggregation aggregation, Class<?> inputType, Class<O> outputType) {
return aggregate(aggregation, determineCollectionName(inputType), outputType,
new TypeBasedAggregationOperationContext(inputType, mappingContext, queryMapper));
public <O> CloseableIterator<O> aggregateStream(TypedAggregation<?> aggregation, Class<O> outputType) {
return aggregateStream(aggregation, determineCollectionName(aggregation.getInputType()), outputType);
}
/* (non-Javadoc)
* @see org.springframework.data.mongodb.core.MongoOperations#aggregateStream(org.springframework.data.mongodb.core.aggregation.Aggregation, java.lang.Class, java.lang.Class)
*/
@Override
public <O> CloseableIterator<O> aggregateStream(Aggregation aggregation, Class<?> inputType, Class<O> outputType) {
@@ -1573,11 +1596,9 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
new TypeBasedAggregationOperationContext(inputType, mappingContext, queryMapper));
}
@Override
public <O> AggregationResults<O> aggregate(Aggregation aggregation, String collectionName, Class<O> outputType) {
return aggregate(aggregation, collectionName, outputType, null);
}
/* (non-Javadoc)
* @see org.springframework.data.mongodb.core.MongoOperations#aggregateStream(org.springframework.data.mongodb.core.aggregation.Aggregation, java.lang.String, java.lang.Class)
*/
@Override
public <O> CloseableIterator<O> aggregateStream(Aggregation aggregation, String collectionName, Class<O> outputType) {
return aggregateStream(aggregation, collectionName, outputType, null);
@@ -1677,8 +1698,8 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
return mappedResults;
}
protected <O> CloseableIterator<O> aggregateStream(final Aggregation aggregation, final String collectionName,
final Class<O> outputType, AggregationOperationContext context) {
protected <O> CloseableIterator<O> aggregateStream(Aggregation aggregation, String collectionName,
Class<O> outputType, AggregationOperationContext context) {
Assert.hasText(collectionName, "Collection name must not be null or empty!");
Assert.notNull(aggregation, "Aggregation pipeline must not be null!");
@@ -1686,35 +1707,53 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
AggregationOperationContext rootContext = context == null ? Aggregation.DEFAULT_CONTEXT : context;
final DBObject command = aggregation.toDbObject(collectionName, rootContext);
Document command = aggregation.toDocument(collectionName, rootContext);
Assert.isNull(command.get(CURSOR), "Custom options not allowed while streaming");
Assert.isNull(command.get(EXPLAIN), "Explain option can't be used while streaming");
assertNotExplain(command);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Streaming aggregation: {}", serializeToJsonSafely(command));
}
ReadDocumentCallback<O> readCallback = new ReadDocumentCallback<O>(mongoConverter, outputType, collectionName);
return execute(collectionName, new CollectionCallback<CloseableIterator<O>>() {
@Override
public CloseableIterator<O> doInCollection(DBCollection collection) throws MongoException, DataAccessException {
public CloseableIterator<O> doInCollection(MongoCollection<Document> collection)
throws MongoException, DataAccessException {
List<DBObject> pipeline = (List<DBObject>) command.get("pipeline");
Cursor cursor = collection.aggregate(pipeline, getNativeAggregationOptionsFromCommand(command));
List<Document> pipeline = (List<Document>) command.get("pipeline");
ReadDbObjectCallback<O> readCallback = new ReadDbObjectCallback<O>(mongoConverter, outputType, collectionName);
AggregationOptions options = AggregationOptions.fromDocument(command);
return new CloseableIterableCursorAdapter<O>(cursor, exceptionTranslator, readCallback);
}
AggregateIterable<Document> cursor = collection.aggregate(pipeline).allowDiskUse(options.isAllowDiskUse())
.useCursor(true);
private AggregationOptions getNativeAggregationOptionsFromCommand(DBObject command) {
AggregationOptions.Builder builder = AggregationOptions.builder();
Object allowDiskUse = command.get(ALLOW_DISK_USE);
if (allowDiskUse != null && String.valueOf(allowDiskUse).equals("true")) {
builder.allowDiskUse(true);
Integer cursorBatchSize = options.getCursorBatchSize();
if (cursorBatchSize != null) {
cursor.batchSize(cursorBatchSize);
}
return builder.build();
return new CloseableIterableCursorAdapter<O>(cursor.iterator(), exceptionTranslator, readCallback);
}
});
}
/**
* Assert that the {@link Document} does not enable Aggregation explain mode.
*
* @param command the command {@link Document}.
*/
private void assertNotExplain(Document command) {
Boolean explain = command.get("explain", Boolean.class);
if (explain != null && explain) {
throw new IllegalArgumentException("Can't use explain option with streaming!");
}
}
protected String replaceWithResourceIfNecessary(String function) {
String func = function;
@@ -2008,7 +2047,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
* @return
*/
private <T> T executeFindOneInternal(CollectionCallback<Document> collectionCallback,
DocumentCallback<T> objectCallback, String collectionName) {
DocumentCallback<T> objectCallback, String collectionName) {
try {
T result = objectCallback
@@ -2038,7 +2077,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
* @return
*/
private <T> List<T> executeFindMultiInternal(CollectionCallback<FindIterable<Document>> collectionCallback,
CursorPreparer preparer, DocumentCallback<T> objectCallback, String collectionName) {
CursorPreparer preparer, DocumentCallback<T> objectCallback, String collectionName) {
try {
@@ -2242,11 +2281,10 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
* Returns all identifiers for the given documents. Will augment the given identifiers and fill in only the ones that
* are {@literal null} currently. This would've been better solved in {@link #insertDBObjectList(String, List)}
* directly but would require a signature change of that method.
*
*
* @param ids
* @param documents
* @return
* TODO: Remove for 2.0 and change method signature of {@link #insertDBObjectList(String, List)}.
* @return TODO: Remove for 2.0 and change method signature of {@link #insertDBObjectList(String, List)}.
*/
private static List<Object> consolidateIdentifiers(List<ObjectId> ids, List<Document> documents) {

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2014-2016 the original author or authors.
* Copyright 2014-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,21 +16,26 @@
package org.springframework.data.mongodb.core.aggregation;
import org.bson.Document;
import org.springframework.util.Assert;
import com.mongodb.DBObject;
/**
* Holds a set of configurable aggregation options that can be used within an aggregation pipeline. A list of support
* aggregation options can be found in the MongoDB reference documentation
* https://docs.mongodb.org/manual/reference/command/aggregate/#aggregate
*
*
* @author Thomas Darimont
* @author Oliver Gierke
* @author Christoph Strobl
* @author Mark Paluch
* @see Aggregation#withOptions(AggregationOptions)
* @see TypedAggregation#withOptions(AggregationOptions)
* @since 1.6
*/
public class AggregationOptions {
private static final String BATCH_SIZE = "batchSize";
private static final String CURSOR = "cursor";
private static final String EXPLAIN = "explain";
private static final String ALLOW_DISK_USE = "allowDiskUse";
@@ -41,7 +46,7 @@ public class AggregationOptions {
/**
* Creates a new {@link AggregationOptions}.
*
*
* @param allowDiskUse whether to off-load intensive sort-operations to disk.
* @param explain whether to get the execution plan for the aggregation instead of the actual results.
* @param cursor can be {@literal null}, used to pass additional options to the aggregation.
@@ -53,10 +58,52 @@ public class AggregationOptions {
this.cursor = cursor;
}
/**
* Creates a new {@link AggregationOptions}.
*
* @param allowDiskUse whether to off-load intensive sort-operations to disk.
* @param explain whether to get the execution plan for the aggregation instead of the actual results.
* @param cursorBatchSize initial cursor batch size.
* @since 2.0
*/
public AggregationOptions(boolean allowDiskUse, boolean explain, int cursorBatchSize) {
this(allowDiskUse, explain, createCursor(cursorBatchSize));
}
/**
* Creates new {@link AggregationOptions} given {@link DBObject} containing aggregation options.
*
* @param document must not be {@literal null}.
* @return the {@link AggregationOptions}.
* @since 2.0
*/
public static AggregationOptions fromDocument(Document document) {
Assert.notNull(document, "Document must not be null!");
boolean allowDiskUse = false;
boolean explain = false;
Document cursor = null;
if (document.containsKey(ALLOW_DISK_USE)) {
allowDiskUse = document.get(ALLOW_DISK_USE, Boolean.class);
}
if (document.containsKey(EXPLAIN)) {
explain = (Boolean) document.get(EXPLAIN);
}
if (document.containsKey(CURSOR)) {
cursor = document.get(CURSOR, Document.class);
}
return new AggregationOptions(allowDiskUse, explain, cursor);
}
/**
* Enables writing to temporary files. When set to true, aggregation stages can write data to the _tmp subdirectory in
* the dbPath directory.
*
*
* @return
*/
public boolean isAllowDiskUse() {
@@ -65,16 +112,31 @@ public class AggregationOptions {
/**
* Specifies to return the information on the processing of the pipeline.
*
*
* @return
*/
public boolean isExplain() {
return explain;
}
/**
* The initial cursor batch size, if available, otherwise {@literal null}.
*
* @return the batch size or {@literal null}.
* @since 2.0
*/
public Integer getCursorBatchSize() {
if (cursor != null && cursor.containsKey("batchSize")) {
return cursor.get("batchSize", Integer.class);
}
return null;
}
/**
* Specify a document that contains options that control the creation of the cursor object.
*
*
* @return
*/
public Document getCursor() {
@@ -84,7 +146,7 @@ public class AggregationOptions {
/**
* Returns a new potentially adjusted copy for the given {@code aggregationCommandObject} with the configuration
* applied.
*
*
* @param command the aggregation command.
* @return
*/
@@ -101,7 +163,7 @@ public class AggregationOptions {
}
if (cursor != null && !result.containsKey(CURSOR)) {
result.put("cursor", cursor);
result.put(CURSOR, cursor);
}
return result;
@@ -109,7 +171,7 @@ public class AggregationOptions {
/**
* Returns a {@link Document} representation of this {@link AggregationOptions}.
*
*
* @return
*/
public Document toDocument() {
@@ -130,10 +192,15 @@ public class AggregationOptions {
return toDocument().toJson();
}
static Document createCursor(int cursorBatchSize) {
return new Document("batchSize", cursorBatchSize);
}
/**
* A Builder for {@link AggregationOptions}.
*
*
* @author Thomas Darimont
* @author Mark Paluch
*/
public static class Builder {
@@ -143,7 +210,7 @@ public class AggregationOptions {
/**
* Defines whether to off-load intensive sort-operations to disk.
*
*
* @param allowDiskUse
* @return
*/
@@ -155,7 +222,7 @@ public class AggregationOptions {
/**
* Defines whether to get the execution plan for the aggregation instead of the actual results.
*
*
* @param explain
* @return
*/
@@ -167,7 +234,7 @@ public class AggregationOptions {
/**
* Additional options to the aggregation.
*
*
* @param cursor
* @return
*/
@@ -177,9 +244,22 @@ public class AggregationOptions {
return this;
}
/**
* Define the initial cursor batch size.
*
* @param batchSize
* @return
* @since 2.0
*/
public Builder cursorBatchSize(int batchSize) {
this.cursor = createCursor(batchSize);
return this;
}
/**
* Returns a new {@link AggregationOptions} instance with the given configuration.
*
*
* @return
*/
public AggregationOptions build() {

View File

@@ -19,6 +19,8 @@ import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
import static org.springframework.data.mongodb.core.aggregation.Aggregation.*;
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import org.bson.Document;
import org.junit.Before;
import org.junit.Test;
@@ -27,6 +29,7 @@ import org.junit.Test;
* Unit tests for {@link AggregationOptions}.
*
* @author Thomas Darimont
* @author Mark Paluch
* @since 1.6
*/
public class AggregationOptionsTests {
@@ -35,9 +38,10 @@ public class AggregationOptionsTests {
@Before
public void setup() {
aggregationOptions = newAggregationOptions().explain(true).cursor(new Document("foo", 1)).allowDiskUse(true)
aggregationOptions = newAggregationOptions().explain(true) //
.cursorBatchSize(1) //
.allowDiskUse(true) //
.build();
}
@Test // DATAMONGO-960
@@ -45,12 +49,28 @@ public class AggregationOptionsTests {
assertThat(aggregationOptions.isAllowDiskUse(), is(true));
assertThat(aggregationOptions.isExplain(), is(true));
assertThat(aggregationOptions.getCursor(), is(new Document("foo", 1)));
assertThat(aggregationOptions.getCursor(), is(new Document("batchSize", 1)));
}
@Test // DATAMONGO-1637
public void shouldInitializeFromDocument() {
Document document = new Document();
document.put("cursor", new Document("batchSize", 1));
document.put("explain", true);
document.put("allowDiskUse", true);
aggregationOptions = AggregationOptions.fromDocument(document);
assertThat(aggregationOptions.isAllowDiskUse(), is(true));
assertThat(aggregationOptions.isExplain(), is(true));
assertThat(aggregationOptions.getCursor(), is(new Document("batchSize", 1)));
assertThat(aggregationOptions.getCursorBatchSize(), is(1));
}
@Test // DATAMONGO-960
public void aggregationOptionsToString() {
assertThat(aggregationOptions.toDocument(),
is(Document.parse("{ \"allowDiskUse\" : true , \"explain\" : true , \"cursor\" : { \"foo\" : 1}}")));
is(Document.parse("{ \"allowDiskUse\" : true , \"explain\" : true , \"cursor\" : { \"batchSize\" : 1}}")));
}
}

View File

@@ -75,7 +75,7 @@ import com.mongodb.MongoException;
import com.mongodb.client.MongoCollection;
/**
* Tests for {@link MongoTemplate#aggregate(String, AggregationPipeline, Class)}.
* Tests for {@link MongoTemplate#aggregate(Aggregation, Class, Class)}.
*
* @author Tobias Trelle
* @author Thomas Darimont
@@ -83,7 +83,7 @@ import com.mongodb.client.MongoCollection;
* @author Christoph Strobl
* @author Mark Paluch
* @author Nikolay Bogdanov
* @author maninder
* @author Maninder Singh
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:infrastructure.xml")
@@ -244,15 +244,13 @@ public class AggregationTests {
project("n") //
.and("tag").previousOperation(), //
sort(DESC, "n") //
);
).withOptions(new AggregationOptions(true, false, 1));
CloseableIterator<TagCount> iterator = mongoTemplate.aggregateStream(agg, INPUT_COLLECTION, TagCount.class);
assertThat(iterator, is(notNullValue()));
List<TagCount> tagCount = new ArrayList<TagCount>();
while (iterator.hasNext()) {
tagCount.add(iterator.next());
}
List<TagCount> tagCount = toList(iterator);
iterator.close();
assertThat(tagCount, is(notNullValue()));
assertThat(tagCount.size(), is(3));
@@ -302,12 +300,9 @@ public class AggregationTests {
assertThat(results, is(notNullValue()));
List<TagCount> tagCount = new ArrayList<TagCount>();
while (results.hasNext()) {
tagCount.add(results.next());
}
List<TagCount> tagCount = toList(results);
results.close();
// assertThat(tagCount, is(notNullValue()));
assertThat(tagCount.size(), is(0));
}
@@ -409,11 +404,9 @@ public class AggregationTests {
assertThat(results, is(notNullValue()));
List<TagCount> tagCount = new ArrayList<TagCount>();
while (results.hasNext()) {
tagCount.add(results.next());
}
// assertThat(tagCount, is(notNullValue()));
List<TagCount> tagCount = toList(results);
results.close();
assertThat(tagCount.size(), is(2));
assertTagCount(null, 0, tagCount.get(0));
assertTagCount(null, 0, tagCount.get(1));
@@ -423,7 +416,7 @@ public class AggregationTests {
public void complexAggregationFrameworkUsageLargestAndSmallestCitiesByState() {
/*
//complex mongodb aggregation framework example from https://docs.mongodb.org/manual/tutorial/aggregation-examples/#largest-and-smallest-cities-by-state
db.zipInfo.aggregate(
db.zipInfo.aggregate(
{
$group: {
_id: {
@@ -530,18 +523,18 @@ public class AggregationTests {
@Test // DATAMONGO-586
public void findStatesWithPopulationOver10MillionAggregationExample() {
/*
//complex mongodb aggregation framework example from
//complex mongodb aggregation framework example from
https://docs.mongodb.org/manual/tutorial/aggregation-examples/#largest-and-smallest-cities-by-state
db.zipcodes.aggregate(
db.zipcodes.aggregate(
{
$group: {
_id:"$state",
totalPop:{ $sum:"$pop"}
}
},
{
$sort: { _id: 1, "totalPop": 1 }
{
$sort: { _id: 1, "totalPop": 1 }
},
{
$match: {
@@ -1289,10 +1282,9 @@ public class AggregationTests {
assertThat(agg.toString(), is(notNullValue()));
CloseableIterator<LikeStats> iterator = mongoTemplate.aggregateStream(agg, LikeStats.class);
List<LikeStats> result = new ArrayList<LikeStats>();
while (iterator.hasNext()) {
result.add(iterator.next());
}
List<LikeStats> result = toList(iterator);
iterator.close();
assertThat(result, is(notNullValue()));
assertThat(result, is(notNullValue()));
assertThat(result.size(), is(5));
@@ -1534,11 +1526,7 @@ public class AggregationTests {
assumeTrue(mongoVersion.isGreaterThanOrEqualTo(TWO_DOT_SIX));
mongoTemplate.save(new Person("Anna", "Ivanova", 21, Person.Sex.FEMALE));
mongoTemplate.save(new Person("Pavel", "Sidorov", 36, Person.Sex.MALE));
mongoTemplate.save(new Person("Anastasia", "Volochkova", 29, Person.Sex.FEMALE));
mongoTemplate.save(new Person("Igor", "Stepanov", 31, Person.Sex.MALE));
mongoTemplate.save(new Person("Leoniv", "Yakubov", 55, Person.Sex.MALE));
createPersonDocuments();
String tempOutCollection = "personQueryTemp";
TypedAggregation<Person> agg = newAggregation(Person.class, //
@@ -1563,11 +1551,7 @@ public class AggregationTests {
assumeTrue(mongoVersion.isGreaterThanOrEqualTo(TWO_DOT_SIX));
mongoTemplate.save(new Person("Anna", "Ivanova", 21, Person.Sex.FEMALE));
mongoTemplate.save(new Person("Pavel", "Sidorov", 36, Person.Sex.MALE));
mongoTemplate.save(new Person("Anastasia", "Volochkova", 29, Person.Sex.FEMALE));
mongoTemplate.save(new Person("Igor", "Stepanov", 31, Person.Sex.MALE));
mongoTemplate.save(new Person("Leoniv", "Yakubov", 55, Person.Sex.MALE));
createPersonDocuments();
String tempOutCollection = "personQueryTemp";
TypedAggregation<Person> agg = newAggregation(Person.class, //
@@ -1575,9 +1559,9 @@ public class AggregationTests {
sort(DESC, "count"), //
out(tempOutCollection));
CloseableIterator<DBObject> iterator = mongoTemplate.aggregateStream(agg, DBObject.class);
mongoTemplate.aggregateStream(agg, Document.class).close();
List<DBObject> list = mongoTemplate.findAll(DBObject.class, tempOutCollection);
List<Document> list = mongoTemplate.findAll(Document.class, tempOutCollection);
assertThat(list, hasSize(2));
assertThat(list.get(0), isBsonObject().containing("_id", "MALE").containing("count", 3));
@@ -1586,6 +1570,39 @@ public class AggregationTests {
mongoTemplate.dropCollection(tempOutCollection);
}
@Test // DATAMONGO-1637
public void shouldReturnDocumentsWithOutputCollectionWhileStreaming() {
assumeTrue(mongoVersion.isGreaterThanOrEqualTo(TWO_DOT_SIX));
createPersonDocuments();
String tempOutCollection = "personQueryTemp";
TypedAggregation<Person> agg = newAggregation(Person.class, //
group("sex").count().as("count"), //
sort(DESC, "count"), //
out(tempOutCollection));
CloseableIterator<Document> iterator = mongoTemplate.aggregateStream(agg, Document.class);
List<Document> result = toList(iterator);
assertThat(result, hasSize(2));
assertThat(result.get(0), isBsonObject().containing("_id", "MALE").containing("count", 3));
assertThat(result.get(1), isBsonObject().containing("_id", "FEMALE").containing("count", 2));
mongoTemplate.dropCollection(tempOutCollection);
}
private void createPersonDocuments() {
mongoTemplate.save(new Person("Anna", "Ivanova", 21, Person.Sex.FEMALE));
mongoTemplate.save(new Person("Pavel", "Sidorov", 36, Person.Sex.MALE));
mongoTemplate.save(new Person("Anastasia", "Volochkova", 29, Person.Sex.FEMALE));
mongoTemplate.save(new Person("Igor", "Stepanov", 31, Person.Sex.MALE));
mongoTemplate.save(new Person("Leoniv", "Yakubov", 55, Person.Sex.MALE));
}
@Test(expected = IllegalArgumentException.class) // DATAMONGO-1418
public void outShouldOutBeTheLastOperation() {
@@ -1887,6 +1904,16 @@ public class AggregationTests {
assertThat(tagCount.getN(), is(n));
}
private static <T> List<T> toList(CloseableIterator<? extends T> results) {
List<T> result = new ArrayList<T>();
while (results.hasNext()) {
result.add(results.next());
}
return result;
}
static class DATAMONGO753 {
PD[] pd;
@@ -2111,4 +2138,3 @@ public class AggregationTests {
double price;
}
}

View File

@@ -6,6 +6,7 @@
* Upgrade to Java 8.
* Usage of the `Document` API instead of `DBObject`.
* <<mongo.reactive>>.
* Support for aggregation result streaming via Java 8 `Stream`.
[[new-features.1-10-0]]
== What's new in Spring Data MongoDB 1.10