DATAMONGO-1986 - Polishing.

Refactor duplicated code into AggregationUtil.

Original pull request: #564.
This commit is contained in:
Mark Paluch
2018-06-06 09:53:54 +02:00
parent 2bac54c70f
commit 06622bed35
3 changed files with 126 additions and 114 deletions

View File

@@ -0,0 +1,115 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core;
import lombok.AllArgsConstructor;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.bson.Document;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty;
import org.springframework.lang.Nullable;
import org.springframework.util.ObjectUtils;
/**
* Utility methods to map {@link org.springframework.data.mongodb.core.aggregation.Aggregation} pipeline definitions and
* create type-bound {@link AggregationOperationContext}.
*
* @author Christoph Strobl
* @author Mark Paluch
* @since 2.1
*/
@AllArgsConstructor
class AggregationUtil {
QueryMapper queryMapper;
MappingContext<? extends MongoPersistentEntity<?>, MongoPersistentProperty> mappingContext;
/**
* Prepare the {@link AggregationOperationContext} for a given aggregation by either returning the context itself it
* is not {@literal null}, create a {@link TypeBasedAggregationOperationContext} if the aggregation contains type
* information (is a {@link TypedAggregation}) or use the {@link Aggregation#DEFAULT_CONTEXT}.
*
* @param aggregation must not be {@literal null}.
* @param context can be {@literal null}.
* @return the root {@link AggregationOperationContext} to use.
*/
AggregationOperationContext prepareAggregationContext(Aggregation aggregation,
@Nullable AggregationOperationContext context) {
if (context != null) {
return context;
}
if (aggregation instanceof TypedAggregation) {
return new TypeBasedAggregationOperationContext(((TypedAggregation) aggregation).getInputType(), mappingContext,
queryMapper);
}
return Aggregation.DEFAULT_CONTEXT;
}
/**
* Extract and map the aggregation pipeline into a {@link List} of {@link Document}.
*
* @param aggregation
* @param context
* @return
*/
List<Document> createPipeline(Aggregation aggregation, AggregationOperationContext context) {
if (!ObjectUtils.nullSafeEquals(context, Aggregation.DEFAULT_CONTEXT)) {
return aggregation.toPipeline(context);
}
return mapAggregationPipeline(aggregation.toPipeline(context));
}
/**
* Extract the command and map the aggregation pipeline.
*
* @param aggregation
* @param context
* @return
*/
Document createCommand(String collection, Aggregation aggregation, AggregationOperationContext context) {
Document command = aggregation.toDocument(collection, context);
if (!ObjectUtils.nullSafeEquals(context, Aggregation.DEFAULT_CONTEXT)) {
return command;
}
command.put("pipeline", mapAggregationPipeline(command.get("pipeline", List.class)));
return command;
}
private List<Document> mapAggregationPipeline(List<Document> pipeline) {
return pipeline.stream().map(val -> queryMapper.getMappedObject(val, Optional.empty()))
.collect(Collectors.toList());
}
}

View File

