DATAMONGO-2403 - Fix aggregation simple type result retrieval from empty document.

Projections used within an aggregation pipeline can result in empty documents emitted by the driver. We now guarded those cases and skip those documents within a Flux or simply return an empty Mono depending on the methods signature.

Original pull request: #804.
This commit is contained in:
Christoph Strobl
2019-11-06 09:42:38 +01:00
committed by Mark Paluch
parent ead7679f21
commit b014fe4c7c
3 changed files with 55 additions and 3 deletions

View File

@@ -123,6 +123,10 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery {
ResultProcessor processor = method.getResultProcessor().withDynamicProjection(convertingParamterAccessor);
Class<?> typeToRead = processor.getReturnedType().getTypeToRead();
if(typeToRead == null && method.getReturnType().getComponentType() != null) {
typeToRead = method.getReturnType().getComponentType().getType();
}
return doExecute(method, processor, convertingParamterAccessor, typeToRead);
}

View File

@@ -16,11 +16,11 @@
package org.springframework.data.mongodb.repository.query;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
import org.bson.Document;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
@@ -95,7 +95,8 @@ public class ReactiveStringBasedAggregation extends AbstractReactiveMongoQuery {
Flux<?> flux = reactiveMongoOperations.aggregate(aggregation, targetType);
if (isSimpleReturnType && !isRawReturnType) {
flux = flux.map(it -> AggregationUtils.extractSimpleTypeResult((Document) it, typeToRead, mongoConverter));
flux = flux.flatMap(
it -> Mono.justOrEmpty(AggregationUtils.extractSimpleTypeResult((Document) it, typeToRead, mongoConverter)));
}
if (method.isCollectionQuery()) {

View File

@@ -27,6 +27,7 @@ import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
@@ -36,7 +37,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
@@ -535,6 +535,46 @@ public class ReactiveMongoRepositoryTests {
}).verifyComplete();
}
@Test // DATAMONGO-2153
public void annotatedAggregationWithAggregationResultAsMap() {
repository.sumAgeAndReturnSumAsMap() //
.as(StepVerifier::create) //
.consumeNextWith(it -> {
assertThat(it).isInstanceOf(Map.class);
}).verifyComplete();
}
@Test // DATAMONGO-2403
public void annotatedAggregationExtractingSimpleValueEmitsEmptyMonoForEmptyDocument() {
Person p = new Person("project-on-lastanme", null);
repository.save(p).then().as(StepVerifier::create).verifyComplete();
repository.projectToLastnameAndRemoveId(p.getFirstname()) //
.as(StepVerifier::create) //
.verifyComplete();
}
@Test // DATAMONGO-2403
public void annotatedAggregationSkipsEmptyDocumentsWhenExtractingSimpleValue() {
String firstname = "project-on-lastanme";
Person p1 = new Person(firstname, null);
p1.setEmail("p1@example.com");
Person p2 = new Person(firstname, "lastname");
p2.setEmail("p2@example.com");
Person p3 = new Person(firstname, null);
p3.setEmail("p3@example.com");
repository.saveAll(Arrays.asList(p1, p2, p3)).then().as(StepVerifier::create).verifyComplete();
repository.projectToLastnameAndRemoveId(firstname) //
.as(StepVerifier::create) //
.expectNext("lastname").verifyComplete();
}
interface ReactivePersonRepository
extends ReactiveMongoRepository<Person, String>, ReactiveQuerydslPredicateExecutor<Person> {
@@ -596,6 +636,13 @@ public class ReactiveMongoRepositoryTests {
@Aggregation(pipeline = "{ '$group' : { '_id' : null, 'total' : { $sum: '$age' } } }")
Mono<SumAge> sumAgeAndReturnSumWrapper();
@Aggregation(pipeline = "{ '$group' : { '_id' : null, 'total' : { $sum: '$age' } } }")
Mono<Map> sumAgeAndReturnSumAsMap();
@Aggregation(
pipeline = { "{ '$match' : { 'firstname' : '?0' } }", "{ '$project' : { '_id' : 0, 'lastname' : 1 } }" })
Mono<String> projectToLastnameAndRemoveId(String firstname);
@Query(value = "{_id:?0}")
Mono<org.bson.Document> findDocumentById(String id);
}