DATAMONGO-2622 - Add support for $unionWith aggregation stage.
We now support the $unionWith aggregation stage via the UnionWithOperation that performs a union of two collections by combining pipeline results, potentially containing duplicates, into a single result set that is handed over to the next stage. In order to remove duplicates it is possible to append a GroupOperation right after UnionWithOperation. If the UnionWithOperation uses a pipeline to process documents, field names within the pipeline will be treated as is. In order to map domain type property names to actual field names (considering potential org.springframework.data.mongodb.core.mapping.Field annotations) make sure the enclosing aggregation is a TypedAggregation and provide the target type for the $unionWith stage via mapFieldsTo(Class). Original pull request: #886.
This commit is contained in:
committed by
Mark Paluch
parent
4548d07826
commit
230c32041a
@@ -28,6 +28,7 @@ import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationOptions;
|
||||
import org.springframework.data.mongodb.core.aggregation.CountOperation;
|
||||
import org.springframework.data.mongodb.core.aggregation.RelaxedTypeBasedAggregationOperationContext;
|
||||
import org.springframework.data.mongodb.core.aggregation.TypeBasedAggregationOperationContext;
|
||||
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
|
||||
import org.springframework.data.mongodb.core.convert.QueryMapper;
|
||||
@@ -75,12 +76,17 @@ class AggregationUtil {
|
||||
return context;
|
||||
}
|
||||
|
||||
if (aggregation instanceof TypedAggregation) {
|
||||
return new TypeBasedAggregationOperationContext(((TypedAggregation) aggregation).getInputType(), mappingContext,
|
||||
queryMapper);
|
||||
if (!(aggregation instanceof TypedAggregation)) {
|
||||
return Aggregation.DEFAULT_CONTEXT;
|
||||
}
|
||||
|
||||
return Aggregation.DEFAULT_CONTEXT;
|
||||
Class<?> inputType = ((TypedAggregation) aggregation).getInputType();
|
||||
|
||||
if (aggregation.getPipeline().requiresRelaxedChecking()) {
|
||||
return new RelaxedTypeBasedAggregationOperationContext(inputType, mappingContext, queryMapper);
|
||||
}
|
||||
|
||||
return new TypeBasedAggregationOperationContext(inputType, mappingContext, queryMapper);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1977,9 +1977,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
||||
|
||||
Assert.notNull(aggregation, "Aggregation pipeline must not be null!");
|
||||
|
||||
AggregationOperationContext context = new TypeBasedAggregationOperationContext(aggregation.getInputType(),
|
||||
mappingContext, queryMapper);
|
||||
return aggregate(aggregation, inputCollectionName, outputType, context);
|
||||
return aggregate(aggregation, inputCollectionName, outputType, null);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
|
||||
@@ -45,6 +45,8 @@ public class AggregationPipeline {
|
||||
* @param aggregationOperations must not be {@literal null}.
|
||||
*/
|
||||
public AggregationPipeline(List<AggregationOperation> aggregationOperations) {
|
||||
|
||||
Assert.notNull(aggregationOperations, "AggregationOperations must not be null!");
|
||||
pipeline = new ArrayList<>(aggregationOperations);
|
||||
}
|
||||
|
||||
@@ -108,4 +110,37 @@ public class AggregationPipeline {
|
||||
private boolean isLast(AggregationOperation aggregationOperation) {
|
||||
return pipeline.indexOf(aggregationOperation) == pipeline.size() - 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return {@literal true} if field names might get computed by one of the pipeline stages, that the
|
||||
* {@link AggregationOperationContext} might not be aware of. A strongly typed context might fail to resolve
|
||||
* field references, so if {@literal true} usage of a {@link RelaxedTypeBasedAggregationOperationContext}
|
||||
* might be the better choice.
|
||||
* @since 3.1
|
||||
*/
|
||||
public boolean requiresRelaxedChecking() {
|
||||
return pipelineContainsValueOfType(UnionWithOperation.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return {@literal true} if the pipeline does not contain any stages.
|
||||
* @since 3.1
|
||||
*/
|
||||
public boolean isEmpty() {
|
||||
return pipeline.isEmpty();
|
||||
}
|
||||
|
||||
private boolean pipelineContainsValueOfType(Class<?> type) {
|
||||
|
||||
if (isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (Object element : pipeline) {
|
||||
if (type.isInstance(element)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -149,4 +149,12 @@ class ExposedFieldsAggregationOperationContext implements AggregationOperationCo
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return obtain the root context used to resolve references.
|
||||
* @since 3.1
|
||||
*/
|
||||
AggregationOperationContext getRootContext() {
|
||||
return rootContext;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -133,6 +133,19 @@ public class TypeBasedAggregationOperationContext implements AggregationOperatio
|
||||
*/
|
||||
@Override
|
||||
public AggregationOperationContext continueOnMissingFieldReference() {
|
||||
return continueOnMissingFieldReference(type);
|
||||
}
|
||||
|
||||
/**
|
||||
* This toggle allows the {@link AggregationOperationContext context} to use any given field name without checking for
|
||||
* its existence. Typically the {@link AggregationOperationContext} fails when referencing unknown fields, those that
|
||||
* are not present in one of the previous stages or the input source, throughout the pipeline.
|
||||
*
|
||||
* @param type The domain type to map fields to.
|
||||
* @return a more relaxed {@link AggregationOperationContext}.
|
||||
* @since 3.1
|
||||
*/
|
||||
public AggregationOperationContext continueOnMissingFieldReference(Class<?> type) {
|
||||
return new RelaxedTypeBasedAggregationOperationContext(type, mappingContext, mapper);
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,170 @@
|
||||
/*
|
||||
* Copyright 2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.data.mongodb.core.aggregation;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.bson.Document;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* The <a href="https://docs.mongodb.com/master/reference/operator/aggregation/unionWith/">$unionWith</a> aggregation
|
||||
* stage (available since MongoDB 4.4) performs a union of two collections by combining pipeline results, potentially
|
||||
* containing duplicates, into a single result set that is handed over to the next stage. <br />
|
||||
* In order to remove duplicates it is possible to append a {@link GroupOperation} right after
|
||||
* {@link UnionWithOperation}.
|
||||
* <p />
|
||||
* If the {@link UnionWithOperation} uses a
|
||||
* <a href="https://docs.mongodb.com/master/reference/operator/aggregation/unionWith/#unionwith-pipeline">pipeline</a>
|
||||
* to process documents, field names within the pipeline will be treated as is. In order to map domain type property
|
||||
* names to actual field names (considering potential {@link org.springframework.data.mongodb.core.mapping.Field}
|
||||
* annotations) make sure the enclosing aggregation is a {@link TypedAggregation} and provide the target type for the
|
||||
* {@code $unionWith} stage via {@link #mapFieldsTo(Class)}.
|
||||
*
|
||||
* @author Christoph Strobl
|
||||
* @see <a href="https://docs.mongodb.com/master/reference/operator/aggregation/unionWith/">Aggregation Pipeline Stage:
|
||||
* $unionWith</a>
|
||||
* @since 3.1
|
||||
*/
|
||||
public class UnionWithOperation implements AggregationOperation {
|
||||
|
||||
private final String collection;
|
||||
|
||||
@Nullable //
|
||||
private final AggregationPipeline pipeline;
|
||||
|
||||
@Nullable //
|
||||
private final Class<?> domainType;
|
||||
|
||||
public UnionWithOperation(String collection, @Nullable AggregationPipeline pipeline, @Nullable Class<?> domainType) {
|
||||
|
||||
this.collection = collection;
|
||||
this.pipeline = pipeline;
|
||||
this.domainType = domainType;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the name of the collection from which pipeline results should be included in the result set.<br />
|
||||
* The collection name is used to set the {@code coll} parameter of {@code $unionWith}.
|
||||
*
|
||||
* @param collection the MongoDB collection name. Must not be {@literal null}.
|
||||
* @return new instance of {@link UnionWithOperation}.
|
||||
* @throws IllegalArgumentException if the required argument is {@literal null}.
|
||||
*/
|
||||
public static UnionWithOperation unionWith(String collection) {
|
||||
|
||||
Assert.notNull(collection, "Collection must not be null!");
|
||||
return new UnionWithOperation(collection, null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link AggregationPipeline} to apply to the specified collection. The pipeline corresponds to the optional
|
||||
* {@code pipeline} field of the {@code $unionWith} aggregation stage and is used to compute the documents going into
|
||||
* the result set.
|
||||
*
|
||||
* @param pipeline the {@link AggregationPipeline} that computes the documents. Must not be {@literal null}.
|
||||
* @return new instance of {@link UnionWithOperation}.
|
||||
* @throws IllegalArgumentException if the required argument is {@literal null}.
|
||||
*/
|
||||
public UnionWithOperation pipeline(AggregationPipeline pipeline) {
|
||||
return new UnionWithOperation(collection, pipeline, domainType);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the aggregation pipeline stages to apply to the specified collection. The pipeline corresponds to the optional
|
||||
* {@code pipeline} field of the {@code $unionWith} aggregation stage and is used to compute the documents going into
|
||||
* the result set.
|
||||
*
|
||||
* @param aggregationStages the aggregation pipeline stages that compute the documents. Must not be {@literal null}.
|
||||
* @return new instance of {@link UnionWithOperation}.
|
||||
* @throws IllegalArgumentException if the required argument is {@literal null}.
|
||||
*/
|
||||
public UnionWithOperation pipeline(List<AggregationOperation> aggregationStages) {
|
||||
return new UnionWithOperation(collection, new AggregationPipeline(aggregationStages), domainType);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the aggregation pipeline stages to apply to the specified collection. The pipeline corresponds to the optional
|
||||
* {@code pipeline} field of the {@code $unionWith} aggregation stage and is used to compute the documents going into
|
||||
* the result set.
|
||||
*
|
||||
* @param aggregationStages the aggregation pipeline stages that compute the documents. Must not be {@literal null}.
|
||||
* @return new instance of {@link UnionWithOperation}.
|
||||
* @throws IllegalArgumentException if the required argument is {@literal null}.
|
||||
*/
|
||||
public UnionWithOperation pipeline(AggregationOperation... aggregationStages) {
|
||||
return new UnionWithOperation(collection, new AggregationPipeline(Arrays.asList(aggregationStages)), domainType);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set domain type used for field name mapping of property references used by the {@link AggregationPipeline}.
|
||||
* Remember to also use a {@link TypedAggregation} in the outer pipeline.<br />
|
||||
* If not set, field names used within {@link AggregationOperation pipeline operations} are taken as is.
|
||||
*
|
||||
* @param domainType the domain type to map field names used in pipeline operations to. Must not be {@literal null}.
|
||||
* @return new instance of {@link UnionWithOperation}.
|
||||
* @throws IllegalArgumentException if the required argument is {@literal null}.
|
||||
*/
|
||||
public UnionWithOperation mapFieldsTo(Class<?> domainType) {
|
||||
|
||||
Assert.notNull(domainType, "DomainType must not be null!");
|
||||
return new UnionWithOperation(collection, pipeline, domainType);
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.mongodb.core.aggregation.AggregationOperation#toDocument(org.springframework.data.mongodb.core.aggregation.AggregationOperationContext)
|
||||
*/
|
||||
@Override
|
||||
public Document toDocument(AggregationOperationContext context) {
|
||||
|
||||
Document $unionWith = new Document("coll", collection);
|
||||
if (pipeline == null || pipeline.isEmpty()) {
|
||||
return new Document(getOperator(), $unionWith);
|
||||
}
|
||||
|
||||
$unionWith.append("pipeline", pipeline.toDocuments(computeContext(context)));
|
||||
return new Document(getOperator(), $unionWith);
|
||||
}
|
||||
|
||||
private AggregationOperationContext computeContext(AggregationOperationContext source) {
|
||||
|
||||
if (domainType == null) {
|
||||
return Aggregation.DEFAULT_CONTEXT;
|
||||
}
|
||||
|
||||
if (source instanceof TypeBasedAggregationOperationContext) {
|
||||
return ((TypeBasedAggregationOperationContext) source).continueOnMissingFieldReference(domainType);
|
||||
}
|
||||
|
||||
if (source instanceof ExposedFieldsAggregationOperationContext) {
|
||||
return computeContext(((ExposedFieldsAggregationOperationContext) source).getRootContext());
|
||||
}
|
||||
|
||||
return source;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.mongodb.core.aggregation.AggregationOperation#getOperator()
|
||||
*/
|
||||
@Override
|
||||
public String getOperator() {
|
||||
return "$unionWith";
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,129 @@
|
||||
/*
|
||||
* Copyright 2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.data.mongodb.core.aggregation;
|
||||
|
||||
import static org.assertj.core.api.Assertions.*;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.bson.Document;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
|
||||
import org.springframework.data.mongodb.core.convert.NoOpDbRefResolver;
|
||||
import org.springframework.data.mongodb.core.convert.QueryMapper;
|
||||
import org.springframework.data.mongodb.core.mapping.Field;
|
||||
import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
|
||||
import org.springframework.lang.Nullable;
|
||||
|
||||
/**
|
||||
* @author Christoph Strobl
|
||||
*/
|
||||
class UnionWithOperationUnitTests {
|
||||
|
||||
@Test // DATAMONGO-2622
|
||||
void throwsErrorWhenNoCollectionPresent() {
|
||||
assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(() -> UnionWithOperation.unionWith(null));
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-2622
|
||||
void rendersJustCollectionCorrectly() {
|
||||
|
||||
assertThat(UnionWithOperation.unionWith("coll-1").toPipelineStages(contextFor(Warehouse.class)))
|
||||
.containsExactly(new Document("$unionWith", new Document("coll", "coll-1")));
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-2622
|
||||
void rendersPipelineCorrectly() {
|
||||
|
||||
assertThat(UnionWithOperation.unionWith("coll-1").mapFieldsTo(Warehouse.class)
|
||||
.pipeline(Aggregation.project().and("location").as("region")).toPipelineStages(contextFor(Warehouse.class)))
|
||||
.containsExactly(new Document("$unionWith", new Document("coll", "coll-1").append("pipeline",
|
||||
Arrays.asList(new Document("$project", new Document("region", 1))))));
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-2622
|
||||
void rendersPipelineCorrectlyForDifferentDomainType() {
|
||||
|
||||
assertThat(UnionWithOperation.unionWith("coll-1").pipeline(Aggregation.project().and("name").as("name"))
|
||||
.mapFieldsTo(Supplier.class).toPipelineStages(contextFor(Warehouse.class)))
|
||||
.containsExactly(new Document("$unionWith", new Document("coll", "coll-1").append("pipeline",
|
||||
Arrays.asList(new Document("$project", new Document("name", "$supplier"))))));
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-2622
|
||||
void rendersPipelineCorrectlyForUntypedContext() {
|
||||
|
||||
assertThat(UnionWithOperation.unionWith("coll-1").pipeline(Aggregation.project("region"))
|
||||
.toPipelineStages(contextFor(null)))
|
||||
.containsExactly(new Document("$unionWith", new Document("coll", "coll-1").append("pipeline",
|
||||
Arrays.asList(new Document("$project", new Document("region", 1))))));
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-2622
|
||||
void doesNotMapAgainstFieldsFromAPreviousStage() {
|
||||
|
||||
TypedAggregation<Supplier> agg = TypedAggregation.newAggregation(Supplier.class,
|
||||
Aggregation.project().and("name").as("supplier"),
|
||||
UnionWithOperation.unionWith("coll-1").pipeline(Aggregation.project().and("name").as("name")));
|
||||
|
||||
List<Document> pipeline = agg.toPipeline(contextFor(Supplier.class));
|
||||
System.out.println("pipeline: " + pipeline);
|
||||
assertThat(pipeline).containsExactly(new Document("$project", new Document("supplier", 1)), //
|
||||
new Document("$unionWith", new Document("coll", "coll-1").append("pipeline",
|
||||
Arrays.asList(new Document("$project", new Document("name", 1))))));
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-2622
|
||||
void mapAgainstUnionWithDomainTypeEvenWhenInsideTypedAggregation() {
|
||||
|
||||
TypedAggregation<Supplier> agg = TypedAggregation.newAggregation(Supplier.class,
|
||||
Aggregation.project().and("name").as("supplier"), UnionWithOperation.unionWith("coll-1")
|
||||
.mapFieldsTo(Warehouse.class).pipeline(Aggregation.project().and("location").as("location")));
|
||||
|
||||
List<Document> pipeline = agg.toPipeline(contextFor(Supplier.class));
|
||||
assertThat(pipeline).containsExactly(new Document("$project", new Document("supplier", 1)), //
|
||||
new Document("$unionWith", new Document("coll", "coll-1").append("pipeline",
|
||||
Arrays.asList(new Document("$project", new Document("location", "$region"))))));
|
||||
}
|
||||
|
||||
private static AggregationOperationContext contextFor(@Nullable Class<?> type) {
|
||||
|
||||
if (type == null) {
|
||||
return Aggregation.DEFAULT_CONTEXT;
|
||||
}
|
||||
|
||||
MappingMongoConverter mongoConverter = new MappingMongoConverter(NoOpDbRefResolver.INSTANCE,
|
||||
new MongoMappingContext());
|
||||
mongoConverter.afterPropertiesSet();
|
||||
|
||||
return new TypeBasedAggregationOperationContext(type, mongoConverter.getMappingContext(),
|
||||
new QueryMapper(mongoConverter));
|
||||
}
|
||||
|
||||
static class Warehouse {
|
||||
|
||||
String name;
|
||||
@Field("region") String location;
|
||||
String state;
|
||||
}
|
||||
|
||||
static class Supplier {
|
||||
|
||||
@Field("supplier") String name;
|
||||
String state;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user