Use Java 8 Stream as return type for Template operations returning a stream.

We now use Stream instead of CloseableIterator for easier stream creation.

Closes: #3944
Original Pull Request: #3946
This commit is contained in:
Mark Paluch
2022-01-24 14:04:15 +01:00
committed by Christoph Strobl
parent a1c483f2e1
commit 2367379b6d
12 changed files with 146 additions and 169 deletions

View File

@@ -15,9 +15,10 @@
*/
package org.springframework.data.mongodb.core;
import java.util.stream.Stream;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.util.CloseableIterator;
/**
* {@link ExecutableAggregationOperation} allows creation and execution of MongoDB aggregation operations in a fluent
@@ -88,12 +89,12 @@ public interface ExecutableAggregationOperation {
/**
* Apply pipeline operations as specified and stream all matching elements. <br />
* Returns a {@link CloseableIterator} that wraps the a Mongo DB {@link com.mongodb.client.FindIterable}
* Returns a {@link Stream} that wraps the Mongo DB {@link com.mongodb.client.FindIterable}
*
* @return a {@link CloseableIterator} that wraps the a Mongo DB {@link com.mongodb.client.FindIterable} that needs to be closed.
* Never {@literal null}.
* @return the result {@link Stream}, containing mapped objects, needing to be closed once fully processed (e.g.
* through a try-with-resources clause).
*/
CloseableIterator<T> stream();
Stream<T> stream();
}
/**

View File

@@ -15,10 +15,11 @@
*/
package org.springframework.data.mongodb.core;
import java.util.stream.Stream;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.util.CloseableIterator;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
@@ -87,7 +88,7 @@ class ExecutableAggregationOperationSupport implements ExecutableAggregationOper
}
@Override
public CloseableIterator<T> stream() {
public Stream<T> stream() {
return template.aggregateStream(aggregation, getCollectionName(aggregation), domainType);
}

View File

@@ -118,8 +118,8 @@ public interface ExecutableFindOperation {
/**
* Stream all matching elements.
*
* @return a {@link Stream} that wraps the a Mongo DB {@link com.mongodb.client.FindIterable} that needs to be closed. Never
* {@literal null}.
* @return the result {@link Stream}, containing mapped objects, needing to be closed once fully processed (e.g.
* through a try-with-resources clause).
*/
Stream<T> stream();

View File

@@ -20,12 +20,11 @@ import java.util.Optional;
import java.util.stream.Stream;
import org.bson.Document;
import org.springframework.dao.IncorrectResultSizeDataAccessException;
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.SerializationUtils;
import org.springframework.data.util.CloseableIterator;
import org.springframework.data.util.StreamUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
@@ -70,11 +69,11 @@ class ExecutableFindOperationSupport implements ExecutableFindOperation {
private final MongoTemplate template;
private final Class<?> domainType;
private final Class<T> returnType;
@Nullable private final String collection;
private final @Nullable String collection;
private final Query query;
ExecutableFindSupport(MongoTemplate template, Class<?> domainType, Class<T> returnType,
String collection, Query query) {
@Nullable String collection, Query query) {
this.template = template;
this.domainType = domainType;
this.returnType = returnType;
@@ -137,7 +136,7 @@ class ExecutableFindOperationSupport implements ExecutableFindOperation {
@Override
public Stream<T> stream() {
return StreamUtils.createStreamFromIterator(doStream());
return doStream();
}
@Override
@@ -179,7 +178,7 @@ class ExecutableFindOperationSupport implements ExecutableFindOperation {
returnType == domainType ? (Class<T>) Object.class : returnType);
}
private CloseableIterator<T> doStream() {
private Stream<T> doStream() {
return template.doStream(query, domainType, getCollectionName(), returnType);
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2011-2021 the original author or authors.
* Copyright 2011-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,8 +20,10 @@ import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.bson.Document;
import org.springframework.data.geo.GeoResults;
import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
@@ -42,7 +44,6 @@ import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.mongodb.core.query.UpdateDefinition;
import org.springframework.data.util.CloseableIterator;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
@@ -225,34 +226,34 @@ public interface MongoOperations extends FluentMongoOperations {
* Executes the given {@link Query} on the entity collection of the specified {@code entityType} backed by a Mongo DB
* {@link com.mongodb.client.FindIterable}.
* <p>
* Returns a {@link CloseableIterator} that wraps the a Mongo DB {@link com.mongodb.client.FindIterable} that needs to
* be closed.
* Returns a {@link String} that wraps the Mongo DB {@link com.mongodb.client.FindIterable} that needs to be closed.
*
* @param query the query class that specifies the criteria used to find a record and also an optional fields
* specification. Must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param <T> element return type
* @return will never be {@literal null}.
* @return the result {@link Stream}, containing mapped objects, needing to be closed once fully processed (e.g.
* through a try-with-resources clause).
* @since 1.7
*/
<T> CloseableIterator<T> stream(Query query, Class<T> entityType);
<T> Stream<T> stream(Query query, Class<T> entityType);
/**
* Executes the given {@link Query} on the entity collection of the specified {@code entityType} and collection backed
* by a Mongo DB {@link com.mongodb.client.FindIterable}.
* <p>
* Returns a {@link CloseableIterator} that wraps the a Mongo DB {@link com.mongodb.client.FindIterable} that needs to
* be closed.
* Returns a {@link Stream} that wraps the Mongo DB {@link com.mongodb.client.FindIterable} that needs to be closed.
*
* @param query the query class that specifies the criteria used to find a record and also an optional fields
* specification. Must not be {@literal null}.
* @param entityType must not be {@literal null}.
* @param collectionName must not be {@literal null} or empty.
* @param <T> element return type
* @return will never be {@literal null}.
* @return the result {@link Stream}, containing mapped objects, needing to be closed once fully processed (e.g.
* through a try-with-resources clause).
* @since 1.10
*/
<T> CloseableIterator<T> stream(Query query, Class<T> entityType, String collectionName);
<T> Stream<T> stream(Query query, Class<T> entityType, String collectionName);
/**
* Create an uncapped collection with a name based on the provided entity class.
@@ -521,9 +522,9 @@ public interface MongoOperations extends FluentMongoOperations {
/**
* Execute an aggregation operation backed by a Mongo DB {@link com.mongodb.client.AggregateIterable}.
* <p>
* Returns a {@link CloseableIterator} that wraps the a Mongo DB {@link com.mongodb.client.AggregateIterable} that
* needs to be closed. The raw results will be mapped to the given entity class. The name of the inputCollection is
* derived from the inputType of the aggregation.
* Returns a {@link Stream} that wraps the Mongo DB {@link com.mongodb.client.AggregateIterable} that needs to be
* closed. The raw results will be mapped to the given entity class. The name of the inputCollection is derived from
* the inputType of the aggregation.
* <p>
* Aggregation streaming can't be used with {@link AggregationOptions#isExplain() aggregation explain}. Enabling
* explanation mode will throw an {@link IllegalArgumentException}.
@@ -532,35 +533,37 @@ public interface MongoOperations extends FluentMongoOperations {
* {@literal null}.
* @param collectionName The name of the input collection to use for the aggreation.
* @param outputType The parametrized type of the returned list, must not be {@literal null}.
* @return The results of the aggregation operation.
* @return the result {@link Stream}, containing mapped objects, needing to be closed once fully processed (e.g.
* through a try-with-resources clause).
* @since 2.0
*/
<O> CloseableIterator<O> aggregateStream(TypedAggregation<?> aggregation, String collectionName, Class<O> outputType);
<O> Stream<O> aggregateStream(TypedAggregation<?> aggregation, String collectionName, Class<O> outputType);
/**
* Execute an aggregation operation backed by a Mongo DB {@link com.mongodb.client.AggregateIterable}.
* <br />
* Returns a {@link CloseableIterator} that wraps the a Mongo DB {@link com.mongodb.client.AggregateIterable} that
* needs to be closed. The raw results will be mapped to the given entity class and are returned as stream. The name
* of the inputCollection is derived from the inputType of the aggregation.
* <br />
* <p>
* Returns a {@link Stream} that wraps the Mongo DB {@link com.mongodb.client.AggregateIterable} that needs to be
* closed. The raw results will be mapped to the given entity class and are returned as stream. The name of the
* inputCollection is derived from the inputType of the aggregation.
* <p>
* Aggregation streaming can't be used with {@link AggregationOptions#isExplain() aggregation explain}. Enabling
* explanation mode will throw an {@link IllegalArgumentException}.
*
* @param aggregation The {@link TypedAggregation} specification holding the aggregation operations, must not be
* {@literal null}.
* @param outputType The parametrized type of the returned list, must not be {@literal null}.
* @return The results of the aggregation operation.
* @return the result {@link Stream}, containing mapped objects, needing to be closed once fully processed (e.g.
* through a try-with-resources clause).
* @since 2.0
*/
<O> CloseableIterator<O> aggregateStream(TypedAggregation<?> aggregation, Class<O> outputType);
<O> Stream<O> aggregateStream(TypedAggregation<?> aggregation, Class<O> outputType);
/**
* Execute an aggregation operation backed by a Mongo DB {@link com.mongodb.client.AggregateIterable}.
* <br />
* Returns a {@link CloseableIterator} that wraps the a Mongo DB {@link com.mongodb.client.AggregateIterable} that
* needs to be closed. The raw results will be mapped to the given entity class.
* <br />
* <p>
* Returns a {@link Stream} that wraps the Mongo DB {@link com.mongodb.client.AggregateIterable} that needs to be
* closed. The raw results will be mapped to the given entity class.
* <p>
* Aggregation streaming can't be used with {@link AggregationOptions#isExplain() aggregation explain}. Enabling
* explanation mode will throw an {@link IllegalArgumentException}.
*
@@ -569,17 +572,18 @@ public interface MongoOperations extends FluentMongoOperations {
* @param inputType the inputType where the aggregation operation will read from, must not be {@literal null} or
* empty.
* @param outputType The parametrized type of the returned list, must not be {@literal null}.
* @return The results of the aggregation operation.
* @return the result {@link Stream}, containing mapped objects, needing to be closed once fully processed (e.g.
* through a try-with-resources clause).
* @since 2.0
*/
<O> CloseableIterator<O> aggregateStream(Aggregation aggregation, Class<?> inputType, Class<O> outputType);
<O> Stream<O> aggregateStream(Aggregation aggregation, Class<?> inputType, Class<O> outputType);
/**
* Execute an aggregation operation backed by a Mongo DB {@link com.mongodb.client.AggregateIterable}.
* <br />
* Returns a {@link CloseableIterator} that wraps the a Mongo DB {@link com.mongodb.client.AggregateIterable} that
* needs to be closed. The raw results will be mapped to the given entity class.
* <br />
* <p>
* Returns a {@link Stream} that wraps the Mongo DB {@link com.mongodb.client.AggregateIterable} that needs to be
* closed. The raw results will be mapped to the given entity class.
* <p>
* Aggregation streaming can't be used with {@link AggregationOptions#isExplain() aggregation explain}. Enabling
* explanation mode will throw an {@link IllegalArgumentException}.
*
@@ -588,10 +592,11 @@ public interface MongoOperations extends FluentMongoOperations {
* @param collectionName the collection where the aggregation operation will read from, must not be {@literal null} or
* empty.
* @param outputType The parametrized type of the returned list, must not be {@literal null}.
* @return The results of the aggregation operation.
* @return the result {@link Stream}, containing mapped objects, needing to be closed once fully processed (e.g.
* through a try-with-resources clause).
* @since 2.0
*/
<O> CloseableIterator<O> aggregateStream(Aggregation aggregation, String collectionName, Class<O> outputType);
<O> Stream<O> aggregateStream(Aggregation aggregation, String collectionName, Class<O> outputType);
/**
* Execute a map-reduce operation. The map-reduce operation will be formed with an output type of INLINE

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2010-2021 the original author or authors.
* Copyright 2010-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@ import java.math.RoundingMode;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -375,17 +376,17 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
}
@Override
public <T> CloseableIterator<T> stream(Query query, Class<T> entityType) {
public <T> Stream<T> stream(Query query, Class<T> entityType) {
return stream(query, entityType, getCollectionName(entityType));
}
@Override
public <T> CloseableIterator<T> stream(Query query, Class<T> entityType, String collectionName) {
public <T> Stream<T> stream(Query query, Class<T> entityType, String collectionName) {
return doStream(query, entityType, collectionName, entityType);
}
@SuppressWarnings("ConstantConditions")
protected <T> CloseableIterator<T> doStream(Query query, Class<?> entityType, String collectionName,
protected <T> Stream<T> doStream(Query query, Class<?> entityType, String collectionName,
Class<T> returnType) {
Assert.notNull(query, "Query must not be null!");
@@ -393,7 +394,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
Assert.hasText(collectionName, "Collection name must not be null or empty!");
Assert.notNull(returnType, "ReturnType must not be null!");
return execute(collectionName, (CollectionCallback<CloseableIterator<T>>) collection -> {
return execute(collectionName, (CollectionCallback<Stream<T>>) collection -> {
MongoPersistentEntity<?> persistentEntity = mappingContext.getPersistentEntity(entityType);
@@ -408,7 +409,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
col -> col.find(mappedQuery, Document.class).projection(mappedFields));
return new CloseableIterableCursorAdapter<>(cursor, exceptionTranslator,
new ProjectingReadCallback<>(mongoConverter, projection, collectionName));
new ProjectingReadCallback<>(mongoConverter, projection, collectionName)).stream();
});
}
@@ -1860,7 +1861,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
}
@Override
public <O> CloseableIterator<O> aggregateStream(TypedAggregation<?> aggregation, String inputCollectionName,
public <O> Stream<O> aggregateStream(TypedAggregation<?> aggregation, String inputCollectionName,
Class<O> outputType) {
Assert.notNull(aggregation, "Aggregation pipeline must not be null!");
@@ -1871,19 +1872,19 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
}
@Override
public <O> CloseableIterator<O> aggregateStream(TypedAggregation<?> aggregation, Class<O> outputType) {
public <O> Stream<O> aggregateStream(TypedAggregation<?> aggregation, Class<O> outputType) {
return aggregateStream(aggregation, getCollectionName(aggregation.getInputType()), outputType);
}
@Override
public <O> CloseableIterator<O> aggregateStream(Aggregation aggregation, Class<?> inputType, Class<O> outputType) {
public <O> Stream<O> aggregateStream(Aggregation aggregation, Class<?> inputType, Class<O> outputType) {
return aggregateStream(aggregation, getCollectionName(inputType), outputType,
new TypeBasedAggregationOperationContext(inputType, mappingContext, queryMapper));
}
@Override
public <O> CloseableIterator<O> aggregateStream(Aggregation aggregation, String collectionName, Class<O> outputType) {
public <O> Stream<O> aggregateStream(Aggregation aggregation, String collectionName, Class<O> outputType) {
return aggregateStream(aggregation, collectionName, outputType, null);
}
@@ -2021,7 +2022,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
}
@SuppressWarnings("ConstantConditions")
protected <O> CloseableIterator<O> aggregateStream(Aggregation aggregation, String collectionName,
protected <O> Stream<O> aggregateStream(Aggregation aggregation, String collectionName,
Class<O> outputType, @Nullable AggregationOperationContext context) {
Assert.hasText(collectionName, "Collection name must not be null or empty!");
@@ -2041,7 +2042,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
ReadDocumentCallback<O> readCallback = new ReadDocumentCallback<>(mongoConverter, outputType, collectionName);
return execute(collectionName, (CollectionCallback<CloseableIterator<O>>) collection -> {
return execute(collectionName, (CollectionCallback<Stream<O>>) collection -> {
AggregateIterable<Document> cursor = collection.aggregate(pipeline, Document.class) //
.allowDiskUse(options.isAllowDiskUse());
@@ -2061,7 +2062,7 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
.map(Collation::toMongoCollation) //
.ifPresent(cursor::collation);
return new CloseableIterableCursorAdapter<>(cursor, exceptionTranslator, readCallback);
return new CloseableIterableCursorAdapter<>(cursor, exceptionTranslator, readCallback).stream();
});
}
@@ -3203,6 +3204,23 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
}
}
/**
* @deprecated since 3.1.4. Use {@link #getMongoDatabaseFactory()} instead.
* @return the {@link MongoDatabaseFactory} in use.
*/
@Deprecated
public MongoDatabaseFactory getMongoDbFactory() {
return getMongoDatabaseFactory();
}
/**
* @return the {@link MongoDatabaseFactory} in use.
* @since 3.1.4
*/
public MongoDatabaseFactory getMongoDatabaseFactory() {
return mongoDbFactory;
}
/**
* A {@link CloseableIterator} that is backed by a MongoDB {@link MongoCollection}.
*
@@ -3286,23 +3304,6 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
}
}
/**
* @deprecated since 3.1.4. Use {@link #getMongoDatabaseFactory()} instead.
* @return the {@link MongoDatabaseFactory} in use.
*/
@Deprecated
public MongoDatabaseFactory getMongoDbFactory() {
return getMongoDatabaseFactory();
}
/**
* @return the {@link MongoDatabaseFactory} in use.
* @since 3.1.4
*/
public MongoDatabaseFactory getMongoDatabaseFactory() {
return mongoDbFactory;
}
/**
* {@link MongoTemplate} extension bound to a specific {@link ClientSession} that is applied when interacting with the
* server through the driver API.

View File

@@ -120,7 +120,7 @@ public class StringBasedAggregation extends AbstractMongoQuery {
if (method.isStreamQuery()) {
Stream<?> stream = mongoOperations.aggregateStream(aggregation, targetType).stream();
Stream<?> stream = mongoOperations.aggregateStream(aggregation, targetType);
if (isSimpleReturnType) {
return stream.map(it -> AggregationUtils.extractSimpleTypeResult((Document) it, typeToRead, mongoConverter));

View File

@@ -30,7 +30,7 @@ import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.NearQuery
import org.springframework.data.mongodb.core.query.Query
import org.springframework.data.mongodb.core.query.Update
import org.springframework.data.util.CloseableIterator
import java.util.stream.Stream
import kotlin.reflect.KClass
/**
@@ -67,8 +67,8 @@ inline fun <reified T : Any> MongoOperations.execute(action: CollectionCallback<
* @author Sebastien Deleuze
* @since 2.0
*/
inline fun <reified T : Any> MongoOperations.stream(query: Query): CloseableIterator<T> =
stream(query, T::class.java)
inline fun <reified T : Any> MongoOperations.stream(query: Query): Stream<T> =
stream(query, T::class.java)
/**
* Extension for [MongoOperations.stream] leveraging reified type parameters.
@@ -76,9 +76,12 @@ inline fun <reified T : Any> MongoOperations.stream(query: Query): CloseableIter
* @author Sebastien Deleuze
* @since 2.0
*/
inline fun <reified T : Any> MongoOperations.stream(query: Query, collectionName: String? = null): CloseableIterator<T> =
if (collectionName != null) stream(query, T::class.java, collectionName)
else stream(query, T::class.java)
inline fun <reified T : Any> MongoOperations.stream(
query: Query,
collectionName: String? = null
): Stream<T> =
if (collectionName != null) stream(query, T::class.java, collectionName)
else stream(query, T::class.java)
/**
* Extension for [MongoOperations.createCollection] providing a [KClass] based variant.
@@ -264,7 +267,7 @@ inline fun <reified O : Any> MongoOperations.aggregate(
inline fun <reified O : Any> MongoOperations.aggregateStream(
aggregation: Aggregation,
inputType: KClass<*>
): CloseableIterator<O> =
): Stream<O> =
aggregateStream(aggregation, inputType.java, O::class.java)
/**
@@ -273,7 +276,7 @@ inline fun <reified O : Any> MongoOperations.aggregateStream(
* @author Mark Paluch
* @since 3.2
*/
inline fun <reified I : Any, reified O : Any> MongoOperations.aggregateStream(aggregation: Aggregation): CloseableIterator<O> =
inline fun <reified I : Any, reified O : Any> MongoOperations.aggregateStream(aggregation: Aggregation): Stream<O> =
aggregateStream(aggregation, I::class.java, O::class.java)
/**
@@ -285,7 +288,7 @@ inline fun <reified I : Any, reified O : Any> MongoOperations.aggregateStream(ag
inline fun <reified O : Any> MongoOperations.aggregateStream(
aggregation: Aggregation,
collectionName: String
): CloseableIterator<O> =
): Stream<O> =
aggregateStream(aggregation, collectionName, O::class.java)
/**

View File

@@ -40,6 +40,7 @@ import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.bson.types.ObjectId;
import org.junit.jupiter.api.AfterEach;
@@ -85,7 +86,6 @@ import org.springframework.data.mongodb.test.util.Client;
import org.springframework.data.mongodb.test.util.MongoClientExtension;
import org.springframework.data.mongodb.test.util.MongoTestTemplate;
import org.springframework.data.mongodb.test.util.MongoVersion;
import org.springframework.data.util.CloseableIterator;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils;
@@ -3044,10 +3044,9 @@ public class MongoTemplateTests {
Query q = new Query();
q.with(Sort.by(Direction.ASC, "age"));
CloseableIterator<Person> stream = template.stream(q, Person.class);
List<Integer> streamResults = template.stream(q, Person.class).map(Person::getAge).toList();
assertThat(stream.next().getAge()).isEqualTo(youngestPerson.getAge());
assertThat(stream.next().getAge()).isEqualTo(oldestPerson.getAge());
assertThat(streamResults).containsExactly(youngestPerson.getAge(), oldestPerson.getAge());
}
@Test // DATAMONGO-1208
@@ -3060,7 +3059,7 @@ public class MongoTemplateTests {
Query q = new Query();
q.with(PageRequest.of(0, 1, Sort.by(Direction.ASC, "age")));
CloseableIterator<Person> stream = template.stream(q, Person.class);
Iterator<Person> stream = template.stream(q, Person.class).iterator();
assertThat(stream.next().getAge()).isEqualTo(youngestPerson.getAge());
assertThat(stream.hasNext()).isFalse();
@@ -3332,11 +3331,11 @@ public class MongoTemplateTests {
template.insert(document, "some_special_collection");
CloseableIterator<Document> stream = template.stream(new Query(), Document.class);
assertThat(stream.hasNext()).isFalse();
Stream<Document> stream = template.stream(new Query(), Document.class);
assertThat(stream).isEmpty();
CloseableIterator<org.bson.Document> stream2 = template.stream(new Query(where("_id").is(document.id)),
org.bson.Document.class, "some_special_collection");
Iterator<org.bson.Document> stream2 = template
.stream(new Query(where("_id").is(document.id)), org.bson.Document.class, "some_special_collection").iterator();
assertThat(stream2.hasNext()).isTrue();
assertThat(stream2.next().get("_id")).isEqualTo(new ObjectId(document.id));

View File

@@ -803,7 +803,7 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
@Test // DATAMONGO-1518
void streamQueryShouldUseCollationWhenPresent() {
template.stream(new BasicQuery("{}").collation(Collation.of("fr")), AutogenerateableId.class).next();
template.stream(new BasicQuery("{}").collation(Collation.of("fr")), AutogenerateableId.class);
verify(findIterable).collation(eq(com.mongodb.client.model.Collation.builder().locale("fr").build()));
}
@@ -1221,7 +1221,7 @@ public class MongoTemplateUnitTests extends MongoOperationsUnitTests {
@Test // DATAMONGO-1854
void streamQueryShouldUseDefaultCollationWhenPresent() {
template.stream(new BasicQuery("{}"), Sith.class).next();
template.stream(new BasicQuery("{}"), Sith.class);
verify(findIterable).collation(eq(com.mongodb.client.model.Collation.builder().locale("de_AT").build()));
}

View File

@@ -40,6 +40,7 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Scanner;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,6 +50,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.core.io.ClassPathResource;
import org.springframework.dao.DataAccessException;
import org.springframework.data.annotation.Id;
@@ -63,7 +65,6 @@ import org.springframework.data.mongodb.core.TestEntities;
import org.springframework.data.mongodb.core.Venue;
import org.springframework.data.mongodb.core.aggregation.AggregationTests.CarDescriptor.Entry;
import org.springframework.data.mongodb.core.aggregation.BucketAutoOperation.Granularities;
import org.springframework.data.mongodb.core.aggregation.DateOperators.TemporalUnits;
import org.springframework.data.mongodb.core.aggregation.VariableOperators.Let.ExpressionVariable;
import org.springframework.data.mongodb.core.geo.GeoJsonPoint;
import org.springframework.data.mongodb.core.index.GeoSpatialIndexType;
@@ -76,7 +77,6 @@ import org.springframework.data.mongodb.test.util.MongoTemplateExtension;
import org.springframework.data.mongodb.test.util.MongoTestTemplate;
import org.springframework.data.mongodb.test.util.MongoVersion;
import org.springframework.data.mongodb.test.util.Template;
import org.springframework.data.util.CloseableIterator;
import com.mongodb.MongoException;
import com.mongodb.client.MongoCollection;
@@ -234,18 +234,17 @@ public class AggregationTests {
sort(DESC, "n") //
).withOptions(new AggregationOptions(true, false, 1));
CloseableIterator<TagCount> iterator = mongoTemplate.aggregateStream(agg, INPUT_COLLECTION, TagCount.class);
try (Stream<TagCount> stream = mongoTemplate.aggregateStream(agg, INPUT_COLLECTION, TagCount.class)) {
assertThat(iterator).isNotNull();
List<TagCount> tagCount = toList(iterator);
iterator.close();
List<TagCount> tagCount = stream.toList();
assertThat(tagCount).isNotNull();
assertThat(tagCount.size()).isEqualTo(3);
assertThat(tagCount).isNotNull();
assertThat(tagCount.size()).isEqualTo(3);
assertTagCount("spring", 3, tagCount.get(0));
assertTagCount("mongodb", 2, tagCount.get(1));
assertTagCount("nosql", 1, tagCount.get(2));
assertTagCount("spring", 3, tagCount.get(0));
assertTagCount("mongodb", 2, tagCount.get(1));
assertTagCount("nosql", 1, tagCount.get(2));
}
}
@Test // DATAMONGO-586
@@ -284,14 +283,12 @@ public class AggregationTests {
sort(DESC, "n") //
);
CloseableIterator<TagCount> results = mongoTemplate.aggregateStream(aggregation, INPUT_COLLECTION, TagCount.class);
try (Stream<TagCount> stream = mongoTemplate.aggregateStream(aggregation, INPUT_COLLECTION, TagCount.class)) {
assertThat(results).isNotNull();
List<TagCount> tagCount = stream.toList();
List<TagCount> tagCount = toList(results);
results.close();
assertThat(tagCount.size()).isEqualTo(0);
assertThat(tagCount.size()).isEqualTo(0);
}
}
@Test // DATAMONGO-1391
@@ -384,16 +381,14 @@ public class AggregationTests {
limit(2) //
);
CloseableIterator<TagCount> results = mongoTemplate.aggregateStream(aggregation, INPUT_COLLECTION, TagCount.class);
try (Stream<TagCount> stream = mongoTemplate.aggregateStream(aggregation, INPUT_COLLECTION, TagCount.class)) {
assertThat(results).isNotNull();
List<TagCount> tagCount = stream.toList();
List<TagCount> tagCount = toList(results);
results.close();
assertThat(tagCount.size()).isEqualTo(2);
assertTagCount(null, 0, tagCount.get(0));
assertTagCount(null, 0, tagCount.get(1));
assertThat(tagCount.size()).isEqualTo(2);
assertTagCount(null, 0, tagCount.get(0));
assertTagCount(null, 0, tagCount.get(1));
}
}
@Test // DATAMONGO-586
@@ -1280,19 +1275,18 @@ public class AggregationTests {
assertThat(agg).isNotNull();
assertThat(agg.toString()).isNotNull();
CloseableIterator<LikeStats> iterator = mongoTemplate.aggregateStream(agg, LikeStats.class);
List<LikeStats> result = toList(iterator);
iterator.close();
try (Stream<LikeStats> stream = mongoTemplate.aggregateStream(agg, LikeStats.class)) {
assertThat(result).isNotNull();
assertThat(result).isNotNull();
assertThat(result.size()).isEqualTo(5);
List<LikeStats> result = stream.toList();
assertLikeStats(result.get(0), "a", 4);
assertLikeStats(result.get(1), "b", 2);
assertLikeStats(result.get(2), "c", 4);
assertLikeStats(result.get(3), "d", 2);
assertLikeStats(result.get(4), "e", 3);
assertThat(result.size()).isEqualTo(5);
assertLikeStats(result.get(0), "a", 4);
assertLikeStats(result.get(1), "b", 2);
assertLikeStats(result.get(2), "c", 4);
assertLikeStats(result.get(3), "d", 2);
assertLikeStats(result.get(4), "e", 3);
}
}
@Test // DATAMONGO-960
@@ -1606,13 +1600,14 @@ public class AggregationTests {
sort(DESC, "count"), //
out(tempOutCollection));
CloseableIterator<Document> iterator = mongoTemplate.aggregateStream(agg, Document.class);
try (Stream<Document> stream = mongoTemplate.aggregateStream(agg, Document.class)) {
List<Document> result = toList(iterator);
List<Document> result = stream.toList();
assertThat(result).hasSize(2);
assertThat(result.get(0)).containsEntry("_id", "MALE").containsEntry("count", 3);
assertThat(result.get(1)).containsEntry("_id", "FEMALE").containsEntry("count", 2);
assertThat(result).hasSize(2);
assertThat(result.get(0)).containsEntry("_id", "MALE").containsEntry("count", 3);
assertThat(result.get(1)).containsEntry("_id", "FEMALE").containsEntry("count", 2);
}
mongoTemplate.dropCollection(tempOutCollection);
}
@@ -2013,16 +2008,6 @@ public class AggregationTests {
assertThat(tagCount.getN()).isEqualTo(n);
}
private static <T> List<T> toList(CloseableIterator<? extends T> results) {
List<T> result = new ArrayList<T>();
while (results.hasNext()) {
result.add(results.next());
}
return result;
}
static class DATAMONGO753 {
PD[] pd;

View File

@@ -42,7 +42,6 @@ import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.SliceImpl;
import org.springframework.data.domain.Sort;
import org.springframework.data.domain.Sort.Direction;
import org.springframework.data.mongodb.InvalidMongoDbApiUsageException;
@@ -66,7 +65,6 @@ import org.springframework.data.projection.SpelAwareProxyProjectionFactory;
import org.springframework.data.repository.Repository;
import org.springframework.data.repository.core.support.DefaultRepositoryMetadata;
import org.springframework.data.repository.query.QueryMethodEvaluationContextProvider;
import org.springframework.data.util.CloseableIterator;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
@@ -238,22 +236,7 @@ public class StringBasedAggregationUnitTests {
@Test // GH-3543
void aggregationWithStreamReturnType() {
when(operations.aggregateStream(any(TypedAggregation.class), any())).thenReturn(new CloseableIterator<Object>() {
@Override
public void close() {
}
@Override
public boolean hasNext() {
return false;
}
@Override
public Object next() {
return null;
}
});
when(operations.aggregateStream(any(TypedAggregation.class), any())).thenReturn(Stream.empty());
StringBasedAggregation sba = createAggregationForMethod("aggregationWithStreamReturnType", Pageable.class);