Compare commits

...

12 Commits

Author SHA1 Message Date
Mark Paluch
7dba98dce8 DATAMONGO-2402 - Release version 2.2.2 (Moore SR2). 2019-11-18 12:32:12 +01:00
Mark Paluch
c1ae30bd82 DATAMONGO-2402 - Prepare 2.2.2 (Moore SR2). 2019-11-18 12:31:53 +01:00
Mark Paluch
6aa5aea424 DATAMONGO-2402 - Updated changelog. 2019-11-18 12:31:44 +01:00
Mark Paluch
bc1b00813c DATAMONGO-2401 - Updated changelog. 2019-11-18 12:16:28 +01:00
Mark Paluch
d1ad3ab301 DATAMONGO-2414 - Polishing.
Use longer timeout to cater for slower CI environments.

Original Pull Request: #807
2019-11-14 11:54:43 +01:00
Mark Paluch
923134bbdc DATAMONGO-2414 - Guard drain loop in AsyncInputStreamHandler with state switch.
We now use a non-blocking state switch to determine whether to invoke drainLoop(…) from Subscriber completion.

Previously, we relied on same-thread identification, assuming that if the subscription thread and the completion thread were the same, we were already running inside the drain loop.
It turns out that an I/O thread can also run in event-loop mode, where subscription and completion happen on the same thread but with processing in between, so the completion call is a delayed signal that is not invoked on the same stack as drainLoop(…).
The same-thread assumption was in place to avoid a StackOverflowError caused by infinite recursion.

We now use a state lock to enter the drain loop. Concurrent attempts to re-enter the drain loop from Subscriber completion are now prevented, so we keep draining without causing stack recursion.

Original Pull Request: #807
2019-11-14 11:54:30 +01:00
Mark Paluch
e211f69df5 DATAMONGO-2409 - Polishing.
Also adapt ExecutableFindOperation.DistinctWithProjection.asType() to return the appropriate TerminatingDistinct.

Original pull request: #805.
2019-11-11 10:22:37 +01:00
Christoph Strobl
fc35d706a0 DATAMONGO-2409 - Fix return type of Kotlin extension function for ReactiveFindOperation.DistinctWithProjection.asType().
Original pull request: #805.
2019-11-11 10:22:37 +01:00
Mark Paluch
82894e6aff DATAMONGO-2403 - Polishing.
Use handle(…) instead of flatMap(…) to skip values and reduce overhead.

Original pull request: #804.
2019-11-08 13:51:20 +01:00
Christoph Strobl
7356f157bb DATAMONGO-2403 - Fix aggregation simple type result retrieval from empty document.
Projections used within an aggregation pipeline can result in empty documents emitted by the driver. We now guard those cases and skip such documents within a Flux, or simply return an empty Mono, depending on the method's signature.

Original pull request: #804.
2019-11-08 13:48:10 +01:00
Christoph Strobl
783fc6268a DATAMONGO-2382 - After release cleanups. 2019-11-04 15:34:27 +01:00
Christoph Strobl
360b17f299 DATAMONGO-2382 - Prepare next development iteration. 2019-11-04 15:34:26 +01:00
13 changed files with 140 additions and 47 deletions

View File

@@ -5,7 +5,7 @@
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.2.1.RELEASE</version>
<version>2.2.2.RELEASE</version>
<packaging>pom</packaging>
<name>Spring Data MongoDB</name>
@@ -15,7 +15,7 @@
<parent>
<groupId>org.springframework.data.build</groupId>
<artifactId>spring-data-parent</artifactId>
<version>2.2.1.RELEASE</version>
<version>2.2.2.RELEASE</version>
</parent>
<modules>
@@ -26,7 +26,7 @@
<properties>
<project.type>multi</project.type>
<dist.id>spring-data-mongodb</dist.id>
<springdata.commons>2.2.1.RELEASE</springdata.commons>
<springdata.commons>2.2.2.RELEASE</springdata.commons>
<mongo>3.11.1</mongo>
<mongo.reactivestreams>1.12.0</mongo.reactivestreams>
<jmh.version>1.19</jmh.version>

View File

@@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.2.1.RELEASE</version>
<version>2.2.2.RELEASE</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -14,7 +14,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.2.1.RELEASE</version>
<version>2.2.2.RELEASE</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.2.1.RELEASE</version>
<version>2.2.2.RELEASE</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -62,7 +62,7 @@ class DataBufferPublisherAdapter {
/**
* Use an {@link AsyncInputStreamHandler} to read data from the given {@link AsyncInputStream}.
*
*
* @param inputStream the source stream.
* @return a {@link Flux} emitting data chunks one by one.
* @since 2.2.1
@@ -71,7 +71,6 @@ class DataBufferPublisherAdapter {
AsyncInputStreamHandler streamHandler = new AsyncInputStreamHandler(inputStream, inputStream.dataBufferFactory,
inputStream.bufferSize);
return Flux.create((sink) -> {
sink.onDispose(streamHandler::close);
@@ -87,7 +86,7 @@ class DataBufferPublisherAdapter {
* An {@link AsyncInputStream} also holding a {@link DataBufferFactory} and default {@literal bufferSize} for reading
* from it, delegating operations on the {@link AsyncInputStream} to the reference instance. <br />
* Used to pass on the {@link AsyncInputStream} and parameters to avoid capturing lambdas.
*
*
* @author Christoph Strobl
* @since 2.2.1
*/
@@ -146,12 +145,18 @@ class DataBufferPublisherAdapter {
private static final AtomicIntegerFieldUpdater<AsyncInputStreamHandler> STATE = AtomicIntegerFieldUpdater
.newUpdater(AsyncInputStreamHandler.class, "state");
private static final AtomicIntegerFieldUpdater<AsyncInputStreamHandler> DRAIN = AtomicIntegerFieldUpdater
.newUpdater(AsyncInputStreamHandler.class, "drain");
private static final AtomicIntegerFieldUpdater<AsyncInputStreamHandler> READ = AtomicIntegerFieldUpdater
.newUpdater(AsyncInputStreamHandler.class, "read");
private static final int STATE_OPEN = 0;
private static final int STATE_CLOSED = 1;
private static final int DRAIN_NONE = 0;
private static final int DRAIN_COMPLETION = 1;
private static final int READ_NONE = 0;
private static final int READ_IN_PROGRESS = 1;
@@ -165,6 +170,9 @@ class DataBufferPublisherAdapter {
// see STATE
volatile int state = STATE_OPEN;
// see DRAIN
volatile int drain = DRAIN_NONE;
// see READ_IN_PROGRESS
volatile int read = READ_NONE;
@@ -209,6 +217,14 @@ class DataBufferPublisherAdapter {
STATE.compareAndSet(this, STATE_OPEN, STATE_CLOSED);
}
boolean enterDrainLoop() {
return DRAIN.compareAndSet(this, DRAIN_NONE, DRAIN_COMPLETION);
}
void leaveDrainLoop() {
DRAIN.set(this, DRAIN_NONE);
}
boolean isClosed() {
return STATE.get(this) == STATE_CLOSED;
}
@@ -235,7 +251,6 @@ class DataBufferPublisherAdapter {
private final FluxSink<DataBuffer> sink;
private final DataBufferFactory factory;
private final ByteBuffer transport;
private final Thread subscribeThread = Thread.currentThread();
private volatile Subscription subscription;
BufferCoreSubscriber(FluxSink<DataBuffer> sink, DataBufferFactory factory, ByteBuffer transport) {
@@ -261,8 +276,6 @@ class DataBufferPublisherAdapter {
public void onNext(Integer bytes) {
if (isClosed()) {
onReadDone();
return;
}
@@ -273,13 +286,9 @@ class DataBufferPublisherAdapter {
decrementDemand();
}
try {
if (bytes == -1) {
sink.complete();
return;
}
} finally {
onReadDone();
if (bytes == -1) {
sink.complete();
return;
}
subscription.request(1);
@@ -306,15 +315,25 @@ class DataBufferPublisherAdapter {
return;
}
onReadDone();
close();
sink.error(t);
}
@Override
public void onComplete() {
if (subscribeThread != Thread.currentThread()) {
drainLoop(sink);
onReadDone();
if (!isClosed()) {
if (enterDrainLoop()) {
try {
drainLoop(sink);
} finally {
leaveDrainLoop();
}
}
}
}
}
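
The change above replaces the same-thread check with a compare-and-set gate around the drain loop. A minimal standalone sketch of that pattern, with simplified, hypothetical names rather than the actual adapter:

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class DrainGuardSketch {

    private static final AtomicIntegerFieldUpdater<DrainGuardSketch> DRAIN = AtomicIntegerFieldUpdater
            .newUpdater(DrainGuardSketch.class, "drain");

    private static final int DRAIN_NONE = 0;
    private static final int DRAIN_IN_PROGRESS = 1;

    volatile int drain = DRAIN_NONE;

    // Called on Subscriber completion. Only the caller that wins the CAS runs the drain loop;
    // a concurrent or re-entrant completion signal returns instead of recursing into it.
    void onComplete() {
        if (DRAIN.compareAndSet(this, DRAIN_NONE, DRAIN_IN_PROGRESS)) {
            try {
                drainLoop();
            } finally {
                DRAIN.set(this, DRAIN_NONE); // always release the gate
            }
        }
    }

    void drainLoop() {
        // request and emit data chunks here; the gate above prevents stack recursion
    }
}

Unlike the removed subscribeThread comparison, this gate does not depend on which thread delivers the completion signal.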

View File

@@ -36,6 +36,7 @@ import org.springframework.data.repository.query.ParameterAccessor;
import org.springframework.data.repository.query.QueryMethodEvaluationContextProvider;
import org.springframework.data.repository.query.RepositoryQuery;
import org.springframework.data.repository.query.ResultProcessor;
import org.springframework.data.util.TypeInformation;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@@ -117,13 +118,18 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery {
private Object execute(MongoParameterAccessor parameterAccessor) {
ConvertingParameterAccessor convertingParamterAccessor = new ConvertingParameterAccessor(operations.getConverter(),
ConvertingParameterAccessor accessor = new ConvertingParameterAccessor(operations.getConverter(),
parameterAccessor);
ResultProcessor processor = method.getResultProcessor().withDynamicProjection(convertingParamterAccessor);
TypeInformation<?> returnType = method.getReturnType();
ResultProcessor processor = method.getResultProcessor().withDynamicProjection(accessor);
Class<?> typeToRead = processor.getReturnedType().getTypeToRead();
return doExecute(method, processor, convertingParamterAccessor, typeToRead);
if (typeToRead == null && returnType.getComponentType() != null) {
typeToRead = returnType.getComponentType().getType();
}
return doExecute(method, processor, accessor, typeToRead);
}
/**

View File

@@ -95,7 +95,14 @@ public class ReactiveStringBasedAggregation extends AbstractReactiveMongoQuery {
Flux<?> flux = reactiveMongoOperations.aggregate(aggregation, targetType);
if (isSimpleReturnType && !isRawReturnType) {
flux = flux.map(it -> AggregationUtils.extractSimpleTypeResult((Document) it, typeToRead, mongoConverter));
flux = flux.handle((it, sink) -> {
Object result = AggregationUtils.extractSimpleTypeResult((Document) it, typeToRead, mongoConverter);
if (result != null) {
sink.next(result);
}
});
}
if (method.isCollectionQuery()) {
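
The handle(…) operator used above acts as a combined map and filter: the SynchronousSink emits only when a value could be extracted, so empty documents are dropped without the per-element publisher overhead of a flatMap(…)-based approach. A small illustrative sketch under assumed field and type names, not the framework code itself:

import org.bson.Document;

import reactor.core.publisher.Flux;

class HandleSkipSketch {

    static Flux<String> extractLastnames(Flux<Document> documents) {
        return documents.handle((document, sink) -> {
            Object lastname = document.get("lastname"); // empty documents yield null here
            if (lastname != null) {
                sink.next(lastname.toString()); // emit only when extraction produced a value
            }
        });
    }
}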

View File

@@ -73,7 +73,8 @@ fun <T : Any> ExecutableFindOperation.DistinctWithProjection.asType(resultType:
* Extension for [ExecutableFindOperation.DistinctWithProjection.as] leveraging reified type parameters.
*
* @author Christoph Strobl
* @author Mark Paluch
* @since 2.1
*/
inline fun <reified T : Any> ExecutableFindOperation.DistinctWithProjection.asType(): ExecutableFindOperation.DistinctWithProjection =
inline fun <reified T : Any> ExecutableFindOperation.DistinctWithProjection.asType(): ExecutableFindOperation.TerminatingDistinct<T> =
`as`(T::class.java)
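
With the corrected return type, the reified extension matches the Java fluent API it delegates to, where as(Class) already yields a TerminatingDistinct. A hedged Java sketch of that chain, using a hypothetical Person type and field (the reactive variant is analogous and produces a Flux<String>):

import java.util.List;

import org.springframework.data.mongodb.core.MongoOperations;

class DistinctAsTypeSketch {

    // Hypothetical domain type used only for this illustration.
    static class Person {
        String lastname;
    }

    static List<String> distinctLastnames(MongoOperations operations) {
        // as(String.class) yields TerminatingDistinct<String>, so all() returns List<String>;
        // the Kotlin asType<String>() extension is a reified shortcut for this as(…) call.
        return operations.query(Person.class)
                .distinct("lastname")
                .as(String.class)
                .all();
    }
}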

View File

@@ -76,7 +76,7 @@ fun <T : Any> ReactiveFindOperation.DistinctWithProjection.asType(resultType: KC
* @author Christoph Strobl
* @since 2.1
*/
inline fun <reified T : Any> ReactiveFindOperation.DistinctWithProjection.asType(): ReactiveFindOperation.DistinctWithProjection =
inline fun <reified T : Any> ReactiveFindOperation.DistinctWithProjection.asType(): ReactiveFindOperation.TerminatingDistinct<T> =
`as`(T::class.java)
/**

View File

@@ -31,21 +31,18 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.dao.DataAccessException;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.SimpleMongoClientDbFactory;
import org.springframework.data.mongodb.core.SimpleMongoDbFactory;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
import org.springframework.data.mongodb.test.util.MongoTestUtils;
import org.springframework.data.mongodb.test.util.ReplicaSet;
import org.springframework.test.annotation.IfProfileValue;
import org.springframework.util.ErrorHandler;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
@@ -61,6 +58,8 @@ public class DefaultMessageListenerContainerTests {
public static final String COLLECTION_NAME = "collection-1";
public static final String COLLECTION_2_NAME = "collection-2";
public static final Duration TIMEOUT = Duration.ofSeconds(2);
public @Rule TestRule replSet = ReplicaSet.none();
MongoDbFactory dbFactory;
@@ -94,12 +93,12 @@ public class DefaultMessageListenerContainerTests {
Person.class);
container.start();
awaitSubscription(subscription, Duration.ofMillis(500));
awaitSubscription(subscription, TIMEOUT);
collection.insertOne(new Document("_id", "id-1").append("firstname", "foo"));
collection.insertOne(new Document("_id", "id-2").append("firstname", "bar"));
awaitMessages(messageListener, 2, Duration.ofMillis(500));
awaitMessages(messageListener, 2, TIMEOUT);
assertThat(messageListener.getMessages().stream().map(Message::getBody).collect(Collectors.toList()))
.containsExactly(new Person("id-1", "foo"), new Person("id-2", "bar"));
@@ -125,12 +124,12 @@ public class DefaultMessageListenerContainerTests {
}, () -> COLLECTION_NAME), Person.class, errorHandler);
container.start();
awaitSubscription(subscription, Duration.ofMillis(500));
awaitSubscription(subscription, TIMEOUT);
collection.insertOne(new Document("_id", "id-1").append("firstname", "foo"));
collection.insertOne(new Document("_id", "id-2").append("firstname", "bar"));
awaitMessages(messageListener, 2, Duration.ofMillis(500));
awaitMessages(messageListener, 2, TIMEOUT);
verify(errorHandler, atLeast(1)).handleError(any(IllegalStateException.class));
assertThat(messageListener.getTotalNumberMessagesReceived()).isEqualTo(2);
@@ -145,12 +144,12 @@ public class DefaultMessageListenerContainerTests {
Document.class);
container.start();
awaitSubscription(subscription, Duration.ofMillis(500));
awaitSubscription(subscription, TIMEOUT);
collection.insertOne(new Document("_id", "id-1").append("value", "foo"));
collection.insertOne(new Document("_id", "id-2").append("value", "bar"));
awaitMessages(messageListener, 2, Duration.ofMillis(500));
awaitMessages(messageListener, 2, TIMEOUT);
container.stop();
@@ -174,12 +173,12 @@ public class DefaultMessageListenerContainerTests {
Subscription subscription = container.register(new ChangeStreamRequest(messageListener, () -> COLLECTION_NAME),
Document.class);
awaitSubscription(subscription, Duration.ofMillis(500));
awaitSubscription(subscription, TIMEOUT);
Document expected = new Document("_id", "id-2").append("value", "bar");
collection.insertOne(expected);
awaitMessages(messageListener, 1, Duration.ofMillis(500));
awaitMessages(messageListener, 1, TIMEOUT);
container.stop();
assertThat(messageListener.getMessages().stream().map(Message::getBody).collect(Collectors.toList()))
@@ -226,11 +225,11 @@ public class DefaultMessageListenerContainerTests {
awaitSubscription(
container.register(new TailableCursorRequest(messageListener, () -> COLLECTION_NAME), Document.class),
Duration.ofMillis(500));
TIMEOUT);
collection.insertOne(new Document("_id", "id-2").append("value", "bar"));
awaitMessages(messageListener, 2, Duration.ofSeconds(2));
awaitMessages(messageListener, 2, TIMEOUT);
container.stop();
assertThat(messageListener.getTotalNumberMessagesReceived()).isEqualTo(2);
@@ -247,12 +246,12 @@ public class DefaultMessageListenerContainerTests {
awaitSubscription(
container.register(new TailableCursorRequest(messageListener, () -> COLLECTION_NAME), Document.class),
Duration.ofMillis(500));
TIMEOUT);
collection.insertOne(new Document("_id", "id-1").append("value", "foo"));
collection.insertOne(new Document("_id", "id-2").append("value", "bar"));
awaitMessages(messageListener, 2, Duration.ofSeconds(2));
awaitMessages(messageListener, 2, TIMEOUT);
container.stop();
assertThat(messageListener.getTotalNumberMessagesReceived()).isEqualTo(2);
@@ -359,7 +358,7 @@ public class DefaultMessageListenerContainerTests {
container.start();
awaitSubscription(subscription, Duration.ofMillis(500));
awaitSubscription(subscription, TIMEOUT);
collection.insertOne(new Document("_id", "col-1-id-1").append("firstname", "foo"));
collection.insertOne(new Document("_id", "col-1-id-2").append("firstname", "bar"));
@@ -367,7 +366,7 @@ public class DefaultMessageListenerContainerTests {
collection2.insertOne(new Document("_id", "col-2-id-1").append("firstname", "bar"));
collection2.insertOne(new Document("_id", "col-2-id-2").append("firstname", "foo"));
awaitMessages(messageListener, 4, Duration.ofMillis(500));
awaitMessages(messageListener, 4, TIMEOUT);
assertThat(messageListener.getMessages().stream().map(Message::getBody).collect(Collectors.toList()))
.containsExactly(new Person("col-1-id-1", "foo"), new Person("col-1-id-2", "bar"),

View File

@@ -27,6 +27,7 @@ import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
@@ -36,7 +37,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
@@ -535,6 +535,46 @@ public class ReactiveMongoRepositoryTests {
}).verifyComplete();
}
@Test // DATAMONGO-2153
public void annotatedAggregationWithAggregationResultAsMap() {
repository.sumAgeAndReturnSumAsMap() //
.as(StepVerifier::create) //
.consumeNextWith(it -> {
assertThat(it).isInstanceOf(Map.class);
}).verifyComplete();
}
@Test // DATAMONGO-2403
public void annotatedAggregationExtractingSimpleValueIsEmptyForEmptyDocument() {
Person p = new Person("project-on-lastanme", null);
repository.save(p).then().as(StepVerifier::create).verifyComplete();
repository.projectToLastnameAndRemoveId(p.getFirstname()) //
.as(StepVerifier::create) //
.verifyComplete();
}
@Test // DATAMONGO-2403
public void annotatedAggregationSkipsEmptyDocumentsWhenExtractingSimpleValue() {
String firstname = "project-on-lastanme";
Person p1 = new Person(firstname, null);
p1.setEmail("p1@example.com");
Person p2 = new Person(firstname, "lastname");
p2.setEmail("p2@example.com");
Person p3 = new Person(firstname, null);
p3.setEmail("p3@example.com");
repository.saveAll(Arrays.asList(p1, p2, p3)).then().as(StepVerifier::create).verifyComplete();
repository.projectToLastnameAndRemoveId(firstname) //
.as(StepVerifier::create) //
.expectNext("lastname").verifyComplete();
}
interface ReactivePersonRepository
extends ReactiveMongoRepository<Person, String>, ReactiveQuerydslPredicateExecutor<Person> {
@@ -596,6 +636,13 @@ public class ReactiveMongoRepositoryTests {
@Aggregation(pipeline = "{ '$group' : { '_id' : null, 'total' : { $sum: '$age' } } }")
Mono<SumAge> sumAgeAndReturnSumWrapper();
@Aggregation(pipeline = "{ '$group' : { '_id' : null, 'total' : { $sum: '$age' } } }")
Mono<Map> sumAgeAndReturnSumAsMap();
@Aggregation(
pipeline = { "{ '$match' : { 'firstname' : '?0' } }", "{ '$project' : { '_id' : 0, 'lastname' : 1 } }" })
Mono<String> projectToLastnameAndRemoveId(String firstname);
@Query(value = "{_id:?0}")
Mono<org.bson.Document> findDocumentById(String id);
}

View File

@@ -1,6 +1,20 @@
Spring Data MongoDB Changelog
=============================
Changes in version 2.2.2.RELEASE (2019-11-18)
---------------------------------------------
* DATAMONGO-2414 - ReactiveGridFsResource.getDownloadStream(…) hangs if completion happens on the event loop.
* DATAMONGO-2409 - Extension Function ReactiveFindOperation.DistinctWithProjection.asType() has wrong return type.
* DATAMONGO-2403 - ReactiveStringBasedAggregation / AggregationUtils fails on NPE because source or value is null.
* DATAMONGO-2402 - Release 2.2.2 (Moore SR2).
Changes in version 2.1.13.RELEASE (2019-11-18)
----------------------------------------------
* DATAMONGO-2409 - Extension Function ReactiveFindOperation.DistinctWithProjection.asType() has wrong return type.
* DATAMONGO-2401 - Release 2.1.13 (Lovelace SR13).
Changes in version 2.2.1.RELEASE (2019-11-04)
---------------------------------------------
* DATAMONGO-2399 - Upgrade to mongo-java-driver 3.11.1.

View File

@@ -1,4 +1,4 @@
Spring Data MongoDB 2.2.1
Spring Data MongoDB 2.2.2
Copyright (c) [2010-2019] Pivotal Software, Inc.
This product is licensed to you under the Apache License, Version 2.0 (the "License").