DATAMONGO-1890 - Add support for mapReduce to ReactiveMongoOperations.
We now support mapReduce via ReactiveMongoTemplate returning a Flux<T> as the operations result. Original pull request: #548.
This commit is contained in:
committed by
Mark Paluch
parent
2ec3f219c8
commit
857add7349
@@ -33,6 +33,7 @@ import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
|
||||
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
|
||||
import org.springframework.data.mongodb.core.convert.MongoConverter;
|
||||
import org.springframework.data.mongodb.core.index.ReactiveIndexOperations;
|
||||
import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions;
|
||||
import org.springframework.data.mongodb.core.query.BasicQuery;
|
||||
import org.springframework.data.mongodb.core.query.Criteria;
|
||||
import org.springframework.data.mongodb.core.query.NearQuery;
|
||||
@@ -1226,6 +1227,213 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations {
|
||||
<T> Flux<ChangeStreamEvent<T>> changeStream(List<Document> filter, Class<T> resultType, ChangeStreamOptions options,
|
||||
String collectionName);
|
||||
|
||||
/**
|
||||
* Execute an inline map-reduce operation returning the operation result without storing it in an collection.
|
||||
*
|
||||
* @param resultType the mapping target of the operations result documents. Also used to determine the input
|
||||
* collection mame. Must not be {@literal null}.
|
||||
* @param mapFunction the JavaScript map function. Must not be {@literal null}.
|
||||
* @param reduceFunction the JavaScript reduce function. Must not be {@literal null}.
|
||||
* @return a {@link Flux} emitting the result document sequence. Never {@literal null}.
|
||||
* @since 2.1
|
||||
*/
|
||||
default <T> Flux<T> mapReduce(Class<T> resultType, String mapFunction, String reduceFunction) {
|
||||
return mapReduce(resultType, resultType, mapFunction, reduceFunction);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute an inline map-reduce operation returning the operation result without storing it in an collection.
|
||||
*
|
||||
* @param domainType source type used to determine the input collection name. Must not be {@literal null}.
|
||||
* @param resultType the mapping target of the operations result documents. Must not be {@literal null}.
|
||||
* @param mapFunction the JavaScript map function. Must not be {@literal null}.
|
||||
* @param reduceFunction the JavaScript reduce function. Must not be {@literal null}.
|
||||
* @return a {@link Flux} emitting the result document sequence. Never {@literal null}.
|
||||
* @since 2.1
|
||||
*/
|
||||
default <T> Flux<T> mapReduce(Class<?> domainType, Class<T> resultType, String mapFunction, String reduceFunction) {
|
||||
return mapReduce(new Query(), domainType, resultType, mapFunction, reduceFunction, MapReduceOptions.options());
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute an inline map-reduce operation returning the operation result without storing it in an collection.
|
||||
*
|
||||
* @param filterQuery the selection criteria for the documents going input to the map function. Must not be
|
||||
* {@literal null}.
|
||||
* @param resultType the mapping target of the operations result documents. Also used to determine the input
|
||||
* collection mame. Must not be {@literal null}.
|
||||
* @param mapFunction the JavaScript map function. Must not be {@literal null}.
|
||||
* @param reduceFunction the JavaScript reduce function. Must not be {@literal null}.
|
||||
* @return a {@link Flux} emitting the result document sequence. Never {@literal null}.
|
||||
* @since 2.1
|
||||
*/
|
||||
default <T> Flux<T> mapReduce(Query filterQuery, Class<T> resultType, String mapFunction, String reduceFunction) {
|
||||
return mapReduce(filterQuery, resultType, resultType, mapFunction, reduceFunction);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute an inline map-reduce operation returning the operation result without storing it in an collection.
|
||||
*
|
||||
* @param filterQuery the selection criteria for the documents going input to the map function. Must not be
|
||||
* {@literal null}.
|
||||
* @param domainType source type used to determine the input collection name and map the filter {@link Query} against.
|
||||
* Must not be {@literal null}.
|
||||
* @param resultType the mapping target of the operations result documents. Must not be {@literal null}.
|
||||
* @param mapFunction the JavaScript map function. Must not be {@literal null}.
|
||||
* @param reduceFunction the JavaScript reduce function. Must not be {@literal null}.
|
||||
* @return a {@link Flux} emitting the result document sequence. Never {@literal null}.
|
||||
* @since 2.1
|
||||
*/
|
||||
default <T> Flux<T> mapReduce(Query filterQuery, Class<?> domainType, Class<T> resultType, String mapFunction,
|
||||
String reduceFunction) {
|
||||
|
||||
return mapReduce(filterQuery, domainType, resultType, mapFunction, reduceFunction, MapReduceOptions.options());
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a map-reduce operation. Use {@link MapReduceOptions} to optionally specify an output collection and other
|
||||
* args.
|
||||
*
|
||||
* @param resultType he mapping target of the operations result documents. Also used to determine the input collection
|
||||
* mame. Must not be {@literal null}.
|
||||
* @param mapFunction the JavaScript map function. Must not be {@literal null}.
|
||||
* @param reduceFunction the JavaScript reduce function. Must not be {@literal null}.
|
||||
* @param options additional options like output collection. Must not be {@literal null}.
|
||||
* @return a {@link Flux} emitting the result document sequence. Never {@literal null}.
|
||||
* @since 2.1
|
||||
*/
|
||||
default <T> Flux<T> mapReduce(Class<T> resultType, String mapFunction, String reduceFunction,
|
||||
MapReduceOptions options) {
|
||||
|
||||
return mapReduce(new Query(), resultType, mapFunction, reduceFunction, options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a map-reduce operation. Use {@link MapReduceOptions} to optionally specify an output collection and other
|
||||
* args.
|
||||
*
|
||||
* @param filterQuery the selection criteria for the documents going input to the map function. Must not be
|
||||
* {@literal null}.
|
||||
* @param resultType he mapping target of the operations result documents. Also used to determine the input collection
|
||||
* mame. Must not be {@literal null}.
|
||||
* @param mapFunction the JavaScript map function. Must not be {@literal null}.
|
||||
* @param reduceFunction the JavaScript reduce function. Must not be {@literal null}.
|
||||
* @param options additional options like output collection. Must not be {@literal null}.
|
||||
* @return a {@link Flux} emitting the result document sequence. Never {@literal null}.
|
||||
* @since 2.1
|
||||
*/
|
||||
default <T> Flux<T> mapReduce(Query filterQuery, Class<T> resultType, String mapFunction, String reduceFunction,
|
||||
MapReduceOptions options) {
|
||||
|
||||
return mapReduce(filterQuery, resultType, resultType, mapFunction, reduceFunction, options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a map-reduce operation. Use {@link MapReduceOptions} to optionally specify an output collection and other
|
||||
* args.
|
||||
*
|
||||
* @param filterQuery the selection criteria for the documents going input to the map function. Must not be
|
||||
* {@literal null}.
|
||||
* @param domainType source type used to determine the input collection name and map the filter {@link Query} against.
|
||||
* Must not be {@literal null}.
|
||||
* @param resultType the mapping target of the operations result documents. Must not be {@literal null}.
|
||||
* @param mapFunction the JavaScript map function. Must not be {@literal null}.
|
||||
* @param reduceFunction the JavaScript reduce function. Must not be {@literal null}.
|
||||
* @param options additional options like output collection. Must not be {@literal null}.
|
||||
* @return a {@link Flux} emitting the result document sequence. Never {@literal null}.
|
||||
* @since 2.1
|
||||
*/
|
||||
<T> Flux<T> mapReduce(Query filterQuery, Class<?> domainType, Class<T> resultType, String mapFunction,
|
||||
String reduceFunction, MapReduceOptions options);
|
||||
|
||||
/**
|
||||
* Execute an inline map-reduce operation returning the operation result without storing it in an collection.
|
||||
*
|
||||
* @param resultType the mapping target of the operations result documents. Must not be {@literal null}.
|
||||
* @param collectionName the input collection.
|
||||
* @param mapFunction the JavaScript map function. Must not be {@literal null}.
|
||||
* @param reduceFunction the JavaScript reduce function. Must not be {@literal null}.
|
||||
* @return a {@link Flux} emitting the result document sequence. Never {@literal null}.
|
||||
* @since 2.1
|
||||
*/
|
||||
default <T> Flux<T> mapReduce(Class<T> resultType, String collectionName, String mapFunction, String reduceFunction) {
|
||||
return mapReduce(new Query(), resultType, collectionName, mapFunction, reduceFunction);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute an inline map-reduce operation returning the operation result without storing it in an collection.
|
||||
*
|
||||
* @param filterQuery the selection criteria for the documents going input to the map function. Must not be
|
||||
* {@literal null}.
|
||||
* @param resultType the mapping target of the operations result documents. Must not be {@literal null}.
|
||||
* @param collectionName the input collection.
|
||||
* @param mapFunction the JavaScript map function. Must not be {@literal null}.
|
||||
* @param reduceFunction the JavaScript reduce function. Must not be {@literal null}.
|
||||
* @return a {@link Flux} emitting the result document sequence. Never {@literal null}.
|
||||
* @since 2.1
|
||||
*/
|
||||
default <T> Flux<T> mapReduce(Query filterQuery, Class<T> resultType, String collectionName, String mapFunction,
|
||||
String reduceFunction) {
|
||||
|
||||
return mapReduce(filterQuery, resultType, resultType, collectionName, mapFunction, reduceFunction);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute an inline map-reduce operation returning the operation result without storing it in an collection.
|
||||
*
|
||||
* @param filterQuery the selection criteria for the documents going input to the map function. Must not be
|
||||
* {@literal null}.
|
||||
* @param domainType source type used to map the filter {@link Query} against. Must not be {@literal null}.
|
||||
* @param resultType the mapping target of the operations result documents. Must not be {@literal null}.
|
||||
* @param collectionName the input collection.
|
||||
* @param mapFunction the JavaScript map function. Must not be {@literal null}.
|
||||
* @param reduceFunction the JavaScript reduce function. Must not be {@literal null}.
|
||||
* @return a {@link Flux} emitting the result document sequence. Never {@literal null}.
|
||||
* @since 2.1
|
||||
*/
|
||||
default <T> Flux<T> mapReduce(Query filterQuery, Class<?> domainType, Class<T> resultType, String collectionName,
|
||||
String mapFunction, String reduceFunction) {
|
||||
|
||||
return mapReduce(filterQuery, domainType, collectionName, resultType, mapFunction, reduceFunction,
|
||||
MapReduceOptions.options());
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a map-reduce operation. Use {@link MapReduceOptions} to optionally specify an output collection and other
|
||||
* args.
|
||||
*
|
||||
* @param resultType the mapping target of the operations result documents. Must not be {@literal null}.
|
||||
* @param collectionName the input collection.
|
||||
* @param mapFunction the JavaScript map function. Must not be {@literal null}.
|
||||
* @param reduceFunction the JavaScript reduce function. Must not be {@literal null}.
|
||||
* @param options additional options like output collection. Must not be {@literal null}.
|
||||
* @return a {@link Flux} emitting the result document sequence. Never {@literal null}.
|
||||
* @since 2.1
|
||||
*/
|
||||
default <T> Flux<T> mapReduce(Class<T> resultType, String collectionName, String mapFunction, String reduceFunction,
|
||||
MapReduceOptions options) {
|
||||
|
||||
return mapReduce(new Query(), resultType, collectionName, resultType, mapFunction, reduceFunction, options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a map-reduce operation. Use {@link MapReduceOptions} to optionally specify an output collection and other
|
||||
* args.
|
||||
*
|
||||
* @param filterQuery the selection criteria for the documents going input to the map function. Must not be
|
||||
* {@literal null}.
|
||||
* @param domainType source type used to map the filter {@link Query} against. Must not be {@literal null}.
|
||||
* @param inputCollectionName the input collection.
|
||||
* @param resultType the mapping target of the operations result documents. Must not be {@literal null}.
|
||||
* @param mapFunction the JavaScript map function. Must not be {@literal null}.
|
||||
* @param reduceFunction the JavaScript reduce function. Must not be {@literal null}.
|
||||
* @param options additional options like output collection. Must not be {@literal null}.
|
||||
* @return a {@link Flux} emitting the result document sequence. Never {@literal null}.
|
||||
* @since 2.1
|
||||
*/
|
||||
<T> Flux<T> mapReduce(Query filterQuery, Class<?> domainType, String inputCollectionName, Class<T> resultType,
|
||||
String mapFunction, String reduceFunction, MapReduceOptions options);
|
||||
|
||||
/**
|
||||
* Returns the underlying {@link MongoConverter}.
|
||||
*
|
||||
|
||||
@@ -26,6 +26,7 @@ import reactor.util.function.Tuple2;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
@@ -88,6 +89,7 @@ import org.springframework.data.mongodb.core.mapping.event.BeforeConvertEvent;
|
||||
import org.springframework.data.mongodb.core.mapping.event.BeforeDeleteEvent;
|
||||
import org.springframework.data.mongodb.core.mapping.event.BeforeSaveEvent;
|
||||
import org.springframework.data.mongodb.core.mapping.event.MongoMappingEvent;
|
||||
import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions;
|
||||
import org.springframework.data.mongodb.core.query.Collation;
|
||||
import org.springframework.data.mongodb.core.query.Criteria;
|
||||
import org.springframework.data.mongodb.core.query.NearQuery;
|
||||
@@ -102,10 +104,21 @@ import org.springframework.data.util.Pair;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ClassUtils;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.util.ResourceUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import com.mongodb.*;
|
||||
import com.mongodb.BasicDBObject;
|
||||
import com.mongodb.ClientSessionOptions;
|
||||
import com.mongodb.CursorType;
|
||||
import com.mongodb.DBCollection;
|
||||
import com.mongodb.DBCursor;
|
||||
import com.mongodb.DBRef;
|
||||
import com.mongodb.Mongo;
|
||||
import com.mongodb.MongoException;
|
||||
import com.mongodb.ReadPreference;
|
||||
import com.mongodb.WriteConcern;
|
||||
import com.mongodb.client.model.CountOptions;
|
||||
import com.mongodb.client.model.CreateCollectionOptions;
|
||||
import com.mongodb.client.model.DeleteOptions;
|
||||
@@ -122,6 +135,7 @@ import com.mongodb.reactivestreams.client.AggregatePublisher;
|
||||
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
|
||||
import com.mongodb.reactivestreams.client.DistinctPublisher;
|
||||
import com.mongodb.reactivestreams.client.FindPublisher;
|
||||
import com.mongodb.reactivestreams.client.MapReducePublisher;
|
||||
import com.mongodb.reactivestreams.client.MongoClient;
|
||||
import com.mongodb.reactivestreams.client.MongoCollection;
|
||||
import com.mongodb.reactivestreams.client.MongoDatabase;
|
||||
@@ -1904,6 +1918,114 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
return Flux.from(publisher).map(document -> new ChangeStreamEvent<>(document, resultType, getConverter()));
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.mongodb.core.ReactiveMongoOperations#mapReduce(org.springframework.data.mongodb.core.query.Query, java.lang.Class, java.lang.Class, java.lang.String, java.lang.String, org.springframework.data.mongodb.core.mapreduce.MapReduceOptions)
|
||||
*/
|
||||
public <T> Flux<T> mapReduce(Query filterQuery, Class<?> domainType, Class<T> resultType, String mapFunction,
|
||||
String reduceFunction, MapReduceOptions options) {
|
||||
|
||||
return mapReduce(filterQuery, domainType, determineCollectionName(domainType), resultType, mapFunction,
|
||||
reduceFunction, options);
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.mongodb.core.ReactiveMongoOperations#mapReduce(org.springframework.data.mongodb.core.query.Query, java.lang.Class, java.lang.String, java.lang.Class, java.lang.String, java.lang.String, org.springframework.data.mongodb.core.mapreduce.MapReduceOptions)
|
||||
*/
|
||||
public <T> Flux<T> mapReduce(Query filterQuery, Class<?> domainType, String inputCollectionName, Class<T> resultType,
|
||||
String mapFunction, String reduceFunction, MapReduceOptions options) {
|
||||
|
||||
verifyFunctions(mapFunction, reduceFunction);
|
||||
|
||||
Class<?> mappingTarget = domainType != null ? domainType : resultType;
|
||||
|
||||
return createFlux(inputCollectionName, collection -> {
|
||||
|
||||
Document mappedQuery = queryMapper.getMappedObject(filterQuery.getQueryObject(),
|
||||
mappingContext.getPersistentEntity(mappingTarget));
|
||||
|
||||
MapReducePublisher<Document> publisher = collection.mapReduce(mapFunction, reduceFunction, Document.class);
|
||||
|
||||
if (options != null) {
|
||||
if (StringUtils.hasText(options.getOutputCollection())) {
|
||||
publisher = publisher.collectionName(options.getOutputCollection());
|
||||
}
|
||||
}
|
||||
|
||||
publisher.filter(mappedQuery);
|
||||
publisher.sort(getMappedSortObject(filterQuery, mappingTarget));
|
||||
|
||||
if (filterQuery.getMeta() != null && filterQuery.getMeta().getMaxTimeMsec() != null) {
|
||||
publisher.maxTime(filterQuery.getMeta().getMaxTimeMsec(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
if (filterQuery.getLimit() > 0 || (options != null && options.getLimit() != null)) {
|
||||
|
||||
if (filterQuery.getLimit() > 0 && (options != null && options.getLimit() != null)) {
|
||||
throw new IllegalArgumentException("which one do ya want?");
|
||||
}
|
||||
|
||||
if (filterQuery.getLimit() > 0) {
|
||||
publisher.limit(filterQuery.getLimit());
|
||||
}
|
||||
|
||||
if ((options != null && options.getLimit() != null)) {
|
||||
publisher.limit(options.getLimit());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Optional<Collation> collation = filterQuery.getCollation();
|
||||
|
||||
if (options != null) {
|
||||
|
||||
Optionals.ifAllPresent(filterQuery.getCollation(), options.getCollation(), (l, r) -> {
|
||||
throw new IllegalArgumentException(
|
||||
"Both Query and MapReduceOptions define a collation. Please provide the collation only via one of the two.");
|
||||
});
|
||||
|
||||
if (options.getCollation().isPresent()) {
|
||||
collation = options.getCollation();
|
||||
}
|
||||
|
||||
if (!CollectionUtils.isEmpty(options.getScopeVariables())) {
|
||||
publisher = publisher.scope(new Document(options.getScopeVariables()));
|
||||
}
|
||||
if (options.getLimit() != null && options.getLimit().intValue() > 0) {
|
||||
publisher = publisher.limit(options.getLimit());
|
||||
}
|
||||
if (options.getFinalizeFunction().filter(StringUtils::hasText).isPresent()) {
|
||||
publisher = publisher.finalizeFunction(options.getFinalizeFunction().get());
|
||||
}
|
||||
if (options.getJavaScriptMode() != null) {
|
||||
publisher = publisher.jsMode(options.getJavaScriptMode());
|
||||
}
|
||||
if (options.getOutputSharded().isPresent()) {
|
||||
publisher = publisher.sharded(options.getOutputSharded().get());
|
||||
}
|
||||
}
|
||||
|
||||
publisher = collation.map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher);
|
||||
|
||||
return Flux.from(publisher)
|
||||
.map(new ReadDocumentCallback<>(mongoConverter, resultType, inputCollectionName)::doWith);
|
||||
});
|
||||
}
|
||||
|
||||
private void verifyFunctions(String... functions) {
|
||||
|
||||
for (String function : functions) {
|
||||
|
||||
if (ResourceUtils.isUrl(function)) {
|
||||
|
||||
throw new IllegalArgumentException(String.format(
|
||||
"Blocking accessing to resource %s is not allowed using reactive infrastructure. You may load the resource at startup and cache its value.",
|
||||
function));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.mongodb.core.ReactiveFindOperation#query(java.lang.Class)
|
||||
@@ -2829,7 +2951,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
private final Metric metric;
|
||||
|
||||
/**
|
||||
* Creates a new {@link GeoNearResultDbObjectCallback} using the given {@link DbObjectCallback} delegate for
|
||||
* Creates a new {@link GeoNearResultDbObjectCallback} using the given {@link DocumentCallback} delegate for
|
||||
* {@link GeoResult} content unmarshalling.
|
||||
*
|
||||
* @param delegate must not be {@literal null}.
|
||||
|
||||
@@ -0,0 +1,169 @@
|
||||
/*
|
||||
* Copyright 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.data.mongodb.core.mapreduce;
|
||||
|
||||
import static org.assertj.core.api.Assertions.*;
|
||||
import static org.springframework.data.mongodb.core.query.Criteria.*;
|
||||
import static org.springframework.data.mongodb.core.query.Query.*;
|
||||
|
||||
import lombok.Data;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.bson.Document;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
|
||||
import org.springframework.data.mongodb.core.SimpleReactiveMongoDatabaseFactory;
|
||||
import org.springframework.data.mongodb.core.mapping.Field;
|
||||
import org.springframework.data.mongodb.core.query.Query;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
|
||||
import com.mongodb.reactivestreams.client.MongoCollection;
|
||||
import com.mongodb.reactivestreams.client.Success;
|
||||
|
||||
/**
|
||||
* @author Christoph Strobl
|
||||
* @currentRead Beyond the Shadows - Brent Weeks
|
||||
*/
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@ContextConfiguration("classpath:reactive-infrastructure.xml")
|
||||
public class ReactiveMapReduceTests {
|
||||
|
||||
@Autowired SimpleReactiveMongoDatabaseFactory factory;
|
||||
@Autowired ReactiveMongoTemplate template;
|
||||
|
||||
private String mapFunction = "function(){ for ( var i=0; i<this.x.length; i++ ){ emit( this.x[i] , 1 ); } }";
|
||||
private String reduceFunction = "function(key,values){ var sum=0; for( var i=0; i<values.length; i++ ) sum += values[i]; return sum;}";
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
|
||||
StepVerifier
|
||||
.create(template.dropCollection(ValueObject.class) //
|
||||
.mergeWith(template.dropCollection("jmr1")) //
|
||||
.mergeWith(template.dropCollection("jmr1_out"))) //
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-1890
|
||||
public void mapReduceWithInlineResult() {
|
||||
|
||||
createMapReduceData();
|
||||
|
||||
StepVerifier
|
||||
.create(template.mapReduce(new Query(), null, "jmr1", ValueObject.class, mapFunction, reduceFunction,
|
||||
MapReduceOptions.options()).buffer(4)) //
|
||||
.consumeNextWith(result -> {
|
||||
assertThat(result).containsExactlyInAnyOrder(new ValueObject("a", 1), new ValueObject("b", 2),
|
||||
new ValueObject("c", 2), new ValueObject("d", 1));
|
||||
}) //
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-1890
|
||||
public void mapReduceWithInlineAndFilterQuery() {
|
||||
|
||||
createMapReduceData();
|
||||
|
||||
StepVerifier
|
||||
.create(template.mapReduce(query(where("x").ne(new String[] { "a", "b" })), ValueObject.class, "jmr1",
|
||||
ValueObject.class, mapFunction, reduceFunction, MapReduceOptions.options()).buffer(4)) //
|
||||
.consumeNextWith(result -> {
|
||||
assertThat(result).containsExactlyInAnyOrder(new ValueObject("b", 1), new ValueObject("c", 2),
|
||||
new ValueObject("d", 1));
|
||||
}) //
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-1890
|
||||
public void mapReduceWithOutputCollection() {
|
||||
|
||||
createMapReduceData();
|
||||
|
||||
StepVerifier
|
||||
.create(template.mapReduce(new Query(), ValueObject.class, "jmr1", ValueObject.class, mapFunction,
|
||||
reduceFunction, MapReduceOptions.options().outputCollection("jmr1_out")))
|
||||
.expectNextCount(4).verifyComplete();
|
||||
|
||||
StepVerifier.create(template.find(new Query(), ValueObject.class, "jmr1_out").buffer(4)) //
|
||||
.consumeNextWith(result -> {
|
||||
assertThat(result).containsExactlyInAnyOrder(new ValueObject("a", 1), new ValueObject("b", 2),
|
||||
new ValueObject("c", 2), new ValueObject("d", 1));
|
||||
}) //
|
||||
.verifyComplete();
|
||||
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-1890
|
||||
public void mapReduceWithInlineAndMappedFilterQuery() {
|
||||
|
||||
createMapReduceData();
|
||||
|
||||
StepVerifier
|
||||
.create(template.mapReduce(query(where("values").ne(new String[] { "a", "b" })), MappedFieldsValueObject.class,
|
||||
"jmr1", ValueObject.class, mapFunction, reduceFunction, MapReduceOptions.options()).buffer(4)) //
|
||||
.consumeNextWith(result -> {
|
||||
assertThat(result).containsExactlyInAnyOrder(new ValueObject("b", 1), new ValueObject("c", 2),
|
||||
new ValueObject("d", 1));
|
||||
}) //
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-1890
|
||||
public void mapReduceWithInlineFilterQueryAndExtractedCollection() {
|
||||
|
||||
createMapReduceData();
|
||||
|
||||
StepVerifier
|
||||
.create(template.mapReduce(query(where("values").ne(new String[] { "a", "b" })), MappedFieldsValueObject.class,
|
||||
ValueObject.class, mapFunction, reduceFunction, MapReduceOptions.options()).buffer(4)) //
|
||||
.consumeNextWith(result -> {
|
||||
assertThat(result).containsExactlyInAnyOrder(new ValueObject("b", 1), new ValueObject("c", 2),
|
||||
new ValueObject("d", 1));
|
||||
}) //
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-1890
|
||||
public void throwsExceptionWhenTryingToLoadFunctionsFromDisk() {
|
||||
|
||||
assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(() -> template.mapReduce(new Query(), null,
|
||||
"foo", ValueObject.class, "classpath:map.js", "classpath:reduce.js", null))
|
||||
.withMessageContaining("classpath:map.js");
|
||||
}
|
||||
|
||||
private void createMapReduceData() {
|
||||
|
||||
MongoCollection<Document> collection = factory.getMongoDatabase().getCollection("jmr1", Document.class);
|
||||
|
||||
StepVerifier
|
||||
.create(collection.insertMany(Arrays.asList(new Document("x", Arrays.asList("a", "b")),
|
||||
new Document("x", Arrays.asList("b", "c")), new Document("x", Arrays.asList("c", "d")))))
|
||||
.expectNext(Success.SUCCESS).verifyComplete();
|
||||
}
|
||||
|
||||
@org.springframework.data.mongodb.core.mapping.Document("jmr1")
|
||||
@Data
|
||||
static class MappedFieldsValueObject {
|
||||
|
||||
@Field("x") String[] values;
|
||||
}
|
||||
}
|
||||
@@ -1,23 +1,38 @@
|
||||
/*
|
||||
* Copyright 2011-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.data.mongodb.core.mapreduce;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* @author Mark Pollack
|
||||
* @author Oliver Gierke
|
||||
* @author Christoph Strobl
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class ValueObject {
|
||||
|
||||
private String id;
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
private float value;
|
||||
|
||||
public float getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public void setValue(float value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ValueObject [id=" + id + ", value=" + value + "]";
|
||||
|
||||
Reference in New Issue
Block a user