Compare commits
32 Commits
3.4.10 ... 2.2.2.RELEASE
| Author | SHA1 | Date |
|---|---|---|
| | 7dba98dce8 | |
| | c1ae30bd82 | |
| | 6aa5aea424 | |
| | bc1b00813c | |
| | d1ad3ab301 | |
| | 923134bbdc | |
| | e211f69df5 | |
| | fc35d706a0 | |
| | 82894e6aff | |
| | 7356f157bb | |
| | 783fc6268a | |
| | 360b17f299 | |
| | 2cfcdaff7c | |
| | 9d9cf46e47 | |
| | 98661cf9a2 | |
| | cc50cd5e3a | |
| | d8399d2d23 | |
| | f2134fb2f8 | |
| | ec3ccc004e | |
| | 6cb246c18a | |
| | e73cea0ecf | |
| | c69e185a2a | |
| | 5789f59222 | |
| | 5178eeb340 | |
| | bc5e7fa4a2 | |
| | c28ace6d40 | |
| | de4fae37e1 | |
| | 2f1aff3ec3 | |
| | 6970f934bd | |
| | 6b5168e102 | |
| | 4420edb4dc | |
| | c2fae95fee | |
Jenkinsfile (vendored): 10 changes
@@ -3,7 +3,7 @@ pipeline {

 	triggers {
 		pollSCM 'H/10 * * * *'
-		upstream(upstreamProjects: "spring-data-commons/master", threshold: hudson.model.Result.SUCCESS)
+		upstream(upstreamProjects: "spring-data-commons/2.2.x", threshold: hudson.model.Result.SUCCESS)
 	}

 	options {

@@ -68,7 +68,7 @@ pipeline {
 	stage("test: baseline") {
 		when {
 			anyOf {
-				branch 'master'
+				branch '2.2.x'
 				not { triggeredBy 'UpstreamCause' }
 			}
 		}

@@ -94,7 +94,7 @@ pipeline {
 	stage("Test other configurations") {
 		when {
 			anyOf {
-				branch 'master'
+				branch '2.2.x'
 				not { triggeredBy 'UpstreamCause' }
 			}
 		}

@@ -143,7 +143,7 @@ pipeline {
 	stage('Release to artifactory') {
 		when {
 			anyOf {
-				branch 'master'
+				branch '2.2.x'
 				not { triggeredBy 'UpstreamCause' }
 			}
 		}

@@ -175,7 +175,7 @@ pipeline {

 	stage('Publish documentation') {
 		when {
-			branch 'master'
+			branch '2.2.x'
 		}
 		agent {
 			docker {
pom.xml: 8 changes
@@ -5,7 +5,7 @@

 	<groupId>org.springframework.data</groupId>
 	<artifactId>spring-data-mongodb-parent</artifactId>
-	<version>2.2.0.RELEASE</version>
+	<version>2.2.2.RELEASE</version>
 	<packaging>pom</packaging>

 	<name>Spring Data MongoDB</name>

@@ -15,7 +15,7 @@
 	<parent>
 		<groupId>org.springframework.data.build</groupId>
 		<artifactId>spring-data-parent</artifactId>
-		<version>2.2.0.RELEASE</version>
+		<version>2.2.2.RELEASE</version>
 	</parent>

 	<modules>

@@ -26,8 +26,8 @@
 	<properties>
 		<project.type>multi</project.type>
 		<dist.id>spring-data-mongodb</dist.id>
-		<springdata.commons>2.2.0.RELEASE</springdata.commons>
-		<mongo>3.11.0</mongo>
+		<springdata.commons>2.2.2.RELEASE</springdata.commons>
+		<mongo>3.11.1</mongo>
 		<mongo.reactivestreams>1.12.0</mongo.reactivestreams>
 		<jmh.version>1.19</jmh.version>
 	</properties>

@@ -7,7 +7,7 @@
 	<parent>
 		<groupId>org.springframework.data</groupId>
 		<artifactId>spring-data-mongodb-parent</artifactId>
-		<version>2.2.0.RELEASE</version>
+		<version>2.2.2.RELEASE</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>

@@ -14,7 +14,7 @@
 	<parent>
 		<groupId>org.springframework.data</groupId>
 		<artifactId>spring-data-mongodb-parent</artifactId>
-		<version>2.2.0.RELEASE</version>
+		<version>2.2.2.RELEASE</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>

@@ -11,7 +11,7 @@
 	<parent>
 		<groupId>org.springframework.data</groupId>
 		<artifactId>spring-data-mongodb-parent</artifactId>
-		<version>2.2.0.RELEASE</version>
+		<version>2.2.2.RELEASE</version>
 		<relativePath>../pom.xml</relativePath>
 	</parent>
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Optional;

import org.bson.Document;
import org.springframework.data.mongodb.util.BsonUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.NumberUtils;

@@ -117,9 +118,8 @@ public class IndexInfo {
boolean sparse = sourceDocument.containsKey("sparse") ? (Boolean) sourceDocument.get("sparse") : false;
String language = sourceDocument.containsKey("default_language") ? (String) sourceDocument.get("default_language")
		: "";
String partialFilter = sourceDocument.containsKey("partialFilterExpression")
		? ((Document) sourceDocument.get("partialFilterExpression")).toJson()
		: null;

String partialFilter = extractPartialFilterString(sourceDocument);

IndexInfo info = new IndexInfo(indexFields, name, unique, sparse, language);
info.partialFilterExpression = partialFilter;

@@ -134,6 +134,21 @@ public class IndexInfo {
	return info;
}

/**
 * @param sourceDocument
 * @return the {@link String} representation of the partial filter {@link Document}.
 * @since 2.1.11
 */
@Nullable
private static String extractPartialFilterString(Document sourceDocument) {

	if (!sourceDocument.containsKey("partialFilterExpression")) {
		return null;
	}

	return BsonUtils.toJson(sourceDocument.get("partialFilterExpression", Document.class));
}

/**
 * Returns the individual index fields of the index.
 *
@@ -29,7 +29,7 @@ import org.springframework.data.geo.Metrics;
 * @author Mark Paluch
 * @since 2.2
 */
class MetricConversion {
public class MetricConversion {

	private static final BigDecimal METERS_MULTIPLIER = new BigDecimal(Metrics.KILOMETERS.getMultiplier())
			.multiply(new BigDecimal(1000));

@@ -43,7 +43,7 @@ class MetricConversion {
 * @param metric
 * @return
 */
protected static double getMetersToMetricMultiplier(Metric metric) {
public static double getMetersToMetricMultiplier(Metric metric) {

	ConversionMultiplier conversionMultiplier = ConversionMultiplier.builder().from(METERS_MULTIPLIER).to(metric)
			.build();

@@ -56,7 +56,7 @@ class MetricConversion {
 * @param distance
 * @return
 */
protected static double getDistanceInMeters(Distance distance) {
public static double getDistanceInMeters(Distance distance) {
	return new BigDecimal(distance.getValue()).multiply(getMetricToMetersMultiplier(distance.getMetric()))
			.doubleValue();
}
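The hunks above widen MetricConversion and two of its helpers from package-private to public so that other packages (notably the repository query creator further below) can convert a Distance to meters. A minimal sketch of how the now-public helper behaves; the example class name is made up, everything else is taken from the diff:

```java
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.Metrics;
import org.springframework.data.mongodb.core.query.MetricConversion;

class MetricConversionExample {

	public static void main(String[] args) {

		// 10 kilometers expressed in meters, as needed for GeoJSON $near queries
		Distance tenKm = new Distance(10, Metrics.KILOMETERS);
		double meters = MetricConversion.getDistanceInMeters(tenKm);

		System.out.println(meters); // expected: 10000.0
	}
}
```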
@@ -17,6 +17,8 @@ package org.springframework.data.mongodb.gridfs;

import lombok.RequiredArgsConstructor;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.concurrent.Queues;

@@ -26,13 +28,12 @@ import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.BiConsumer;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;

import com.mongodb.reactivestreams.client.Success;
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;

@@ -66,14 +67,14 @@ class AsyncInputStreamAdapter implements AsyncInputStream {

private final Publisher<? extends DataBuffer> buffers;
private final Context subscriberContext;
private final DefaultDataBufferFactory factory = new DefaultDataBufferFactory();

private volatile Subscription subscription;
private volatile boolean cancelled;
private volatile boolean complete;
private volatile boolean allDataBuffersReceived;
private volatile Throwable error;
private final Queue<BiConsumer<DataBuffer, Integer>> readRequests = Queues.<BiConsumer<DataBuffer, Integer>> small()
		.get();
private final Queue<ReadRequest> readRequests = Queues.<ReadRequest> small().get();

private final Queue<DataBuffer> bufferQueue = Queues.<DataBuffer> small().get();

// see DEMAND
volatile long demand;

@@ -88,41 +89,30 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
@Override
public Publisher<Integer> read(ByteBuffer dst) {

return Mono.create(sink -> {
return Flux.create(sink -> {

readRequests.offer((db, bytecount) -> {
readRequests.offer(new ReadRequest(sink, dst));

try {

if (error != null) {

sink.error(error);
return;
}

if (bytecount == -1) {

sink.success(-1);
return;
}

ByteBuffer byteBuffer = db.asByteBuffer();
int toWrite = byteBuffer.remaining();

dst.put(byteBuffer);
sink.success(toWrite);

} catch (Exception e) {
sink.error(e);
} finally {
DataBufferUtils.release(db);
}
});

request(1);
sink.onCancel(this::terminatePendingReads);
sink.onDispose(this::terminatePendingReads);
sink.onRequest(this::request);
});
}

void onError(FluxSink<Integer> sink, Throwable e) {

readRequests.poll();
sink.error(e);
}

void onComplete(FluxSink<Integer> sink, int writtenBytes) {

readRequests.poll();
DEMAND.decrementAndGet(this);
sink.next(writtenBytes);
sink.complete();
}

/*
 * (non-Javadoc)
 * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#skip(long)

@@ -144,17 +134,19 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
cancelled = true;

if (error != null) {
terminatePendingReads();
sink.error(error);
return;
}

terminatePendingReads();
sink.success(Success.SUCCESS);
});
}

protected void request(int n) {
protected void request(long n) {

if (complete) {
if (allDataBuffersReceived && bufferQueue.isEmpty()) {

terminatePendingReads();
return;

@@ -176,18 +168,51 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
requestFromSubscription(subscription);
}
}

}

void requestFromSubscription(Subscription subscription) {

long demand = DEMAND.get(AsyncInputStreamAdapter.this);

if (cancelled) {
subscription.cancel();
}

if (demand > 0 && DEMAND.compareAndSet(AsyncInputStreamAdapter.this, demand, demand - 1)) {
subscription.request(1);
drainLoop();
}

void drainLoop() {

while (DEMAND.get(AsyncInputStreamAdapter.this) > 0) {

DataBuffer wip = bufferQueue.peek();

if (wip == null) {
break;
}

if (wip.readableByteCount() == 0) {
bufferQueue.poll();
continue;
}

ReadRequest consumer = AsyncInputStreamAdapter.this.readRequests.peek();
if (consumer == null) {
break;
}

consumer.transferBytes(wip, wip.readableByteCount());
}

if (bufferQueue.isEmpty()) {

if (allDataBuffersReceived) {
terminatePendingReads();
return;
}

if (demand > 0) {
subscription.request(1);
}
}
}

@@ -196,10 +221,10 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
 */
void terminatePendingReads() {

BiConsumer<DataBuffer, Integer> readers;
ReadRequest readers;

while ((readers = readRequests.poll()) != null) {
readers.accept(factory.wrap(new byte[0]), -1);
readers.onComplete();
}
}

@@ -214,23 +239,21 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
public void onSubscribe(Subscription s) {

AsyncInputStreamAdapter.this.subscription = s;

Operators.addCap(DEMAND, AsyncInputStreamAdapter.this, -1);
s.request(1);
}

@Override
public void onNext(DataBuffer dataBuffer) {

if (cancelled || complete) {
if (cancelled || allDataBuffersReceived) {
DataBufferUtils.release(dataBuffer);
Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext);
return;
}

BiConsumer<DataBuffer, Integer> poll = AsyncInputStreamAdapter.this.readRequests.poll();
ReadRequest readRequest = AsyncInputStreamAdapter.this.readRequests.peek();

if (poll == null) {
if (readRequest == null) {

DataBufferUtils.release(dataBuffer);
Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext);

@@ -238,29 +261,103 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
return;
}

poll.accept(dataBuffer, dataBuffer.readableByteCount());
bufferQueue.offer(dataBuffer);

requestFromSubscription(subscription);
drainLoop();
}

@Override
public void onError(Throwable t) {

if (AsyncInputStreamAdapter.this.cancelled || AsyncInputStreamAdapter.this.complete) {
if (AsyncInputStreamAdapter.this.cancelled || AsyncInputStreamAdapter.this.allDataBuffersReceived) {
Operators.onErrorDropped(t, AsyncInputStreamAdapter.this.subscriberContext);
return;
}

AsyncInputStreamAdapter.this.error = t;
AsyncInputStreamAdapter.this.complete = true;
AsyncInputStreamAdapter.this.allDataBuffersReceived = true;
terminatePendingReads();
}

@Override
public void onComplete() {

AsyncInputStreamAdapter.this.complete = true;
terminatePendingReads();
AsyncInputStreamAdapter.this.allDataBuffersReceived = true;
if (bufferQueue.isEmpty()) {
terminatePendingReads();
}
}
}

/**
 * Request to read bytes and transfer these to the associated {@link ByteBuffer}.
 */
class ReadRequest {

private final FluxSink<Integer> sink;
private final ByteBuffer dst;

private int writtenBytes;

ReadRequest(FluxSink<Integer> sink, ByteBuffer dst) {
this.sink = sink;
this.dst = dst;
this.writtenBytes = -1;
}

public void onComplete() {

if (error != null) {
AsyncInputStreamAdapter.this.onError(sink, error);
return;
}

AsyncInputStreamAdapter.this.onComplete(sink, writtenBytes);
}

public void transferBytes(DataBuffer db, int bytes) {

try {

if (error != null) {
AsyncInputStreamAdapter.this.onError(sink, error);
return;
}

ByteBuffer byteBuffer = db.asByteBuffer();
int remaining = byteBuffer.remaining();
int writeCapacity = Math.min(dst.remaining(), remaining);
int limit = Math.min(byteBuffer.position() + writeCapacity, byteBuffer.capacity());
int toWrite = limit - byteBuffer.position();

if (toWrite == 0) {

AsyncInputStreamAdapter.this.onComplete(sink, writtenBytes);
return;
}

int oldPosition = byteBuffer.position();

byteBuffer.limit(toWrite);
dst.put(byteBuffer);
byteBuffer.limit(byteBuffer.capacity());
byteBuffer.position(oldPosition);
db.readPosition(db.readPosition() + toWrite);

if (writtenBytes == -1) {
writtenBytes = bytes;
} else {
writtenBytes += bytes;
}

} catch (Exception e) {
AsyncInputStreamAdapter.this.onError(sink, e);
} finally {

if (db.readableByteCount() == 0) {
DataBufferUtils.release(db);
}
}
}
}
}
@@ -42,12 +42,14 @@ class BinaryStreamAdapters {
 *
 * @param inputStream must not be {@literal null}.
 * @param dataBufferFactory must not be {@literal null}.
 * @param bufferSize read {@code n} bytes per iteration.
 * @return {@link Flux} emitting {@link DataBuffer}s.
 * @see DataBufferFactory#allocateBuffer()
 */
static Flux<DataBuffer> toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) {
static Flux<DataBuffer> toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory,
		int bufferSize) {

return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory) //
return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory, bufferSize) //
		.filter(it -> {

if (it.readableByteCount() == 0) {
@@ -29,11 +29,10 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;

import com.mongodb.reactivestreams.client.Success;
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;

/**

@@ -51,47 +50,119 @@ class DataBufferPublisherAdapter {
 *
 * @param inputStream must not be {@literal null}.
 * @param dataBufferFactory must not be {@literal null}.
 * @param bufferSize read {@code n} bytes per iteration.
 * @return the resulting {@link Publisher}.
 */
static Flux<DataBuffer> createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) {
static Flux<DataBuffer> createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory,
		int bufferSize) {

State state = new State(inputStream, dataBufferFactory);
return Flux.usingWhen(Mono.just(new DelegatingAsyncInputStream(inputStream, dataBufferFactory, bufferSize)),
		DataBufferPublisherAdapter::doRead, AsyncInputStream::close, (it, err) -> it.close(), AsyncInputStream::close);
}

return Flux.usingWhen(Mono.just(inputStream), it -> {
/**
 * Use an {@link AsyncInputStreamHandler} to read data from the given {@link AsyncInputStream}.
 *
 * @param inputStream the source stream.
 * @return a {@link Flux} emitting data chunks one by one.
 * @since 2.2.1
 */
private static Flux<DataBuffer> doRead(DelegatingAsyncInputStream inputStream) {

return Flux.<DataBuffer> create((sink) -> {
AsyncInputStreamHandler streamHandler = new AsyncInputStreamHandler(inputStream, inputStream.dataBufferFactory,
		inputStream.bufferSize);
return Flux.create((sink) -> {

sink.onDispose(state::close);
sink.onCancel(state::close);
sink.onDispose(streamHandler::close);
sink.onCancel(streamHandler::close);

sink.onRequest(n -> {
state.request(sink, n);
});
sink.onRequest(n -> {
streamHandler.request(sink, n);
});
}, AsyncInputStream::close, (it, err) -> it.close(), AsyncInputStream::close) //
		.concatMap(Flux::just, 1);
});
}

/**
 * An {@link AsyncInputStream} also holding a {@link DataBufferFactory} and default {@literal bufferSize} for reading
 * from it, delegating operations on the {@link AsyncInputStream} to the reference instance. <br />
 * Used to pass on the {@link AsyncInputStream} and parameters to avoid capturing lambdas.
 *
 * @author Christoph Strobl
 * @since 2.2.1
 */
private static class DelegatingAsyncInputStream implements AsyncInputStream {

private final AsyncInputStream inputStream;
private final DataBufferFactory dataBufferFactory;
private int bufferSize;

/**
 * @param inputStream the source input stream.
 * @param dataBufferFactory
 * @param bufferSize
 */
DelegatingAsyncInputStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory, int bufferSize) {

this.inputStream = inputStream;
this.dataBufferFactory = dataBufferFactory;
this.bufferSize = bufferSize;
}

/*
 * (non-Javadoc)
 * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#read(java.nio.ByteBuffer)
 */
@Override
public Publisher<Integer> read(ByteBuffer dst) {
return inputStream.read(dst);
}

/*
 * (non-Javadoc)
 * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#skip(long)
 */
@Override
public Publisher<Long> skip(long bytesToSkip) {
return inputStream.skip(bytesToSkip);
}

/*
 * (non-Javadoc)
 * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#close()
 */
@Override
public Publisher<Success> close() {
return inputStream.close();
}
}

@RequiredArgsConstructor
static class State {
static class AsyncInputStreamHandler {

private static final AtomicLongFieldUpdater<State> DEMAND = AtomicLongFieldUpdater.newUpdater(State.class,
		"demand");
private static final AtomicLongFieldUpdater<AsyncInputStreamHandler> DEMAND = AtomicLongFieldUpdater
		.newUpdater(AsyncInputStreamHandler.class, "demand");

private static final AtomicIntegerFieldUpdater<State> STATE = AtomicIntegerFieldUpdater.newUpdater(State.class,
		"state");
private static final AtomicIntegerFieldUpdater<AsyncInputStreamHandler> STATE = AtomicIntegerFieldUpdater
		.newUpdater(AsyncInputStreamHandler.class, "state");

private static final AtomicIntegerFieldUpdater<State> READ = AtomicIntegerFieldUpdater.newUpdater(State.class,
		"read");
private static final AtomicIntegerFieldUpdater<AsyncInputStreamHandler> DRAIN = AtomicIntegerFieldUpdater
		.newUpdater(AsyncInputStreamHandler.class, "drain");

private static final AtomicIntegerFieldUpdater<AsyncInputStreamHandler> READ = AtomicIntegerFieldUpdater
		.newUpdater(AsyncInputStreamHandler.class, "read");

private static final int STATE_OPEN = 0;
private static final int STATE_CLOSED = 1;

private static final int DRAIN_NONE = 0;
private static final int DRAIN_COMPLETION = 1;

private static final int READ_NONE = 0;
private static final int READ_IN_PROGRESS = 1;

final AsyncInputStream inputStream;
final DataBufferFactory dataBufferFactory;
final int bufferSize;

// see DEMAND
volatile long demand;

@@ -99,14 +170,25 @@ class DataBufferPublisherAdapter {
// see STATE
volatile int state = STATE_OPEN;

// see DRAIN
volatile int drain = DRAIN_NONE;

// see READ_IN_PROGRESS
volatile int read = READ_NONE;

void request(FluxSink<DataBuffer> sink, long n) {

Operators.addCap(DEMAND, this, n);
drainLoop(sink);
}

if (onShouldRead()) {
/**
 * Loops while we have demand and while no read is in progress.
 *
 * @param sink
 */
void drainLoop(FluxSink<DataBuffer> sink) {
while (onShouldRead()) {
emitNext(sink);
}
}

@@ -119,22 +201,30 @@ class DataBufferPublisherAdapter {
return READ.compareAndSet(this, READ_NONE, READ_IN_PROGRESS);
}

boolean onReadDone() {
return READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE);
void onReadDone() {
READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE);
}

long getDemand() {
return DEMAND.get(this);
}

boolean decrementDemand() {
return DEMAND.decrementAndGet(this) > 0;
void decrementDemand() {
DEMAND.decrementAndGet(this);
}

void close() {
STATE.compareAndSet(this, STATE_OPEN, STATE_CLOSED);
}

boolean enterDrainLoop() {
return DRAIN.compareAndSet(this, DRAIN_NONE, DRAIN_COMPLETION);
}

void leaveDrainLoop() {
DRAIN.set(this, DRAIN_NONE);
}

boolean isClosed() {
return STATE.get(this) == STATE_CLOSED;
}

@@ -143,15 +233,15 @@ class DataBufferPublisherAdapter {
 * Emit the next {@link DataBuffer}.
 *
 * @param sink
 * @return
 */
void emitNext(FluxSink<DataBuffer> sink) {

DataBuffer dataBuffer = dataBufferFactory.allocateBuffer();
ByteBuffer intermediate = ByteBuffer.allocate(dataBuffer.capacity());
private void emitNext(FluxSink<DataBuffer> sink) {

ByteBuffer transport = ByteBuffer.allocate(bufferSize);
BufferCoreSubscriber bufferCoreSubscriber = new BufferCoreSubscriber(sink, dataBufferFactory, transport);
try {
Mono.from(inputStream.read(intermediate)).subscribe(new BufferCoreSubscriber(sink, dataBuffer, intermediate));
} catch (Exception e) {
inputStream.read(transport).subscribe(bufferCoreSubscriber);
} catch (Throwable e) {
sink.error(e);
}
}

@@ -159,14 +249,15 @@ class DataBufferPublisherAdapter {
private class BufferCoreSubscriber implements CoreSubscriber<Integer> {

private final FluxSink<DataBuffer> sink;
private final DataBuffer dataBuffer;
private final ByteBuffer intermediate;
private final DataBufferFactory factory;
private final ByteBuffer transport;
private volatile Subscription subscription;

BufferCoreSubscriber(FluxSink<DataBuffer> sink, DataBuffer dataBuffer, ByteBuffer intermediate) {
BufferCoreSubscriber(FluxSink<DataBuffer> sink, DataBufferFactory factory, ByteBuffer transport) {

this.sink = sink;
this.dataBuffer = dataBuffer;
this.intermediate = intermediate;
this.factory = factory;
this.transport = transport;
}

@Override

@@ -176,6 +267,8 @@ class DataBufferPublisherAdapter {

@Override
public void onSubscribe(Subscription s) {

this.subscription = s;
s.request(1);
}

@@ -183,26 +276,34 @@ class DataBufferPublisherAdapter {
public void onNext(Integer bytes) {

if (isClosed()) {

onReadDone();
DataBufferUtils.release(dataBuffer);
Operators.onNextDropped(dataBuffer, sink.currentContext());
return;
}

intermediate.flip();
dataBuffer.write(intermediate);
if (bytes > 0) {

sink.next(dataBuffer);
decrementDemand();

try {
if (bytes == -1) {
sink.complete();
}
} finally {
onReadDone();
DataBuffer buffer = readNextChunk();
sink.next(buffer);
decrementDemand();
}

if (bytes == -1) {
sink.complete();
return;
}

subscription.request(1);
}

private DataBuffer readNextChunk() {

transport.flip();

DataBuffer dataBuffer = factory.allocateBuffer(transport.remaining());
dataBuffer.write(transport);

transport.clear();

return dataBuffer;
}

@Override

@@ -214,17 +315,25 @@ class DataBufferPublisherAdapter {
return;
}

onReadDone();
DataBufferUtils.release(dataBuffer);
Operators.onNextDropped(dataBuffer, sink.currentContext());
close();
sink.error(t);
}

@Override
public void onComplete() {

if (onShouldRead()) {
emitNext(sink);
onReadDone();

if (!isClosed()) {

if (enterDrainLoop()) {
try {
drainLoop(sink);
} finally {
leaveDrainLoop();
}
}

}
}
}
@@ -184,7 +184,7 @@ public class GridFsTemplate extends GridFsOperationsSupport implements GridFsOpe
public void delete(Query query) {

	for (GridFSFile gridFSFile : find(query)) {
		getGridFs().delete(((BsonObjectId) gridFSFile.getId()).getValue());
		getGridFs().delete(gridFSFile.getId());
	}
}

@@ -215,7 +215,7 @@ public class GridFsTemplate extends GridFsOperationsSupport implements GridFsOpe

Assert.notNull(file, "GridFSFile must not be null!");

return new GridFsResource(file, getGridFs().openDownloadStream(file.getObjectId()));
return new GridFsResource(file, getGridFs().openDownloadStream(file.getId()));
}

/*
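The two hunks above replace ObjectId-specific calls with the BsonValue-based getId(), so GridFS entries whose _id is not an ObjectId (for example a UUID written through the legacy GridFS API, as exercised in the DATAMONGO-2392 test further down) can be resolved and deleted by query. A minimal sketch of the behaviour this enables; the injected GridFsOperations, the file name, and the example class are assumptions:

```java
import static org.springframework.data.mongodb.core.query.Query.query;
import static org.springframework.data.mongodb.gridfs.GridFsCriteria.whereFilename;

import org.springframework.data.mongodb.gridfs.GridFsOperations;
import org.springframework.data.mongodb.gridfs.GridFsResource;

import com.mongodb.client.gridfs.model.GridFSFile;

class GridFsNonObjectIdExample {

	void readAndDelete(GridFsOperations operations) {

		// resolves the file even if its stored _id is a UUID rather than an ObjectId
		GridFSFile file = operations.findOne(query(whereFilename().is("gridfs.xml")));
		GridFsResource resource = operations.getResource(file);

		System.out.println(resource.exists());

		// delete(Query) now passes GridFSFile.getId() (a BsonValue) straight to the bucket
		operations.delete(query(whereFilename().is("gridfs.xml")));
	}
}
```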
@@ -20,9 +20,9 @@ import reactor.core.publisher.Flux;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.IntFunction;

import org.reactivestreams.Publisher;

import org.springframework.core.io.AbstractResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;

@@ -35,26 +35,28 @@ import com.mongodb.client.gridfs.model.GridFSFile;
 * Reactive {@link GridFSFile} based {@link Resource} implementation.
 *
 * @author Mark Paluch
 * @author Christoph Strobl
 * @since 2.2
 */
public class ReactiveGridFsResource extends AbstractResource {

private static final Integer DEFAULT_CHUNK_SIZE = 256 * 1024;

private final @Nullable GridFSFile file;
private final String filename;
private final Flux<DataBuffer> content;
private final IntFunction<Flux<DataBuffer>> contentFunction;

/**
 * Creates a new, absent {@link ReactiveGridFsResource}.
 *
 * @param filename filename of the absent resource.
 * @param content
 * @since 2.1
 */
private ReactiveGridFsResource(String filename, Publisher<DataBuffer> content) {

	this.file = null;
	this.filename = filename;
	this.content = Flux.from(content);
	this.contentFunction = any -> Flux.from(content);
}

/**

@@ -64,10 +66,21 @@ public class ReactiveGridFsResource extends AbstractResource {
 * @param content
 */
public ReactiveGridFsResource(GridFSFile file, Publisher<DataBuffer> content) {
	this(file, (IntFunction<Flux<DataBuffer>>) any -> Flux.from(content));
}

/**
 * Creates a new {@link ReactiveGridFsResource} from the given {@link GridFSFile}.
 *
 * @param file must not be {@literal null}.
 * @param contentFunction
 * @since 2.2.1
 */
ReactiveGridFsResource(GridFSFile file, IntFunction<Flux<DataBuffer>> contentFunction) {

	this.file = file;
	this.filename = file.getFilename();
	this.content = Flux.from(content);
	this.contentFunction = contentFunction;
}

/**

@@ -165,16 +178,32 @@ public class ReactiveGridFsResource extends AbstractResource {
}

/**
 * Retrieve the download stream.
 * Retrieve the download stream using the default chunk size of 256 kB.
 *
 * @return
 * @return a {@link Flux} emitting data chunks one by one. Please make sure to
 *         {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) release} all
 *         {@link DataBuffer buffers} when done.
 */
public Flux<DataBuffer> getDownloadStream() {
	return getDownloadStream(DEFAULT_CHUNK_SIZE);
}

/**
 * Retrieve the download stream.
 *
 * @param chunkSize chunk size in bytes to use.
 * @return a {@link Flux} emitting data chunks one by one. Please make sure to
 *         {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) release} all
 *         {@link DataBuffer buffers} when done.
 * @since 2.2.1
 */
public Flux<DataBuffer> getDownloadStream(int chunkSize) {

	if (!exists()) {
		return Flux.error(new FileNotFoundException(String.format("%s does not exist.", getDescription())));
	}
	return this.content;

	return contentFunction.apply(chunkSize);
}

private void verifyExists() throws FileNotFoundException {
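With the contentFunction in place, getDownloadStream(int) lets callers choose the read chunk size instead of the 256 kB default. A minimal sketch of reading a file in 64 kB chunks and summing the bytes; the injected ReactiveGridFsTemplate, the file name "large.bin", and the example class are assumptions:

```java
import static org.springframework.data.mongodb.core.query.Query.query;
import static org.springframework.data.mongodb.gridfs.GridFsCriteria.whereFilename;

import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.data.mongodb.gridfs.ReactiveGridFsTemplate;

import reactor.core.publisher.Mono;

class ChunkedDownloadExample {

	Mono<Long> countBytes(ReactiveGridFsTemplate gridFsTemplate) {

		return gridFsTemplate.findOne(query(whereFilename().is("large.bin")))
				.flatMap(gridFsTemplate::getResource)
				.flatMapMany(resource -> resource.getDownloadStream(64 * 1024)) // request 64 kB chunks
				.map(buffer -> {
					long readable = buffer.readableByteCount();
					DataBufferUtils.release(buffer); // release every buffer once consumed
					return readable;
				})
				.reduce(0L, Long::sum);
	}
}
```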
@@ -49,6 +49,7 @@ import com.mongodb.reactivestreams.client.gridfs.GridFSFindPublisher;
 * {@link DefaultDataBufferFactory} to create {@link DataBuffer buffers}.
 *
 * @author Mark Paluch
 * @author Nick Stolwijk
 * @since 2.2
 */
public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements ReactiveGridFsOperations {

@@ -222,9 +223,11 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R

return Mono.fromSupplier(() -> {

GridFSDownloadStream stream = getGridFs().openDownloadStream(file.getObjectId());
return new ReactiveGridFsResource(file, chunkSize -> {

return new ReactiveGridFsResource(file, BinaryStreamAdapters.toPublisher(stream, dataBufferFactory));
GridFSDownloadStream stream = getGridFs().openDownloadStream(file.getId());
return BinaryStreamAdapters.toPublisher(stream, dataBufferFactory, chunkSize);
});
});
}
@@ -36,6 +36,7 @@ import org.springframework.data.repository.query.ParameterAccessor;
import org.springframework.data.repository.query.QueryMethodEvaluationContextProvider;
import org.springframework.data.repository.query.RepositoryQuery;
import org.springframework.data.repository.query.ResultProcessor;
import org.springframework.data.util.TypeInformation;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

@@ -117,13 +118,18 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery {

private Object execute(MongoParameterAccessor parameterAccessor) {

ConvertingParameterAccessor convertingParamterAccessor = new ConvertingParameterAccessor(operations.getConverter(),
ConvertingParameterAccessor accessor = new ConvertingParameterAccessor(operations.getConverter(),
		parameterAccessor);

ResultProcessor processor = method.getResultProcessor().withDynamicProjection(convertingParamterAccessor);
TypeInformation<?> returnType = method.getReturnType();
ResultProcessor processor = method.getResultProcessor().withDynamicProjection(accessor);
Class<?> typeToRead = processor.getReturnedType().getTypeToRead();

return doExecute(method, processor, convertingParamterAccessor, typeToRead);
if (typeToRead == null && returnType.getComponentType() != null) {
	typeToRead = returnType.getComponentType().getType();
}

return doExecute(method, processor, accessor, typeToRead);
}

/**
@@ -36,11 +36,13 @@ import org.springframework.data.geo.Shape;
import org.springframework.data.mapping.PersistentPropertyPath;
import org.springframework.data.mapping.PropertyPath;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.mongodb.core.geo.GeoJson;
import org.springframework.data.mongodb.core.index.GeoSpatialIndexType;
import org.springframework.data.mongodb.core.index.GeoSpatialIndexed;
import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.CriteriaDefinition;
import org.springframework.data.mongodb.core.query.MetricConversion;
import org.springframework.data.mongodb.core.query.MongoRegexCreator;
import org.springframework.data.mongodb.core.query.MongoRegexCreator.MatchMode;
import org.springframework.data.mongodb.core.query.Query;

@@ -235,8 +237,14 @@ class MongoQueryCreator extends AbstractQueryCreator<Query, Criteria> {
	criteria.near(pointToUse);
}

criteria.maxDistance(it.getNormalizedValue());
minDistance.ifPresent(min -> criteria.minDistance(min.getNormalizedValue()));
if (pointToUse instanceof GeoJson) { // using GeoJson distance is in meters.

	criteria.maxDistance(MetricConversion.getDistanceInMeters(it));
	minDistance.map(MetricConversion::getDistanceInMeters).ifPresent(criteria::minDistance);
} else {
	criteria.maxDistance(it.getNormalizedValue());
	minDistance.map(Distance::getNormalizedValue).ifPresent(criteria::minDistance);
}

return criteria;
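The branch above means a derived near query against a GeoJSON property now sends its max/min distance in meters, which is what $near with $geometry expects, while legacy Point queries keep using normalized values. A hypothetical repository illustrating the case the change targets; the Store entity, repository, and method names are made up for the sketch:

```java
import java.util.List;

import org.springframework.data.annotation.Id;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.Metrics;
import org.springframework.data.mongodb.core.geo.GeoJsonPoint;
import org.springframework.data.repository.CrudRepository;

// hypothetical domain type with a GeoJsonPoint property named "location"
class Store {
	@Id String id;
	GeoJsonPoint location;
}

interface StoreRepository extends CrudRepository<Store, String> {

	// with a GeoJsonPoint the Distance argument is translated to meters for $maxDistance
	List<Store> findByLocationNear(GeoJsonPoint point, Distance maxDistance);
}

class NearQueryExample {

	static void run(StoreRepository repository) {
		// 10 km should end up as $maxDistance: 10000.0 in the mapped query
		repository.findByLocationNear(new GeoJsonPoint(27.987901, 86.9165379), new Distance(10, Metrics.KILOMETERS));
	}
}
```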
@@ -95,7 +95,14 @@ public class ReactiveStringBasedAggregation extends AbstractReactiveMongoQuery {
Flux<?> flux = reactiveMongoOperations.aggregate(aggregation, targetType);

if (isSimpleReturnType && !isRawReturnType) {
	flux = flux.map(it -> AggregationUtils.extractSimpleTypeResult((Document) it, typeToRead, mongoConverter));
	flux = flux.handle((it, sink) -> {

		Object result = AggregationUtils.extractSimpleTypeResult((Document) it, typeToRead, mongoConverter);

		if (result != null) {
			sink.next(result);
		}
	});
}

if (method.isCollectionQuery()) {
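The switch from map to handle matters because Reactive Streams forbids null elements: map fails when the extracted value is null, whereas handle simply emits nothing for that document. A small standalone sketch of the pattern; the values and the tryParse helper are illustrative, not the production code:

```java
import reactor.core.publisher.Flux;

class HandleVsMapExample {

	public static void main(String[] args) {

		Flux<String> source = Flux.just("42", "n/a", "7");

		// handle() lets the callback skip an element by simply not calling sink.next(...)
		Flux<Integer> parsed = source.<Integer> handle((value, sink) -> {

			Integer result = tryParse(value); // may be null, e.g. for "n/a"

			if (result != null) {
				sink.next(result);
			}
		});

		parsed.subscribe(System.out::println); // prints 42 and 7; "n/a" is dropped silently
	}

	private static Integer tryParse(String value) {
		try {
			return Integer.valueOf(value);
		} catch (NumberFormatException e) {
			return null;
		}
	}
}
```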
@@ -16,13 +16,19 @@
package org.springframework.data.mongodb.util;

import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import java.util.StringJoiner;
import java.util.function.Function;
import java.util.stream.StreamSupport;

import org.bson.BsonValue;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.json.JsonParseException;

import org.springframework.core.convert.converter.Converter;
import org.springframework.lang.Nullable;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

@@ -30,6 +36,7 @@ import org.springframework.util.StringUtils;
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.DBRef;
import com.mongodb.MongoClientSettings;

/**
 * @author Christoph Strobl

@@ -147,4 +154,80 @@ public class BsonUtils {
	return orElse.apply(source);
}

/**
 * Serialize the given {@link Document} as Json applying default codecs if necessary.
 *
 * @param source
 * @return
 * @since 2.2.1
 */
@Nullable
public static String toJson(@Nullable Document source) {

	if (source == null) {
		return null;
	}

	try {
		return source.toJson();
	} catch (Exception e) {
		return toJson((Object) source);
	}
}

@Nullable
private static String toJson(@Nullable Object value) {

	if (value == null) {
		return null;
	}

	try {
		return value instanceof Document
				? ((Document) value).toJson(MongoClientSettings.getDefaultCodecRegistry().get(Document.class))
				: serializeValue(value);

	} catch (Exception e) {

		if (value instanceof Collection) {
			return toString((Collection<?>) value);
		} else if (value instanceof Map) {
			return toString((Map<?, ?>) value);
		} else if (ObjectUtils.isArray(value)) {
			return toString(Arrays.asList(ObjectUtils.toObjectArray(value)));
		}

		throw e instanceof JsonParseException ? (JsonParseException) e : new JsonParseException(e);
	}
}

private static String serializeValue(@Nullable Object value) {

	if (value == null) {
		return "null";
	}

	String documentJson = new Document("toBeEncoded", value).toJson();
	return documentJson.substring(documentJson.indexOf(':') + 1, documentJson.length() - 1).trim();
}

private static String toString(Map<?, ?> source) {

	return iterableToDelimitedString(source.entrySet(), "{ ", " }",
			entry -> String.format("\"%s\" : %s", entry.getKey(), toJson(entry.getValue())));
}

private static String toString(Collection<?> source) {
	return iterableToDelimitedString(source, "[ ", " ]", BsonUtils::toJson);
}

private static <T> String iterableToDelimitedString(Iterable<T> source, String prefix, String suffix,
		Converter<? super T, String> transformer) {

	StringJoiner joiner = new StringJoiner(", ", prefix, suffix);

	StreamSupport.stream(source.spliterator(), false).map(transformer::convert).forEach(joiner::add);

	return joiner.toString();
}
}
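The new toJson first tries the plain Document.toJson(), then falls back to the driver's default codec registry and finally to manual serialization. That is exactly the partial-filter case fixed in IndexInfo: a filter containing a DBRef cannot be encoded by the plain Document codec. A small sketch of the intended behaviour, assuming spring-data-mongodb and the MongoDB driver are on the classpath (the example class is made up):

```java
import org.bson.Document;
import org.bson.types.ObjectId;
import org.springframework.data.mongodb.util.BsonUtils;

import com.mongodb.DBRef;

class BsonUtilsToJsonExample {

	public static void main(String[] args) {

		// a partialFilterExpression-style document containing a DBRef
		Document filter = new Document("the-ref",
				new DBRef("other-collection", new ObjectId("59ce08baf264b906810fe8c5")));

		// plain filter.toJson() would typically fail here because the default Document codec
		// has no encoder for DBRef; BsonUtils.toJson applies the driver's fallback codecs instead
		System.out.println(BsonUtils.toJson(filter));
	}
}
```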
@@ -73,7 +73,8 @@ fun <T : Any> ExecutableFindOperation.DistinctWithProjection.asType(resultType:
 * Extension for [ExecutableFindOperation.DistinctWithProjection.as] leveraging reified type parameters.
 *
 * @author Christoph Strobl
 * @author Mark Paluch
 * @since 2.1
 */
inline fun <reified T : Any> ExecutableFindOperation.DistinctWithProjection.asType(): ExecutableFindOperation.DistinctWithProjection =
inline fun <reified T : Any> ExecutableFindOperation.DistinctWithProjection.asType(): ExecutableFindOperation.TerminatingDistinct<T> =
		`as`(T::class.java)

@@ -76,7 +76,7 @@ fun <T : Any> ReactiveFindOperation.DistinctWithProjection.asType(resultType: KC
 * @author Christoph Strobl
 * @since 2.1
 */
inline fun <reified T : Any> ReactiveFindOperation.DistinctWithProjection.asType(): ReactiveFindOperation.DistinctWithProjection =
inline fun <reified T : Any> ReactiveFindOperation.DistinctWithProjection.asType(): ReactiveFindOperation.TerminatingDistinct<T> =
		`as`(T::class.java)

/**
@@ -20,6 +20,7 @@ import static org.assertj.core.api.Assumptions.*;
import static org.springframework.data.mongodb.core.index.PartialIndexFilter.*;
import static org.springframework.data.mongodb.core.query.Criteria.*;

import org.bson.BsonDocument;
import org.bson.Document;
import org.junit.Before;
import org.junit.Test;

@@ -39,6 +40,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.util.ObjectUtils;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.IndexOptions;

/**
 * Integration tests for {@link DefaultIndexOperations}.

@@ -153,6 +155,21 @@ public class DefaultIndexOperationsIntegrationTests {
		.isEqualTo(Document.parse("{ \"a_g_e\" : { \"$gte\" : 10 } }"));
}

@Test // DATAMONGO-2388
public void shouldReadIndexWithPartialFilterContainingDbRefCorrectly() {

	BsonDocument partialFilter = BsonDocument.parse(
			"{ \"the-ref\" : { \"$ref\" : \"other-collection\", \"$id\" : { \"$oid\" : \"59ce08baf264b906810fe8c5\"} } }");
	IndexOptions indexOptions = new IndexOptions();
	indexOptions.name("partial-with-dbref");
	indexOptions.partialFilterExpression(partialFilter);

	collection.createIndex(BsonDocument.parse("{ \"key-1\" : 1, \"key-2\": 1}"), indexOptions);

	IndexInfo info = findAndReturnIndexInfo(indexOps.getIndexInfo(), "partial-with-dbref");
	assertThat(BsonDocument.parse(info.getPartialFilterExpression())).isEqualTo(partialFilter);
}

@Test // DATAMONGO-1518
public void shouldCreateIndexWithCollationCorrectly() {
@@ -890,6 +890,17 @@ public class QueryMapperUnitTests {
	assertThat(target).isEqualTo(new org.bson.Document("_id", "id-1"));
}

@Test // DATAMONGO-2394
public void leavesDistanceUntouchedWhenUsingGeoJson() {

	Query query = query(where("geoJsonPoint").near(new GeoJsonPoint(27.987901, 86.9165379)).maxDistance(1000));

	org.bson.Document document = mapper.getMappedObject(query.getQueryObject(),
			context.getPersistentEntity(ClassWithGeoTypes.class));
	assertThat(document).containsEntry("geoJsonPoint.$near.$geometry.type", "Point");
	assertThat(document).containsEntry("geoJsonPoint.$near.$maxDistance", 1000.0D);
}

@Document
public class Foo {
	@Id private ObjectId id;
@@ -31,21 +31,18 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.dao.DataAccessException;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.SimpleMongoClientDbFactory;
import org.springframework.data.mongodb.core.SimpleMongoDbFactory;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
import org.springframework.data.mongodb.test.util.MongoTestUtils;
import org.springframework.data.mongodb.test.util.ReplicaSet;
import org.springframework.test.annotation.IfProfileValue;
import org.springframework.util.ErrorHandler;

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.changestream.ChangeStreamDocument;

@@ -61,6 +58,8 @@ public class DefaultMessageListenerContainerTests {
public static final String COLLECTION_NAME = "collection-1";
public static final String COLLECTION_2_NAME = "collection-2";

public static final Duration TIMEOUT = Duration.ofSeconds(2);

public @Rule TestRule replSet = ReplicaSet.none();

MongoDbFactory dbFactory;

@@ -94,12 +93,12 @@ public class DefaultMessageListenerContainerTests {
		Person.class);
container.start();

awaitSubscription(subscription, Duration.ofMillis(500));
awaitSubscription(subscription, TIMEOUT);

collection.insertOne(new Document("_id", "id-1").append("firstname", "foo"));
collection.insertOne(new Document("_id", "id-2").append("firstname", "bar"));

awaitMessages(messageListener, 2, Duration.ofMillis(500));
awaitMessages(messageListener, 2, TIMEOUT);

assertThat(messageListener.getMessages().stream().map(Message::getBody).collect(Collectors.toList()))
		.containsExactly(new Person("id-1", "foo"), new Person("id-2", "bar"));

@@ -125,12 +124,12 @@ public class DefaultMessageListenerContainerTests {
}, () -> COLLECTION_NAME), Person.class, errorHandler);
container.start();

awaitSubscription(subscription, Duration.ofMillis(500));
awaitSubscription(subscription, TIMEOUT);

collection.insertOne(new Document("_id", "id-1").append("firstname", "foo"));
collection.insertOne(new Document("_id", "id-2").append("firstname", "bar"));

awaitMessages(messageListener, 2, Duration.ofMillis(500));
awaitMessages(messageListener, 2, TIMEOUT);

verify(errorHandler, atLeast(1)).handleError(any(IllegalStateException.class));
assertThat(messageListener.getTotalNumberMessagesReceived()).isEqualTo(2);

@@ -145,12 +144,12 @@ public class DefaultMessageListenerContainerTests {
		Document.class);
container.start();

awaitSubscription(subscription, Duration.ofMillis(500));
awaitSubscription(subscription, TIMEOUT);

collection.insertOne(new Document("_id", "id-1").append("value", "foo"));
collection.insertOne(new Document("_id", "id-2").append("value", "bar"));

awaitMessages(messageListener, 2, Duration.ofMillis(500));
awaitMessages(messageListener, 2, TIMEOUT);

container.stop();

@@ -174,12 +173,12 @@ public class DefaultMessageListenerContainerTests {
Subscription subscription = container.register(new ChangeStreamRequest(messageListener, () -> COLLECTION_NAME),
		Document.class);

awaitSubscription(subscription, Duration.ofMillis(500));
awaitSubscription(subscription, TIMEOUT);

Document expected = new Document("_id", "id-2").append("value", "bar");
collection.insertOne(expected);

awaitMessages(messageListener, 1, Duration.ofMillis(500));
awaitMessages(messageListener, 1, TIMEOUT);
container.stop();

assertThat(messageListener.getMessages().stream().map(Message::getBody).collect(Collectors.toList()))

@@ -226,11 +225,11 @@ public class DefaultMessageListenerContainerTests {

awaitSubscription(
		container.register(new TailableCursorRequest(messageListener, () -> COLLECTION_NAME), Document.class),
		Duration.ofMillis(500));
		TIMEOUT);

collection.insertOne(new Document("_id", "id-2").append("value", "bar"));

awaitMessages(messageListener, 2, Duration.ofSeconds(2));
awaitMessages(messageListener, 2, TIMEOUT);
container.stop();

assertThat(messageListener.getTotalNumberMessagesReceived()).isEqualTo(2);

@@ -247,12 +246,12 @@ public class DefaultMessageListenerContainerTests {

awaitSubscription(
		container.register(new TailableCursorRequest(messageListener, () -> COLLECTION_NAME), Document.class),
		Duration.ofMillis(500));
		TIMEOUT);

collection.insertOne(new Document("_id", "id-1").append("value", "foo"));
collection.insertOne(new Document("_id", "id-2").append("value", "bar"));

awaitMessages(messageListener, 2, Duration.ofSeconds(2));
awaitMessages(messageListener, 2, TIMEOUT);
container.stop();

assertThat(messageListener.getTotalNumberMessagesReceived()).isEqualTo(2);

@@ -359,7 +358,7 @@ public class DefaultMessageListenerContainerTests {

container.start();

awaitSubscription(subscription, Duration.ofMillis(500));
awaitSubscription(subscription, TIMEOUT);

collection.insertOne(new Document("_id", "col-1-id-1").append("firstname", "foo"));
collection.insertOne(new Document("_id", "col-1-id-2").append("firstname", "bar"));

@@ -367,7 +366,7 @@ public class DefaultMessageListenerContainerTests {
collection2.insertOne(new Document("_id", "col-2-id-1").append("firstname", "bar"));
collection2.insertOne(new Document("_id", "col-2-id-2").append("firstname", "foo"));

awaitMessages(messageListener, 4, Duration.ofMillis(500));
awaitMessages(messageListener, 4, TIMEOUT);

assertThat(messageListener.getMessages().stream().map(Message::getBody).collect(Collectors.toList()))
		.containsExactly(new Person("col-1-id-1", "foo"), new Person("col-1-id-2", "bar"),
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;

import org.junit.Test;

import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;

@@ -49,7 +50,7 @@ public class BinaryStreamAdaptersUnitTests {
byte[] content = StreamUtils.copyToByteArray(resource.getInputStream());
AsyncInputStream inputStream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());

Flux<DataBuffer> dataBuffers = BinaryStreamAdapters.toPublisher(inputStream, new DefaultDataBufferFactory());
Flux<DataBuffer> dataBuffers = BinaryStreamAdapters.toPublisher(inputStream, new DefaultDataBufferFactory(), 256);

DataBufferUtils.join(dataBuffers) //
		.as(StepVerifier::create) //
@@ -44,7 +44,7 @@ public class DataBufferPublisherAdapterUnitTests {
when(asyncInput.read(any())).thenReturn(Mono.just(1), Mono.error(new IllegalStateException()));
when(asyncInput.close()).thenReturn(Mono.empty());

Flux<DataBuffer> binaryStream = DataBufferPublisherAdapter.createBinaryStream(asyncInput, factory);
Flux<DataBuffer> binaryStream = DataBufferPublisherAdapter.createBinaryStream(asyncInput, factory, 256);

StepVerifier.create(binaryStream, 0) //
		.thenRequest(1) //
@@ -26,6 +26,7 @@ import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.bson.BsonObjectId;
|
||||
import org.bson.Document;
|
||||
@@ -39,6 +40,7 @@ import org.springframework.core.io.ClassPathResource;
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.data.domain.Sort;
|
||||
import org.springframework.data.domain.Sort.Direction;
|
||||
import org.springframework.data.mongodb.core.SimpleMongoDbFactory;
|
||||
import org.springframework.data.mongodb.core.query.Query;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
@@ -47,6 +49,8 @@ import org.springframework.util.StreamUtils;
|
||||
import com.mongodb.MongoGridFSException;
|
||||
import com.mongodb.client.gridfs.GridFSFindIterable;
|
||||
import com.mongodb.client.gridfs.model.GridFSFile;
|
||||
import com.mongodb.gridfs.GridFS;
|
||||
import com.mongodb.gridfs.GridFSInputFile;
|
||||
|
||||
/**
|
||||
* Integration tests for {@link GridFsTemplate}.
|
||||
@@ -65,6 +69,7 @@ public class GridFsTemplateIntegrationTests {
|
||||
Resource resource = new ClassPathResource("gridfs/gridfs.xml");
|
||||
|
||||
@Autowired GridFsOperations operations;
|
||||
@Autowired SimpleMongoDbFactory mongoClient;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
@@ -76,24 +81,42 @@ public class GridFsTemplateIntegrationTests {

ObjectId reference = operations.store(resource.getInputStream(), "foo.xml");

List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<com.mongodb.client.gridfs.model.GridFSFile>();
List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<>();
GridFSFindIterable result = operations.find(query(where("_id").is(reference)));
result.into(files);
assertThat(files.size()).isEqualTo(1);
assertThat(files).hasSize(1);
assertThat(((BsonObjectId) files.get(0).getId()).getValue()).isEqualTo(reference);
}

@Test // DATAMONGO-2392
public void storesAndFindsByUUID() throws IOException {

UUID uuid = UUID.randomUUID();

GridFS fs = new GridFS(mongoClient.getLegacyDb());
GridFSInputFile in = fs.createFile(resource.getInputStream(), "gridfs.xml");

in.put("_id", uuid);
in.put("contentType", "application/octet-stream");
in.save();

GridFSFile file = operations.findOne(query(where("_id").is(uuid)));
GridFsResource resource = operations.getResource(file);

assertThat(resource.exists()).isTrue();
}

@Test // DATAMONGO-6
public void writesMetadataCorrectly() throws IOException {

Document metadata = new Document("key", "value");
ObjectId reference = operations.store(resource.getInputStream(), "foo.xml", metadata);

List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<com.mongodb.client.gridfs.model.GridFSFile>();
List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<>();
GridFSFindIterable result = operations.find(query(whereMetaData("key").is("value")));
result.into(files);

assertThat(files.size()).isEqualTo(1);
assertThat(files).hasSize(1);
assertThat(((BsonObjectId) files.get(0).getId()).getValue()).isEqualTo(reference);
}

@@ -105,11 +128,11 @@ public class GridFsTemplateIntegrationTests {

ObjectId reference = operations.store(resource.getInputStream(), "foo.xml", metadata);

List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<com.mongodb.client.gridfs.model.GridFSFile>();
List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<>();
GridFSFindIterable result = operations.find(query(whereFilename().is("foo.xml")));
result.into(files);

assertThat(files.size()).isEqualTo(1);
assertThat(files).hasSize(1);
assertThat(((BsonObjectId) files.get(0).getId()).getValue()).isEqualTo(reference);
}

@@ -120,7 +143,7 @@ public class GridFsTemplateIntegrationTests {

GridFsResource[] resources = operations.getResources("*.xml");

assertThat(resources.length).isEqualTo(1);
assertThat(resources).hasSize(1);
assertThat(((BsonObjectId) resources[0].getId()).getValue()).isEqualTo(reference);
assertThat(resources[0].contentLength()).isEqualTo(resource.contentLength());
}
@@ -131,7 +154,7 @@ public class GridFsTemplateIntegrationTests {
ObjectId reference = operations.store(resource.getInputStream(), "foo.xml");

GridFsResource[] resources = operations.getResources("foo.xml");
assertThat(resources.length).isEqualTo(1);
assertThat(resources).hasSize(1);
assertThat(((BsonObjectId) resources[0].getId()).getValue()).isEqualTo(reference);
assertThat(resources[0].contentLength()).isEqualTo(resource.contentLength());
}
@@ -141,11 +164,11 @@ public class GridFsTemplateIntegrationTests {

ObjectId reference = operations.store(resource.getInputStream(), "foo2.xml", "application/xml");

List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<com.mongodb.client.gridfs.model.GridFSFile>();
List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<>();
GridFSFindIterable result = operations.find(query(whereContentType().is("application/xml")));
result.into(files);

assertThat(files.size()).isEqualTo(1);
assertThat(files).hasSize(1);
assertThat(((BsonObjectId) files.get(0).getId()).getValue()).isEqualTo(reference);
}

@@ -158,7 +181,7 @@ public class GridFsTemplateIntegrationTests {

Query query = new Query().with(Sort.by(Direction.ASC, "filename"));

List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<com.mongodb.client.gridfs.model.GridFSFile>();
List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<>();
GridFSFindIterable result = operations.find(query);
result.into(files);

@@ -194,7 +217,7 @@ public class GridFsTemplateIntegrationTests {
Document metadata = new Document("key", "value");
ObjectId reference = operations.store(resource.getInputStream(), "foobar", metadata);

List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<com.mongodb.client.gridfs.model.GridFSFile>();
List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<>();
GridFSFindIterable result = operations.find(query(whereMetaData("key").is("value")));
result.into(files);

@@ -208,7 +231,7 @@ public class GridFsTemplateIntegrationTests {
metadata.version = "1.0";
ObjectId reference = operations.store(resource.getInputStream(), "foobar", metadata);

List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<com.mongodb.client.gridfs.model.GridFSFile>();
List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<>();
GridFSFindIterable result = operations.find(query(whereMetaData("version").is("1.0")));
result.into(files);

@@ -229,7 +252,7 @@ public class GridFsTemplateIntegrationTests {
operations.store(resource.getInputStream(), "no-content-type", (String) null);
GridFsResource result = operations.getResource("no-content-type");

assertThatThrownBy(() -> result.getContentType()).isInstanceOf(MongoGridFSException.class);
assertThatThrownBy(result::getContentType).isInstanceOf(MongoGridFSException.class);
}

@Test // DATAMONGO-1813

@@ -24,6 +24,8 @@ import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.UUID;

import org.bson.BsonObjectId;
import org.bson.Document;
@@ -31,7 +33,6 @@ import org.bson.types.ObjectId;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
@@ -39,11 +40,17 @@ import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.dao.IncorrectResultSizeDataAccessException;
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.core.SimpleMongoDbFactory;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.StreamUtils;

import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSInputFile;
import com.mongodb.internal.HexUtils;
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper;

@@ -52,6 +59,7 @@ import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper;
*
* @author Mark Paluch
* @author Christoph Strobl
* @author Nick Stolwijk
*/
@RunWith(SpringRunner.class)
@ContextConfiguration("classpath:gridfs/reactive-gridfs.xml")
@@ -60,6 +68,9 @@ public class ReactiveGridFsTemplateTests {
Resource resource = new ClassPathResource("gridfs/gridfs.xml");

@Autowired ReactiveGridFsOperations operations;
@Autowired SimpleMongoDbFactory mongoClient;
@Autowired ReactiveMongoDatabaseFactory dbFactory;
@Autowired MongoConverter mongoConverter;

@Before
public void setUp() {
@@ -86,6 +97,64 @@ public class ReactiveGridFsTemplateTests {
.verifyComplete();
}

@Test // DATAMONGO-1855
public void storesAndLoadsLargeFileCorrectly() {

ByteBuffer buffer = ByteBuffer.allocate(1000 * 1000); // 1 mb
int i = 0;
while (buffer.remaining() != 0) {
buffer.put(HexUtils.toHex(new byte[] { (byte) (i++ % 16) }).getBytes());
}
buffer.flip();

DefaultDataBufferFactory factory = new DefaultDataBufferFactory();

ObjectId reference = operations.store(Flux.just(factory.wrap(buffer)), "large.txt").block();

buffer.clear();

// default chunk size
operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource)
.flatMapMany(ReactiveGridFsResource::getDownloadStream) //
.transform(DataBufferUtils::join) //
.as(StepVerifier::create) //
.consumeNextWith(dataBuffer -> {

assertThat(dataBuffer.readableByteCount()).isEqualTo(buffer.remaining());
assertThat(dataBuffer.asByteBuffer()).isEqualTo(buffer);
}).verifyComplete();

// small chunk size
operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource)
.flatMapMany(reactiveGridFsResource -> reactiveGridFsResource.getDownloadStream(256)) //
.transform(DataBufferUtils::join) //
.as(StepVerifier::create) //
.consumeNextWith(dataBuffer -> {

assertThat(dataBuffer.readableByteCount()).isEqualTo(buffer.remaining());
assertThat(dataBuffer.asByteBuffer()).isEqualTo(buffer);
}).verifyComplete();
}

@Test // DATAMONGO-2392
public void storesAndFindsByUUID() throws IOException {

UUID uuid = UUID.randomUUID();

GridFS fs = new GridFS(mongoClient.getLegacyDb());
GridFSInputFile in = fs.createFile(resource.getInputStream(), "gridfs.xml");

in.put("_id", uuid);
in.put("contentType", "application/octet-stream");
in.save();

operations.findOne(query(where("_id").is(uuid))).flatMap(operations::getResource)
.flatMapMany(ReactiveGridFsResource::getDownloadStream) //
.transform(DataBufferUtils::join) //
.doOnNext(DataBufferUtils::release).as(StepVerifier::create) //
.expectNextCount(1).verifyComplete();
}

@Test // DATAMONGO-1855
public void writesMetadataCorrectly() throws IOException {

@@ -148,7 +217,8 @@ public class ReactiveGridFsTemplateTests {
public void shouldEmitFirstEntryWhenFindFirstRetrievesMoreThanOneResult() throws IOException {

AsyncInputStream upload1 = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
AsyncInputStream upload2 = AsyncStreamHelper.toAsyncInputStream(new ClassPathResource("gridfs/another-resource.xml").getInputStream());
AsyncInputStream upload2 = AsyncStreamHelper
.toAsyncInputStream(new ClassPathResource("gridfs/another-resource.xml").getInputStream());

operations.store(upload1, "foo.xml", null, null).block();
operations.store(upload2, "foo2.xml", null, null).block();
@@ -159,8 +229,7 @@ public class ReactiveGridFsTemplateTests {
.assertNext(actual -> {

assertThat(actual.getGridFSFile()).isNotNull();
})
.verifyComplete();
}).verifyComplete();
}

@Test // DATAMONGO-2240
@@ -179,7 +248,8 @@ public class ReactiveGridFsTemplateTests {
public void shouldEmitErrorWhenFindOneRetrievesMoreThanOneResult() throws IOException {

AsyncInputStream upload1 = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
AsyncInputStream upload2 = AsyncStreamHelper.toAsyncInputStream(new ClassPathResource("gridfs/another-resource.xml").getInputStream());
AsyncInputStream upload2 = AsyncStreamHelper
.toAsyncInputStream(new ClassPathResource("gridfs/another-resource.xml").getInputStream());

operations.store(upload1, "foo.xml", null, null).block();
operations.store(upload2, "foo2.xml", null, null).block();

@@ -27,6 +27,7 @@ import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
@@ -36,7 +37,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.reactivestreams.Publisher;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
@@ -535,6 +535,46 @@ public class ReactiveMongoRepositoryTests {
}).verifyComplete();
}

@Test // DATAMONGO-2153
public void annotatedAggregationWithAggregationResultAsMap() {

repository.sumAgeAndReturnSumAsMap() //
.as(StepVerifier::create) //
.consumeNextWith(it -> {
assertThat(it).isInstanceOf(Map.class);
}).verifyComplete();
}

@Test // DATAMONGO-2403
public void annotatedAggregationExtractingSimpleValueIsEmptyForEmptyDocument() {

Person p = new Person("project-on-lastanme", null);
repository.save(p).then().as(StepVerifier::create).verifyComplete();

repository.projectToLastnameAndRemoveId(p.getFirstname()) //
.as(StepVerifier::create) //
.verifyComplete();
}

@Test // DATAMONGO-2403
public void annotatedAggregationSkipsEmptyDocumentsWhenExtractingSimpleValue() {

String firstname = "project-on-lastanme";

Person p1 = new Person(firstname, null);
p1.setEmail("p1@example.com");
Person p2 = new Person(firstname, "lastname");
p2.setEmail("p2@example.com");
Person p3 = new Person(firstname, null);
p3.setEmail("p3@example.com");

repository.saveAll(Arrays.asList(p1, p2, p3)).then().as(StepVerifier::create).verifyComplete();

repository.projectToLastnameAndRemoveId(firstname) //
.as(StepVerifier::create) //
.expectNext("lastname").verifyComplete();
}

interface ReactivePersonRepository
extends ReactiveMongoRepository<Person, String>, ReactiveQuerydslPredicateExecutor<Person> {

@@ -596,6 +636,13 @@ public class ReactiveMongoRepositoryTests {
@Aggregation(pipeline = "{ '$group' : { '_id' : null, 'total' : { $sum: '$age' } } }")
Mono<SumAge> sumAgeAndReturnSumWrapper();

@Aggregation(pipeline = "{ '$group' : { '_id' : null, 'total' : { $sum: '$age' } } }")
Mono<Map> sumAgeAndReturnSumAsMap();

@Aggregation(
pipeline = { "{ '$match' : { 'firstname' : '?0' } }", "{ '$project' : { '_id' : 0, 'lastname' : 1 } }" })
Mono<String> projectToLastnameAndRemoveId(String firstname);

@Query(value = "{_id:?0}")
Mono<org.bson.Document> findDocumentById(String id);
}

@@ -652,6 +652,17 @@ public class MongoQueryCreatorUnitTests {
assertThat(creator.createQuery()).isEqualTo(query(where("age").gt(10).lt(11)));
}

@Test // DATAMONGO-2394
public void nearShouldUseMetricDistanceForGeoJsonTypes() {

GeoJsonPoint point = new GeoJsonPoint(27.987901, 86.9165379);
PartTree tree = new PartTree("findByLocationNear", User.class);
MongoQueryCreator creator = new MongoQueryCreator(tree,
getAccessor(converter, point, new Distance(1, Metrics.KILOMETERS)), context);

assertThat(creator.createQuery()).isEqualTo(query(where("location").nearSphere(point).maxDistance(1000.0D)));
}

interface PersonRepository extends Repository<Person, Long> {

List<Person> findByLocationNearAndFirstname(Point location, Distance maxDistance, String firstname);

@@ -3,13 +3,13 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/util https://www.springframework.org/schema/util/spring-util.xsd">

<bean id="mongoClient" class="org.springframework.data.mongodb.core.ReactiveMongoClientFactoryBean">
<bean id="reactiveMongoClient" class="org.springframework.data.mongodb.core.ReactiveMongoClientFactoryBean">
<property name="host" value="127.0.0.1"/>
<property name="port" value="27017"/>
</bean>

<bean id="reactiveMongoDbFactory" class="org.springframework.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
<constructor-arg name="mongoClient" ref="mongoClient"/>
<constructor-arg name="mongoClient" ref="reactiveMongoClient" type="com.mongodb.reactivestreams.client.MongoClient"/>
<constructor-arg name="databaseName" value="reactive_gridfs"/>
</bean>

@@ -27,4 +27,13 @@
<constructor-arg ref="converter"/>
</bean>

<bean id="mongoClient" class="org.springframework.data.mongodb.core.MongoClientFactoryBean">
<property name="host" value="127.0.0.1"/>
<property name="port" value="27017"/>
</bean>

<bean id="mongoDbFactory" class="org.springframework.data.mongodb.core.SimpleMongoDbFactory">
<constructor-arg name="mongoClient" ref="mongoClient"/>
<constructor-arg name="databaseName" value="reactive_gridfs"/>
</bean>
</beans>

@@ -1,6 +1,36 @@
Spring Data MongoDB Changelog
=============================

Changes in version 2.2.2.RELEASE (2019-11-18)
---------------------------------------------
* DATAMONGO-2414 - ReactiveGridFsResource.getDownloadStream(…) hangs if completion happens on the event loop.
* DATAMONGO-2409 - Extension function ReactiveFindOperation.DistinctWithProjection.asType() has the wrong return type.
* DATAMONGO-2403 - ReactiveStringBasedAggregation / AggregationUtils fails with an NPE because source or value is null.
* DATAMONGO-2402 - Release 2.2.2 (Moore SR2).


Changes in version 2.1.13.RELEASE (2019-11-18)
----------------------------------------------
* DATAMONGO-2409 - Extension function ReactiveFindOperation.DistinctWithProjection.asType() has the wrong return type.
* DATAMONGO-2401 - Release 2.1.13 (Lovelace SR13).


Changes in version 2.2.1.RELEASE (2019-11-04)
---------------------------------------------
* DATAMONGO-2399 - Upgrade to mongo-java-driver 3.11.1.
* DATAMONGO-2394 - nearSphere query wrongly generated with a radian parameter instead of meters.
* DATAMONGO-2393 - Reading a large file from ReactiveGridFsTemplate causes a stack overflow and the code to hang.
* DATAMONGO-2392 - Reading GridFS files written with the old API and a custom id fails on ReactiveGridFsTemplate.
* DATAMONGO-2388 - IndexOperations.getIndexInfo() fails for an index whose partialFilterExpression contains a DBRef.
* DATAMONGO-2382 - Release 2.2.1 (Moore SR1).


Changes in version 2.1.12.RELEASE (2019-11-04)
----------------------------------------------
* DATAMONGO-2388 - IndexOperations.getIndexInfo() fails for an index whose partialFilterExpression contains a DBRef.
* DATAMONGO-2381 - Release 2.1.12 (Lovelace SR12).


Changes in version 2.2.0.RELEASE (2019-09-30)
---------------------------------------------
* DATAMONGO-2380 - Remove @ExperimentalCoroutinesApi annotations.

@@ -1,4 +1,4 @@
Spring Data MongoDB 2.2 GA
Spring Data MongoDB 2.2.2
Copyright (c) [2010-2019] Pivotal Software, Inc.

This product is licensed to you under the Apache License, Version 2.0 (the "License").