@@ -2116,7 +2116,8 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
Assert.notNull(aggregation, "Aggregation pipeline must not be null!");
Assert.notNull(outputType, "Output type must not be null!");
return doAggregate(aggregation, collectionName, outputType, prepareAggregationContext(aggregation, context));
AggregationOperationContext contextToUse = new AggregationUtil(queryMapper, mappingContext).prepareAggregationContext(aggregation, context);
return doAggregate(aggregation, collectionName, outputType, contextToUse);
}
@SuppressWarnings("ConstantConditions")
@@ -2126,10 +2127,11 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
DocumentCallback<O> callback = new UnwrapAndReadDocumentCallback<>(mongoConverter, outputType, collectionName);
AggregationOptions options = aggregation.getOptions();
AggregationUtil aggregationUtil = new AggregationUtil(queryMapper, mappingContext);
if (options.isExplain()) {
Document command = aggregationToCommand(collectionName, aggregation, context);
Document command = aggregationUtil.createCommand(collectionName, aggregation, context);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Executing aggregation: {}", serializeToJsonSafely(command));
@@ -2140,7 +2142,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
.map(callback::doWith).collect(Collectors.toList()), commandResult);
}
List<Document> pipeline = aggregationToPipeline(aggregation, context);
List<Document> pipeline = aggregationUtil.createPipeline(aggregation, context);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Executing aggregation: {} in collection {}", serializeToJsonSafely(pipeline), collectionName);
@@ -2178,10 +2180,11 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
Assert.notNull(outputType, "Output type must not be null!");
Assert.isTrue(!aggregation.getOptions().isExplain(), "Can't use explain option with streaming!");
AggregationOperationContext rootContext = prepareAggregationContext(aggregation, context);
AggregationUtil aggregationUtil = new AggregationUtil(queryMapper, mappingContext);
AggregationOperationContext rootContext = aggregationUtil.prepareAggregationContext(aggregation, context);
AggregationOptions options = aggregation.getOptions();
List<Document> pipeline = aggregationToPipeline(aggregation, rootContext);
List<Document> pipeline = aggregationUtil.createPipeline(aggregation, rootContext);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Streaming aggregation: {} in collection {}", serializeToJsonSafely(pipeline), collectionName);
@@ -2844,72 +2847,6 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
return fields;
}
/**
* Prepare the {@link AggregationOperationContext} for a given aggregation by either returning the context itself it
* is not {@literal null}, create a {@link TypeBasedAggregationOperationContext} if the aggregation contains type
* information (is a {@link TypedAggregation}) or use the {@link Aggregation#DEFAULT_CONTEXT}.
*
* @param aggregation must not be {@literal null}.
* @param context can be {@literal null}.
* @return the root {@link AggregationOperationContext} to use.
*/
private AggregationOperationContext prepareAggregationContext(Aggregation aggregation,
@Nullable AggregationOperationContext context) {
if (context != null) {
return context;
}
if (aggregation instanceof TypedAggregation) {
return new TypeBasedAggregationOperationContext(((TypedAggregation) aggregation).getInputType(), mappingContext,
queryMapper);
}
return Aggregation.DEFAULT_CONTEXT;
}
/**
* Extract and map the aggregation pipeline.
*
* @param aggregation
* @param context
* @return
*/
private List<Document> aggregationToPipeline(Aggregation aggregation, AggregationOperationContext context) {
if (!ObjectUtils.nullSafeEquals(context, Aggregation.DEFAULT_CONTEXT)) {
return aggregation.toPipeline(context);
}
return mapAggregationPipeline(aggregation.toPipeline(context));
}
/**
* Extract the command and map the aggregation pipeline.
*
* @param aggregation
* @param context
* @return
*/
private Document aggregationToCommand(String collection, Aggregation aggregation,
AggregationOperationContext context) {
Document command = aggregation.toDocument(collection, context);
if (!ObjectUtils.nullSafeEquals(context, Aggregation.DEFAULT_CONTEXT)) {
return command;
}
command.put("pipeline", mapAggregationPipeline(command.get("pipeline", List.class)));
return command;
}
private List<Document> mapAggregationPipeline(List<Document> pipeline) {
return pipeline.stream().map(val -> queryMapper.getMappedObject(val, Optional.empty()))
.collect(Collectors.toList());
}
/**
* Tries to convert the given {@link RuntimeException} into a {@link DataAccessException} but returns the original

View File

@@ -969,10 +969,11 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
Assert.hasText(collectionName, "Collection name must not be null or empty!");
Assert.notNull(outputType, "Output type must not be null!");
AggregationOperationContext rootContext = prepareAggregationContext(aggregation, context);
AggregationUtil aggregationUtil = new AggregationUtil(queryMapper, mappingContext);
AggregationOperationContext rootContext = aggregationUtil.prepareAggregationContext(aggregation, context);
AggregationOptions options = aggregation.getOptions();
List<Document> pipeline = aggregationToPipeline(aggregation, rootContext);
List<Document> pipeline = aggregationUtil.createPipeline(aggregation, rootContext);
Assert.isTrue(!options.isExplain(), "Cannot use explain option with streaming!");
Assert.isNull(options.getCursorBatchSize(), "Cannot use batchSize cursor option with streaming!");
@@ -2654,47 +2655,6 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
};
}
/**
* Prepare the {@link AggregationOperationContext} for a given aggregation by either returning the context itself it
* is not {@literal null}, create a {@link TypeBasedAggregationOperationContext} if the aggregation contains type
* information (is a {@link TypedAggregation}) or use the {@link Aggregation#DEFAULT_CONTEXT}.
*
* @param aggregation must not be {@literal null}.
* @param context can be {@literal null}.
* @return the root {@link AggregationOperationContext} to use.
*/
private AggregationOperationContext prepareAggregationContext(Aggregation aggregation,
@Nullable AggregationOperationContext context) {
if (context != null) {
return context;
}
if (aggregation instanceof TypedAggregation) {
return new TypeBasedAggregationOperationContext(((TypedAggregation) aggregation).getInputType(), mappingContext,
queryMapper);
}
return Aggregation.DEFAULT_CONTEXT;
}
/**
* Extract and map the aggregation pipeline.
*
* @param aggregation
* @param context
* @return
*/
private List<Document> aggregationToPipeline(Aggregation aggregation, AggregationOperationContext context) {
if (!ObjectUtils.nullSafeEquals(context, Aggregation.DEFAULT_CONTEXT)) {
return aggregation.toPipeline(context);
}
return aggregation.toPipeline(context).stream().map(val -> queryMapper.getMappedObject(val, Optional.empty()))
.collect(Collectors.toList());
}
/**
* Tries to convert the given {@link RuntimeException} into a {@link DataAccessException} but returns the original
* exception if the conversation failed. Thus allows safe re-throwing of the return value.