Compare commits

...

20 Commits

Author SHA1 Message Date
Christoph Strobl
2cfcdaff7c DATAMONGO-2382 - Release version 2.2.1 (Moore SR1). 2019-11-04 14:55:02 +01:00
Christoph Strobl
9d9cf46e47 DATAMONGO-2382 - Prepare 2.2.1 (Moore SR1). 2019-11-04 14:54:16 +01:00
Christoph Strobl
98661cf9a2 DATAMONGO-2382 - Updated changelog. 2019-11-04 14:54:10 +01:00
Christoph Strobl
cc50cd5e3a DATAMONGO-2381 - Updated changelog. 2019-11-04 10:34:52 +01:00
Christoph Strobl
d8399d2d23 DATAMONGO-2393 - Remove capturing lambdas and extract methods.
Original Pull Request: #799
2019-10-31 12:57:06 +01:00
Mark Paluch
f2134fb2f8 DATAMONGO-2393 - Support configurable chunk size.
We now allow consuming GridFS files using a configurable chunk size. The default chunk size is now 256 kB.

Original Pull Request: #799
2019-10-31 12:56:59 +01:00
Mark Paluch
ec3ccc004e DATAMONGO-2393 - Polishing.
Extract read requests into inner class.

Original Pull Request: #799
2019-10-31 12:56:50 +01:00
Mark Paluch
6cb246c18a DATAMONGO-2393 - Fix BufferOverflow in GridFS upload.
AsyncInputStreamAdapter now properly splits and buffers incoming DataBuffers according to the read requests of AsyncInputStream.read(…) calls.
Previously, the adapter used the input buffer size as the output buffer size, so a DataBuffer larger than the transfer buffer handed in through read(…) caused a BufferOverflow.

Original Pull Request: #799
2019-10-31 12:56:42 +01:00
Mark Paluch
e73cea0ecf DATAMONGO-2393 - Use drain loop for same-thread processing in GridFS download stream.
We now rely on an outer drain loop when GridFS reads complete on the same thread, instead of using recursive subscriptions, to avoid StackOverflow. Previously, we recursively invoked subscriptions, which led to an increased stack size.

Original Pull Request: #799
2019-10-31 12:56:28 +01:00
Christoph Strobl
c69e185a2a DATAMONGO-2399 - Upgrade to MongoDB Java Driver 3.11.1 2019-10-30 10:49:43 +01:00
Mark Paluch
5789f59222 DATAMONGO-2388 - Polishing.
Use StringJoiner to create comma-delimited String. Add nullability annotations.

Original pull request: #797.
2019-10-28 10:55:58 +01:00
Christoph Strobl
5178eeb340 DATAMONGO-2388 - Fix CodecConfigurationException when reading index info that contains DbRef.
Provide the default CodecRegistry when converting partial index data to its String representation used in IndexInfo.

Original pull request: #797.
2019-10-28 10:47:57 +01:00
Mark Paluch
bc5e7fa4a2 DATAMONGO-2394 - Polishing.
Reformat code.

Original pull request: #798.
2019-10-28 09:32:14 +01:00
Christoph Strobl
c28ace6d40 DATAMONGO-2394 - Fix distance conversion for derived finder using near along with GeoJSON.
GeoJSON requires the distance to be in meters instead of radians, so we now make sure to convert it correctly.

Original pull request: #798.
2019-10-28 09:32:10 +01:00
Mark Paluch
de4fae37e1 DATAMONGO-2392 - Polishing.
Add author tags. Move integration tests to existing test class.
Apply more appropriate assertions in existing tests. Use diamond syntax.

Original pull request: #796.
2019-10-16 13:55:12 +02:00
Mark Paluch
2f1aff3ec3 DATAMONGO-2392 - Consistently use GridFS file Id instead of ObjectId.
We now consistently use GridFSFile.getId() to allow custom Id usage instead of enforcing the Id to be an ObjectId. Using the native Id allows interaction with files that use a custom Id type.

Original pull request: #796.
2019-10-16 13:55:12 +02:00
Nick Stolwijk
6970f934bd DATAMONGO-2392 - Fix handling of GridFS files with a custom id type in ReactiveGridFsTemplate.
Original pull request: #796.
2019-10-16 13:55:12 +02:00
Greg Turnquist
6b5168e102 DATAMONGO-2334 - Create CI job. 2019-09-30 14:36:06 -05:00
Mark Paluch
4420edb4dc DATAMONGO-2334 - After release cleanups. 2019-09-30 16:17:54 +02:00
Mark Paluch
c2fae95fee DATAMONGO-2334 - Prepare next development iteration. 2019-09-30 16:17:53 +02:00
25 changed files with 644 additions and 159 deletions

Jenkinsfile

@@ -3,7 +3,7 @@ pipeline {
triggers {
pollSCM 'H/10 * * * *'
upstream(upstreamProjects: "spring-data-commons/master", threshold: hudson.model.Result.SUCCESS)
upstream(upstreamProjects: "spring-data-commons/2.2.x", threshold: hudson.model.Result.SUCCESS)
}
options {
@@ -68,7 +68,7 @@ pipeline {
stage("test: baseline") {
when {
anyOf {
branch 'master'
branch '2.2.x'
not { triggeredBy 'UpstreamCause' }
}
}
@@ -94,7 +94,7 @@ pipeline {
stage("Test other configurations") {
when {
anyOf {
branch 'master'
branch '2.2.x'
not { triggeredBy 'UpstreamCause' }
}
}
@@ -143,7 +143,7 @@ pipeline {
stage('Release to artifactory') {
when {
anyOf {
branch 'master'
branch '2.2.x'
not { triggeredBy 'UpstreamCause' }
}
}
@@ -175,7 +175,7 @@ pipeline {
stage('Publish documentation') {
when {
branch 'master'
branch '2.2.x'
}
agent {
docker {


@@ -5,7 +5,7 @@
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.2.0.RELEASE</version>
<version>2.2.1.RELEASE</version>
<packaging>pom</packaging>
<name>Spring Data MongoDB</name>
@@ -15,7 +15,7 @@
<parent>
<groupId>org.springframework.data.build</groupId>
<artifactId>spring-data-parent</artifactId>
<version>2.2.0.RELEASE</version>
<version>2.2.1.RELEASE</version>
</parent>
<modules>
@@ -26,8 +26,8 @@
<properties>
<project.type>multi</project.type>
<dist.id>spring-data-mongodb</dist.id>
<springdata.commons>2.2.0.RELEASE</springdata.commons>
<mongo>3.11.0</mongo>
<springdata.commons>2.2.1.RELEASE</springdata.commons>
<mongo>3.11.1</mongo>
<mongo.reactivestreams>1.12.0</mongo.reactivestreams>
<jmh.version>1.19</jmh.version>
</properties>


@@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.2.0.RELEASE</version>
<version>2.2.1.RELEASE</version>
<relativePath>../pom.xml</relativePath>
</parent>


@@ -14,7 +14,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.2.0.RELEASE</version>
<version>2.2.1.RELEASE</version>
<relativePath>../pom.xml</relativePath>
</parent>


@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.2.0.RELEASE</version>
<version>2.2.1.RELEASE</version>
<relativePath>../pom.xml</relativePath>
</parent>


@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Optional;
import org.bson.Document;
import org.springframework.data.mongodb.util.BsonUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.NumberUtils;
@@ -117,9 +118,8 @@ public class IndexInfo {
boolean sparse = sourceDocument.containsKey("sparse") ? (Boolean) sourceDocument.get("sparse") : false;
String language = sourceDocument.containsKey("default_language") ? (String) sourceDocument.get("default_language")
: "";
String partialFilter = sourceDocument.containsKey("partialFilterExpression")
? ((Document) sourceDocument.get("partialFilterExpression")).toJson()
: null;
String partialFilter = extractPartialFilterString(sourceDocument);
IndexInfo info = new IndexInfo(indexFields, name, unique, sparse, language);
info.partialFilterExpression = partialFilter;
@@ -134,6 +134,21 @@ public class IndexInfo {
return info;
}
/**
* @param sourceDocument
* @return the {@link String} representation of the partial filter {@link Document}.
* @since 2.1.11
*/
@Nullable
private static String extractPartialFilterString(Document sourceDocument) {
if (!sourceDocument.containsKey("partialFilterExpression")) {
return null;
}
return BsonUtils.toJson(sourceDocument.get("partialFilterExpression", Document.class));
}
/**
* Returns the individual index fields of the index.
*
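
A sketch of the scenario this change addresses, assuming a MongoTemplate named "template" and a collection "items" (both assumptions; the filter mirrors the integration test added further below): create a partial index whose filter expression contains a DBRef, then read it back via getIndexInfo(), which previously failed with a CodecConfigurationException.

import org.bson.BsonDocument;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.index.IndexInfo;
import com.mongodb.client.model.IndexOptions;

class PartialFilterDbRefSketch {

	void indexInfoWithDbRef(MongoTemplate template) {

		BsonDocument partialFilter = BsonDocument.parse(
				"{ \"the-ref\" : { \"$ref\" : \"other-collection\", \"$id\" : { \"$oid\" : \"59ce08baf264b906810fe8c5\" } } }");

		// create an index whose partial filter expression contains a DBRef
		template.getCollection("items").createIndex(BsonDocument.parse("{ \"key-1\" : 1, \"key-2\" : 1 }"),
				new IndexOptions().name("partial-with-dbref").partialFilterExpression(partialFilter));

		// reading the index info back no longer throws a CodecConfigurationException
		for (IndexInfo info : template.indexOps("items").getIndexInfo()) {
			System.out.println(info.getName() + " -> " + info.getPartialFilterExpression());
		}
	}
}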


@@ -29,7 +29,7 @@ import org.springframework.data.geo.Metrics;
* @author Mark Paluch
* @since 2.2
*/
class MetricConversion {
public class MetricConversion {
private static final BigDecimal METERS_MULTIPLIER = new BigDecimal(Metrics.KILOMETERS.getMultiplier())
.multiply(new BigDecimal(1000));
@@ -43,7 +43,7 @@ class MetricConversion {
* @param metric
* @return
*/
protected static double getMetersToMetricMultiplier(Metric metric) {
public static double getMetersToMetricMultiplier(Metric metric) {
ConversionMultiplier conversionMultiplier = ConversionMultiplier.builder().from(METERS_MULTIPLIER).to(metric)
.build();
@@ -56,7 +56,7 @@ class MetricConversion {
* @param distance
* @return
*/
protected static double getDistanceInMeters(Distance distance) {
public static double getDistanceInMeters(Distance distance) {
return new BigDecimal(distance.getValue()).multiply(getMetricToMetersMultiplier(distance.getMetric()))
.doubleValue();
}
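
With the class now public, the conversion can be used directly. A small sketch; the printed values assume the Metrics constants shipped with Spring Data:

import org.springframework.data.geo.Distance;
import org.springframework.data.geo.Metrics;
import org.springframework.data.mongodb.core.query.MetricConversion;

class MetricConversionSketch {

	public static void main(String[] args) {

		// 1 kilometer expressed in meters, as used for GeoJSON $maxDistance values
		System.out.println(MetricConversion.getDistanceInMeters(new Distance(1, Metrics.KILOMETERS))); // 1000.0

		// factor to convert a value in meters into the given metric (kilometers here, roughly 0.001)
		System.out.println(MetricConversion.getMetersToMetricMultiplier(Metrics.KILOMETERS));
	}
}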


@@ -17,6 +17,8 @@ package org.springframework.data.mongodb.gridfs;
import lombok.RequiredArgsConstructor;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.concurrent.Queues;
@@ -26,13 +28,12 @@ import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.BiConsumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import com.mongodb.reactivestreams.client.Success;
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
@@ -66,14 +67,14 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
private final Publisher<? extends DataBuffer> buffers;
private final Context subscriberContext;
private final DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
private volatile Subscription subscription;
private volatile boolean cancelled;
private volatile boolean complete;
private volatile boolean allDataBuffersReceived;
private volatile Throwable error;
private final Queue<BiConsumer<DataBuffer, Integer>> readRequests = Queues.<BiConsumer<DataBuffer, Integer>> small()
.get();
private final Queue<ReadRequest> readRequests = Queues.<ReadRequest> small().get();
private final Queue<DataBuffer> bufferQueue = Queues.<DataBuffer> small().get();
// see DEMAND
volatile long demand;
@@ -88,41 +89,30 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
@Override
public Publisher<Integer> read(ByteBuffer dst) {
return Mono.create(sink -> {
return Flux.create(sink -> {
readRequests.offer((db, bytecount) -> {
readRequests.offer(new ReadRequest(sink, dst));
try {
if (error != null) {
sink.error(error);
return;
}
if (bytecount == -1) {
sink.success(-1);
return;
}
ByteBuffer byteBuffer = db.asByteBuffer();
int toWrite = byteBuffer.remaining();
dst.put(byteBuffer);
sink.success(toWrite);
} catch (Exception e) {
sink.error(e);
} finally {
DataBufferUtils.release(db);
}
});
request(1);
sink.onCancel(this::terminatePendingReads);
sink.onDispose(this::terminatePendingReads);
sink.onRequest(this::request);
});
}
void onError(FluxSink<Integer> sink, Throwable e) {
readRequests.poll();
sink.error(e);
}
void onComplete(FluxSink<Integer> sink, int writtenBytes) {
readRequests.poll();
DEMAND.decrementAndGet(this);
sink.next(writtenBytes);
sink.complete();
}
/*
* (non-Javadoc)
* @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#skip(long)
@@ -144,17 +134,19 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
cancelled = true;
if (error != null) {
terminatePendingReads();
sink.error(error);
return;
}
terminatePendingReads();
sink.success(Success.SUCCESS);
});
}
protected void request(int n) {
protected void request(long n) {
if (complete) {
if (allDataBuffersReceived && bufferQueue.isEmpty()) {
terminatePendingReads();
return;
@@ -176,18 +168,51 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
requestFromSubscription(subscription);
}
}
}
void requestFromSubscription(Subscription subscription) {
long demand = DEMAND.get(AsyncInputStreamAdapter.this);
if (cancelled) {
subscription.cancel();
}
if (demand > 0 && DEMAND.compareAndSet(AsyncInputStreamAdapter.this, demand, demand - 1)) {
subscription.request(1);
drainLoop();
}
void drainLoop() {
while (DEMAND.get(AsyncInputStreamAdapter.this) > 0) {
DataBuffer wip = bufferQueue.peek();
if (wip == null) {
break;
}
if (wip.readableByteCount() == 0) {
bufferQueue.poll();
continue;
}
ReadRequest consumer = AsyncInputStreamAdapter.this.readRequests.peek();
if (consumer == null) {
break;
}
consumer.transferBytes(wip, wip.readableByteCount());
}
if (bufferQueue.isEmpty()) {
if (allDataBuffersReceived) {
terminatePendingReads();
return;
}
if (demand > 0) {
subscription.request(1);
}
}
}
@@ -196,10 +221,10 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
*/
void terminatePendingReads() {
BiConsumer<DataBuffer, Integer> readers;
ReadRequest readers;
while ((readers = readRequests.poll()) != null) {
readers.accept(factory.wrap(new byte[0]), -1);
readers.onComplete();
}
}
@@ -214,23 +239,21 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
public void onSubscribe(Subscription s) {
AsyncInputStreamAdapter.this.subscription = s;
Operators.addCap(DEMAND, AsyncInputStreamAdapter.this, -1);
s.request(1);
}
@Override
public void onNext(DataBuffer dataBuffer) {
if (cancelled || complete) {
if (cancelled || allDataBuffersReceived) {
DataBufferUtils.release(dataBuffer);
Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext);
return;
}
BiConsumer<DataBuffer, Integer> poll = AsyncInputStreamAdapter.this.readRequests.poll();
ReadRequest readRequest = AsyncInputStreamAdapter.this.readRequests.peek();
if (poll == null) {
if (readRequest == null) {
DataBufferUtils.release(dataBuffer);
Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext);
@@ -238,29 +261,103 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
return;
}
poll.accept(dataBuffer, dataBuffer.readableByteCount());
bufferQueue.offer(dataBuffer);
requestFromSubscription(subscription);
drainLoop();
}
@Override
public void onError(Throwable t) {
if (AsyncInputStreamAdapter.this.cancelled || AsyncInputStreamAdapter.this.complete) {
if (AsyncInputStreamAdapter.this.cancelled || AsyncInputStreamAdapter.this.allDataBuffersReceived) {
Operators.onErrorDropped(t, AsyncInputStreamAdapter.this.subscriberContext);
return;
}
AsyncInputStreamAdapter.this.error = t;
AsyncInputStreamAdapter.this.complete = true;
AsyncInputStreamAdapter.this.allDataBuffersReceived = true;
terminatePendingReads();
}
@Override
public void onComplete() {
AsyncInputStreamAdapter.this.complete = true;
terminatePendingReads();
AsyncInputStreamAdapter.this.allDataBuffersReceived = true;
if (bufferQueue.isEmpty()) {
terminatePendingReads();
}
}
}
/**
* Request to read bytes and transfer these to the associated {@link ByteBuffer}.
*/
class ReadRequest {
private final FluxSink<Integer> sink;
private final ByteBuffer dst;
private int writtenBytes;
ReadRequest(FluxSink<Integer> sink, ByteBuffer dst) {
this.sink = sink;
this.dst = dst;
this.writtenBytes = -1;
}
public void onComplete() {
if (error != null) {
AsyncInputStreamAdapter.this.onError(sink, error);
return;
}
AsyncInputStreamAdapter.this.onComplete(sink, writtenBytes);
}
public void transferBytes(DataBuffer db, int bytes) {
try {
if (error != null) {
AsyncInputStreamAdapter.this.onError(sink, error);
return;
}
ByteBuffer byteBuffer = db.asByteBuffer();
int remaining = byteBuffer.remaining();
int writeCapacity = Math.min(dst.remaining(), remaining);
int limit = Math.min(byteBuffer.position() + writeCapacity, byteBuffer.capacity());
int toWrite = limit - byteBuffer.position();
if (toWrite == 0) {
AsyncInputStreamAdapter.this.onComplete(sink, writtenBytes);
return;
}
int oldPosition = byteBuffer.position();
byteBuffer.limit(toWrite);
dst.put(byteBuffer);
byteBuffer.limit(byteBuffer.capacity());
byteBuffer.position(oldPosition);
db.readPosition(db.readPosition() + toWrite);
if (writtenBytes == -1) {
writtenBytes = bytes;
} else {
writtenBytes += bytes;
}
} catch (Exception e) {
AsyncInputStreamAdapter.this.onError(sink, e);
} finally {
if (db.readableByteCount() == 0) {
DataBufferUtils.release(db);
}
}
}
}
}


@@ -42,12 +42,14 @@ class BinaryStreamAdapters {
*
* @param inputStream must not be {@literal null}.
* @param dataBufferFactory must not be {@literal null}.
* @param bufferSize read {@code n} bytes per iteration.
* @return {@link Flux} emitting {@link DataBuffer}s.
* @see DataBufferFactory#allocateBuffer()
*/
static Flux<DataBuffer> toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) {
static Flux<DataBuffer> toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory,
int bufferSize) {
return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory) //
return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory, bufferSize) //
.filter(it -> {
if (it.readableByteCount() == 0) {


@@ -29,11 +29,10 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import com.mongodb.reactivestreams.client.Success;
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
/**
@@ -51,38 +50,104 @@ class DataBufferPublisherAdapter {
*
* @param inputStream must not be {@literal null}.
* @param dataBufferFactory must not be {@literal null}.
* @param bufferSize read {@code n} bytes per iteration.
* @return the resulting {@link Publisher}.
*/
static Flux<DataBuffer> createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) {
static Flux<DataBuffer> createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory,
int bufferSize) {
State state = new State(inputStream, dataBufferFactory);
return Flux.usingWhen(Mono.just(new DelegatingAsyncInputStream(inputStream, dataBufferFactory, bufferSize)),
DataBufferPublisherAdapter::doRead, AsyncInputStream::close, (it, err) -> it.close(), AsyncInputStream::close);
}
return Flux.usingWhen(Mono.just(inputStream), it -> {
/**
* Use an {@link AsyncInputStreamHandler} to read data from the given {@link AsyncInputStream}.
*
* @param inputStream the source stream.
* @return a {@link Flux} emitting data chunks one by one.
* @since 2.2.1
*/
private static Flux<DataBuffer> doRead(DelegatingAsyncInputStream inputStream) {
return Flux.<DataBuffer> create((sink) -> {
AsyncInputStreamHandler streamHandler = new AsyncInputStreamHandler(inputStream, inputStream.dataBufferFactory,
inputStream.bufferSize);
sink.onDispose(state::close);
sink.onCancel(state::close);
return Flux.create((sink) -> {
sink.onRequest(n -> {
state.request(sink, n);
});
sink.onDispose(streamHandler::close);
sink.onCancel(streamHandler::close);
sink.onRequest(n -> {
streamHandler.request(sink, n);
});
}, AsyncInputStream::close, (it, err) -> it.close(), AsyncInputStream::close) //
.concatMap(Flux::just, 1);
});
}
/**
* An {@link AsyncInputStream} also holding a {@link DataBufferFactory} and default {@literal bufferSize} for reading
* from it, delegating operations on the {@link AsyncInputStream} to the reference instance. <br />
* Used to pass on the {@link AsyncInputStream} and parameters to avoid capturing lambdas.
*
* @author Christoph Strobl
* @since 2.2.1
*/
private static class DelegatingAsyncInputStream implements AsyncInputStream {
private final AsyncInputStream inputStream;
private final DataBufferFactory dataBufferFactory;
private int bufferSize;
/**
* @param inputStream the source input stream.
* @param dataBufferFactory
* @param bufferSize
*/
DelegatingAsyncInputStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory, int bufferSize) {
this.inputStream = inputStream;
this.dataBufferFactory = dataBufferFactory;
this.bufferSize = bufferSize;
}
/*
* (non-Javadoc)
* @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#read(java.nio.ByteBuffer)
*/
@Override
public Publisher<Integer> read(ByteBuffer dst) {
return inputStream.read(dst);
}
/*
* (non-Javadoc)
* @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#skip(long)
*/
@Override
public Publisher<Long> skip(long bytesToSkip) {
return inputStream.skip(bytesToSkip);
}
/*
* (non-Javadoc)
* @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#close()
*/
@Override
public Publisher<Success> close() {
return inputStream.close();
}
}
@RequiredArgsConstructor
static class State {
static class AsyncInputStreamHandler {
private static final AtomicLongFieldUpdater<State> DEMAND = AtomicLongFieldUpdater.newUpdater(State.class,
"demand");
private static final AtomicLongFieldUpdater<AsyncInputStreamHandler> DEMAND = AtomicLongFieldUpdater
.newUpdater(AsyncInputStreamHandler.class, "demand");
private static final AtomicIntegerFieldUpdater<State> STATE = AtomicIntegerFieldUpdater.newUpdater(State.class,
"state");
private static final AtomicIntegerFieldUpdater<AsyncInputStreamHandler> STATE = AtomicIntegerFieldUpdater
.newUpdater(AsyncInputStreamHandler.class, "state");
private static final AtomicIntegerFieldUpdater<State> READ = AtomicIntegerFieldUpdater.newUpdater(State.class,
"read");
private static final AtomicIntegerFieldUpdater<AsyncInputStreamHandler> READ = AtomicIntegerFieldUpdater
.newUpdater(AsyncInputStreamHandler.class, "read");
private static final int STATE_OPEN = 0;
private static final int STATE_CLOSED = 1;
@@ -92,6 +157,7 @@ class DataBufferPublisherAdapter {
final AsyncInputStream inputStream;
final DataBufferFactory dataBufferFactory;
final int bufferSize;
// see DEMAND
volatile long demand;
@@ -105,8 +171,16 @@ class DataBufferPublisherAdapter {
void request(FluxSink<DataBuffer> sink, long n) {
Operators.addCap(DEMAND, this, n);
drainLoop(sink);
}
if (onShouldRead()) {
/**
* Loops while we have demand and while no read is in progress.
*
* @param sink
*/
void drainLoop(FluxSink<DataBuffer> sink) {
while (onShouldRead()) {
emitNext(sink);
}
}
@@ -119,16 +193,16 @@ class DataBufferPublisherAdapter {
return READ.compareAndSet(this, READ_NONE, READ_IN_PROGRESS);
}
boolean onReadDone() {
return READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE);
void onReadDone() {
READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE);
}
long getDemand() {
return DEMAND.get(this);
}
boolean decrementDemand() {
return DEMAND.decrementAndGet(this) > 0;
void decrementDemand() {
DEMAND.decrementAndGet(this);
}
void close() {
@@ -143,15 +217,15 @@ class DataBufferPublisherAdapter {
* Emit the next {@link DataBuffer}.
*
* @param sink
* @return
*/
void emitNext(FluxSink<DataBuffer> sink) {
DataBuffer dataBuffer = dataBufferFactory.allocateBuffer();
ByteBuffer intermediate = ByteBuffer.allocate(dataBuffer.capacity());
private void emitNext(FluxSink<DataBuffer> sink) {
ByteBuffer transport = ByteBuffer.allocate(bufferSize);
BufferCoreSubscriber bufferCoreSubscriber = new BufferCoreSubscriber(sink, dataBufferFactory, transport);
try {
Mono.from(inputStream.read(intermediate)).subscribe(new BufferCoreSubscriber(sink, dataBuffer, intermediate));
} catch (Exception e) {
inputStream.read(transport).subscribe(bufferCoreSubscriber);
} catch (Throwable e) {
sink.error(e);
}
}
@@ -159,14 +233,16 @@ class DataBufferPublisherAdapter {
private class BufferCoreSubscriber implements CoreSubscriber<Integer> {
private final FluxSink<DataBuffer> sink;
private final DataBuffer dataBuffer;
private final ByteBuffer intermediate;
private final DataBufferFactory factory;
private final ByteBuffer transport;
private final Thread subscribeThread = Thread.currentThread();
private volatile Subscription subscription;
BufferCoreSubscriber(FluxSink<DataBuffer> sink, DataBuffer dataBuffer, ByteBuffer intermediate) {
BufferCoreSubscriber(FluxSink<DataBuffer> sink, DataBufferFactory factory, ByteBuffer transport) {
this.sink = sink;
this.dataBuffer = dataBuffer;
this.intermediate = intermediate;
this.factory = factory;
this.transport = transport;
}
@Override
@@ -176,6 +252,8 @@ class DataBufferPublisherAdapter {
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
s.request(1);
}
@@ -185,24 +263,38 @@ class DataBufferPublisherAdapter {
if (isClosed()) {
onReadDone();
DataBufferUtils.release(dataBuffer);
Operators.onNextDropped(dataBuffer, sink.currentContext());
return;
}
intermediate.flip();
dataBuffer.write(intermediate);
if (bytes > 0) {
sink.next(dataBuffer);
decrementDemand();
DataBuffer buffer = readNextChunk();
sink.next(buffer);
decrementDemand();
}
try {
if (bytes == -1) {
sink.complete();
return;
}
} finally {
onReadDone();
}
subscription.request(1);
}
private DataBuffer readNextChunk() {
transport.flip();
DataBuffer dataBuffer = factory.allocateBuffer(transport.remaining());
dataBuffer.write(transport);
transport.clear();
return dataBuffer;
}
@Override
@@ -215,16 +307,14 @@ class DataBufferPublisherAdapter {
}
onReadDone();
DataBufferUtils.release(dataBuffer);
Operators.onNextDropped(dataBuffer, sink.currentContext());
sink.error(t);
}
@Override
public void onComplete() {
if (onShouldRead()) {
emitNext(sink);
if (subscribeThread != Thread.currentThread()) {
drainLoop(sink);
}
}
}


@@ -184,7 +184,7 @@ public class GridFsTemplate extends GridFsOperationsSupport implements GridFsOpe
public void delete(Query query) {
for (GridFSFile gridFSFile : find(query)) {
getGridFs().delete(((BsonObjectId) gridFSFile.getId()).getValue());
getGridFs().delete(gridFSFile.getId());
}
}
@@ -215,7 +215,7 @@ public class GridFsTemplate extends GridFsOperationsSupport implements GridFsOpe
Assert.notNull(file, "GridFSFile must not be null!");
return new GridFsResource(file, getGridFs().openDownloadStream(file.getObjectId()));
return new GridFsResource(file, getGridFs().openDownloadStream(file.getId()));
}
/*


@@ -20,9 +20,9 @@ import reactor.core.publisher.Flux;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.IntFunction;
import org.reactivestreams.Publisher;
import org.springframework.core.io.AbstractResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
@@ -35,26 +35,28 @@ import com.mongodb.client.gridfs.model.GridFSFile;
* Reactive {@link GridFSFile} based {@link Resource} implementation.
*
* @author Mark Paluch
* @author Christoph Strobl
* @since 2.2
*/
public class ReactiveGridFsResource extends AbstractResource {
private static final Integer DEFAULT_CHUNK_SIZE = 256 * 1024;
private final @Nullable GridFSFile file;
private final String filename;
private final Flux<DataBuffer> content;
private final IntFunction<Flux<DataBuffer>> contentFunction;
/**
* Creates a new, absent {@link ReactiveGridFsResource}.
*
* @param filename filename of the absent resource.
* @param content
* @since 2.1
*/
private ReactiveGridFsResource(String filename, Publisher<DataBuffer> content) {
this.file = null;
this.filename = filename;
this.content = Flux.from(content);
this.contentFunction = any -> Flux.from(content);
}
/**
@@ -64,10 +66,21 @@ public class ReactiveGridFsResource extends AbstractResource {
* @param content
*/
public ReactiveGridFsResource(GridFSFile file, Publisher<DataBuffer> content) {
this(file, (IntFunction<Flux<DataBuffer>>) any -> Flux.from(content));
}
/**
* Creates a new {@link ReactiveGridFsResource} from the given {@link GridFSFile}.
*
* @param file must not be {@literal null}.
* @param contentFunction
* @since 2.2.1
*/
ReactiveGridFsResource(GridFSFile file, IntFunction<Flux<DataBuffer>> contentFunction) {
this.file = file;
this.filename = file.getFilename();
this.content = Flux.from(content);
this.contentFunction = contentFunction;
}
/**
@@ -165,16 +178,32 @@ public class ReactiveGridFsResource extends AbstractResource {
}
/**
* Retrieve the download stream.
* Retrieve the download stream using the default chunk size of 256 kB.
*
* @return
* @return a {@link Flux} emitting data chunks one by one. Please make sure to
* {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) release} all
* {@link DataBuffer buffers} when done.
*/
public Flux<DataBuffer> getDownloadStream() {
return getDownloadStream(DEFAULT_CHUNK_SIZE);
}
/**
* Retrieve the download stream.
*
* @param chunkSize chunk size in bytes to use.
* @return a {@link Flux} emitting data chunks one by one. Please make sure to
* {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) release} all
* {@link DataBuffer buffers} when done.
* @since 2.2.1
*/
public Flux<DataBuffer> getDownloadStream(int chunkSize) {
if (!exists()) {
return Flux.error(new FileNotFoundException(String.format("%s does not exist.", getDescription())));
}
return this.content;
return contentFunction.apply(chunkSize);
}
private void verifyExists() throws FileNotFoundException {
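
A minimal usage sketch for the new getDownloadStream(int) overload, assuming a ReactiveGridFsTemplate named "operations" and a previously stored file "large.txt" (the 8192-byte chunk size is an arbitrary example value):

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.gridfs.ReactiveGridFsTemplate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ChunkedDownloadSketch {

	Mono<DataBuffer> download(ReactiveGridFsTemplate operations) {

		Flux<DataBuffer> chunks = operations.findOne(Query.query(Criteria.where("filename").is("large.txt"))) //
				.flatMap(operations::getResource) // Mono<ReactiveGridFsResource>
				.flatMapMany(resource -> resource.getDownloadStream(8192)); // instead of the 256 kB default

		return DataBufferUtils.join(chunks); // caller must release the joined buffer when done
	}
}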


@@ -49,6 +49,7 @@ import com.mongodb.reactivestreams.client.gridfs.GridFSFindPublisher;
* {@link DefaultDataBufferFactory} to create {@link DataBuffer buffers}.
*
* @author Mark Paluch
* @author Nick Stolwijk
* @since 2.2
*/
public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements ReactiveGridFsOperations {
@@ -222,9 +223,11 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
return Mono.fromSupplier(() -> {
GridFSDownloadStream stream = getGridFs().openDownloadStream(file.getObjectId());
return new ReactiveGridFsResource(file, chunkSize -> {
return new ReactiveGridFsResource(file, BinaryStreamAdapters.toPublisher(stream, dataBufferFactory));
GridFSDownloadStream stream = getGridFs().openDownloadStream(file.getId());
return BinaryStreamAdapters.toPublisher(stream, dataBufferFactory, chunkSize);
});
});
}
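
With getId() in place, files written by the legacy GridFS API with a non-ObjectId _id can be resolved through the reactive template as well. A sketch, assuming a ReactiveGridFsTemplate named "operations" and a file stored under a UUID id (mirrors the storesAndFindsByUUID test added further below):

import java.util.UUID;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.gridfs.ReactiveGridFsResource;
import org.springframework.data.mongodb.gridfs.ReactiveGridFsTemplate;
import reactor.core.publisher.Flux;

class CustomIdDownloadSketch {

	Flux<DataBuffer> download(ReactiveGridFsTemplate operations, UUID id) {

		return operations.findOne(Query.query(Criteria.where("_id").is(id))) //
				.flatMap(operations::getResource) //
				.flatMapMany(ReactiveGridFsResource::getDownloadStream); // previously failed for non-ObjectId ids
	}
}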


@@ -36,11 +36,13 @@ import org.springframework.data.geo.Shape;
import org.springframework.data.mapping.PersistentPropertyPath;
import org.springframework.data.mapping.PropertyPath;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.mongodb.core.geo.GeoJson;
import org.springframework.data.mongodb.core.index.GeoSpatialIndexType;
import org.springframework.data.mongodb.core.index.GeoSpatialIndexed;
import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.CriteriaDefinition;
import org.springframework.data.mongodb.core.query.MetricConversion;
import org.springframework.data.mongodb.core.query.MongoRegexCreator;
import org.springframework.data.mongodb.core.query.MongoRegexCreator.MatchMode;
import org.springframework.data.mongodb.core.query.Query;
@@ -235,8 +237,14 @@ class MongoQueryCreator extends AbstractQueryCreator<Query, Criteria> {
criteria.near(pointToUse);
}
criteria.maxDistance(it.getNormalizedValue());
minDistance.ifPresent(min -> criteria.minDistance(min.getNormalizedValue()));
if (pointToUse instanceof GeoJson) { // using GeoJson distance is in meters.
criteria.maxDistance(MetricConversion.getDistanceInMeters(it));
minDistance.map(MetricConversion::getDistanceInMeters).ifPresent(criteria::minDistance);
} else {
criteria.maxDistance(it.getNormalizedValue());
minDistance.map(Distance::getNormalizedValue).ifPresent(criteria::minDistance);
}
return criteria;
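
In repository terms: a derived near query that takes a GeoJsonPoint now renders its Distance in meters, while legacy Point arguments keep the normalized (radians) value. A hypothetical repository for illustration (the User type and its location property are assumptions, mirroring the unit test added further below):

import java.util.List;
import org.springframework.data.geo.Distance;
import org.springframework.data.mongodb.core.geo.GeoJsonPoint;
import org.springframework.data.repository.Repository;

class User {
	String id;
	GeoJsonPoint location;
}

interface UserRepository extends Repository<User, String> {

	// For new Distance(1, Metrics.KILOMETERS) this now yields
	// { "location" : { "$nearSphere" : { GeoJSON point }, "$maxDistance" : 1000.0 } }
	// instead of a radians-based value.
	List<User> findByLocationNear(GeoJsonPoint point, Distance maxDistance);
}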


@@ -16,13 +16,19 @@
package org.springframework.data.mongodb.util;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import java.util.StringJoiner;
import java.util.function.Function;
import java.util.stream.StreamSupport;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.json.JsonParseException;
import org.springframework.core.convert.converter.Converter;
import org.springframework.lang.Nullable;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
@@ -30,6 +36,7 @@ import org.springframework.util.StringUtils;
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.DBRef;
import com.mongodb.MongoClientSettings;
/**
* @author Christoph Strobl
@@ -147,4 +154,80 @@ public class BsonUtils {
return orElse.apply(source);
}
/**
* Serialize the given {@link Document} as Json applying default codecs if necessary.
*
* @param source
* @return
* @since 2.2.1
*/
@Nullable
public static String toJson(@Nullable Document source) {
if (source == null) {
return null;
}
try {
return source.toJson();
} catch (Exception e) {
return toJson((Object) source);
}
}
@Nullable
private static String toJson(@Nullable Object value) {
if (value == null) {
return null;
}
try {
return value instanceof Document
? ((Document) value).toJson(MongoClientSettings.getDefaultCodecRegistry().get(Document.class))
: serializeValue(value);
} catch (Exception e) {
if (value instanceof Collection) {
return toString((Collection<?>) value);
} else if (value instanceof Map) {
return toString((Map<?, ?>) value);
} else if (ObjectUtils.isArray(value)) {
return toString(Arrays.asList(ObjectUtils.toObjectArray(value)));
}
throw e instanceof JsonParseException ? (JsonParseException) e : new JsonParseException(e);
}
}
private static String serializeValue(@Nullable Object value) {
if (value == null) {
return "null";
}
String documentJson = new Document("toBeEncoded", value).toJson();
return documentJson.substring(documentJson.indexOf(':') + 1, documentJson.length() - 1).trim();
}
private static String toString(Map<?, ?> source) {
return iterableToDelimitedString(source.entrySet(), "{ ", " }",
entry -> String.format("\"%s\" : %s", entry.getKey(), toJson(entry.getValue())));
}
private static String toString(Collection<?> source) {
return iterableToDelimitedString(source, "[ ", " ]", BsonUtils::toJson);
}
private static <T> String iterableToDelimitedString(Iterable<T> source, String prefix, String suffix,
Converter<? super T, String> transformer) {
StringJoiner joiner = new StringJoiner(", ", prefix, suffix);
StreamSupport.stream(source.spliterator(), false).map(transformer::convert).forEach(joiner::add);
return joiner.toString();
}
}
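
A sketch of what the new fallback handles, assuming a document value that the plain Document codec cannot encode (a com.mongodb.DBRef), which is exactly the partial filter case above:

import org.bson.Document;
import org.bson.types.ObjectId;
import org.springframework.data.mongodb.util.BsonUtils;
import com.mongodb.DBRef;

class BsonUtilsToJsonSketch {

	public static void main(String[] args) {

		Document source = new Document("the-ref",
				new DBRef("other-collection", new ObjectId("59ce08baf264b906810fe8c5")));

		// source.toJson() alone would throw a CodecConfigurationException for the DBRef value;
		// BsonUtils.toJson(...) retries with the driver's default codec registry.
		System.out.println(BsonUtils.toJson(source));
	}
}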


@@ -20,6 +20,7 @@ import static org.assertj.core.api.Assumptions.*;
import static org.springframework.data.mongodb.core.index.PartialIndexFilter.*;
import static org.springframework.data.mongodb.core.query.Criteria.*;
import org.bson.BsonDocument;
import org.bson.Document;
import org.junit.Before;
import org.junit.Test;
@@ -39,6 +40,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.util.ObjectUtils;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.IndexOptions;
/**
* Integration tests for {@link DefaultIndexOperations}.
@@ -153,6 +155,21 @@ public class DefaultIndexOperationsIntegrationTests {
.isEqualTo(Document.parse("{ \"a_g_e\" : { \"$gte\" : 10 } }"));
}
@Test // DATAMONGO-2388
public void shouldReadIndexWithPartialFilterContainingDbRefCorrectly() {
BsonDocument partialFilter = BsonDocument.parse(
"{ \"the-ref\" : { \"$ref\" : \"other-collection\", \"$id\" : { \"$oid\" : \"59ce08baf264b906810fe8c5\"} } }");
IndexOptions indexOptions = new IndexOptions();
indexOptions.name("partial-with-dbref");
indexOptions.partialFilterExpression(partialFilter);
collection.createIndex(BsonDocument.parse("{ \"key-1\" : 1, \"key-2\": 1}"), indexOptions);
IndexInfo info = findAndReturnIndexInfo(indexOps.getIndexInfo(), "partial-with-dbref");
assertThat(BsonDocument.parse(info.getPartialFilterExpression())).isEqualTo(partialFilter);
}
@Test // DATAMONGO-1518
public void shouldCreateIndexWithCollationCorrectly() {


@@ -890,6 +890,17 @@ public class QueryMapperUnitTests {
assertThat(target).isEqualTo(new org.bson.Document("_id", "id-1"));
}
@Test // DATAMONGO-2394
public void leavesDistanceUntouchedWhenUsingGeoJson() {
Query query = query(where("geoJsonPoint").near(new GeoJsonPoint(27.987901, 86.9165379)).maxDistance(1000));
org.bson.Document document = mapper.getMappedObject(query.getQueryObject(),
context.getPersistentEntity(ClassWithGeoTypes.class));
assertThat(document).containsEntry("geoJsonPoint.$near.$geometry.type", "Point");
assertThat(document).containsEntry("geoJsonPoint.$near.$maxDistance", 1000.0D);
}
@Document
public class Foo {
@Id private ObjectId id;


@@ -25,6 +25,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.junit.Test;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
@@ -49,7 +50,7 @@ public class BinaryStreamAdaptersUnitTests {
byte[] content = StreamUtils.copyToByteArray(resource.getInputStream());
AsyncInputStream inputStream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
Flux<DataBuffer> dataBuffers = BinaryStreamAdapters.toPublisher(inputStream, new DefaultDataBufferFactory());
Flux<DataBuffer> dataBuffers = BinaryStreamAdapters.toPublisher(inputStream, new DefaultDataBufferFactory(), 256);
DataBufferUtils.join(dataBuffers) //
.as(StepVerifier::create) //


@@ -44,7 +44,7 @@ public class DataBufferPublisherAdapterUnitTests {
when(asyncInput.read(any())).thenReturn(Mono.just(1), Mono.error(new IllegalStateException()));
when(asyncInput.close()).thenReturn(Mono.empty());
Flux<DataBuffer> binaryStream = DataBufferPublisherAdapter.createBinaryStream(asyncInput, factory);
Flux<DataBuffer> binaryStream = DataBufferPublisherAdapter.createBinaryStream(asyncInput, factory, 256);
StepVerifier.create(binaryStream, 0) //
.thenRequest(1) //


@@ -26,6 +26,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import org.bson.BsonObjectId;
import org.bson.Document;
@@ -39,6 +40,7 @@ import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.data.domain.Sort;
import org.springframework.data.domain.Sort.Direction;
import org.springframework.data.mongodb.core.SimpleMongoDbFactory;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@@ -47,6 +49,8 @@ import org.springframework.util.StreamUtils;
import com.mongodb.MongoGridFSException;
import com.mongodb.client.gridfs.GridFSFindIterable;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSInputFile;
/**
* Integration tests for {@link GridFsTemplate}.
@@ -65,6 +69,7 @@ public class GridFsTemplateIntegrationTests {
Resource resource = new ClassPathResource("gridfs/gridfs.xml");
@Autowired GridFsOperations operations;
@Autowired SimpleMongoDbFactory mongoClient;
@Before
public void setUp() {
@@ -76,24 +81,42 @@ public class GridFsTemplateIntegrationTests {
ObjectId reference = operations.store(resource.getInputStream(), "foo.xml");
List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<com.mongodb.client.gridfs.model.GridFSFile>();
List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<>();
GridFSFindIterable result = operations.find(query(where("_id").is(reference)));
result.into(files);
assertThat(files.size()).isEqualTo(1);
assertThat(files).hasSize(1);
assertThat(((BsonObjectId) files.get(0).getId()).getValue()).isEqualTo(reference);
}
@Test // DATAMONGO-2392
public void storesAndFindsByUUID() throws IOException {
UUID uuid = UUID.randomUUID();
GridFS fs = new GridFS(mongoClient.getLegacyDb());
GridFSInputFile in = fs.createFile(resource.getInputStream(), "gridfs.xml");
in.put("_id", uuid);
in.put("contentType", "application/octet-stream");
in.save();
GridFSFile file = operations.findOne(query(where("_id").is(uuid)));
GridFsResource resource = operations.getResource(file);
assertThat(resource.exists()).isTrue();
}
@Test // DATAMONGO-6
public void writesMetadataCorrectly() throws IOException {
Document metadata = new Document("key", "value");
ObjectId reference = operations.store(resource.getInputStream(), "foo.xml", metadata);
List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<com.mongodb.client.gridfs.model.GridFSFile>();
List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<>();
GridFSFindIterable result = operations.find(query(whereMetaData("key").is("value")));
result.into(files);
assertThat(files.size()).isEqualTo(1);
assertThat(files).hasSize(1);
assertThat(((BsonObjectId) files.get(0).getId()).getValue()).isEqualTo(reference);
}
@@ -105,11 +128,11 @@ public class GridFsTemplateIntegrationTests {
ObjectId reference = operations.store(resource.getInputStream(), "foo.xml", metadata);
List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<com.mongodb.client.gridfs.model.GridFSFile>();
List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<>();
GridFSFindIterable result = operations.find(query(whereFilename().is("foo.xml")));
result.into(files);
assertThat(files.size()).isEqualTo(1);
assertThat(files).hasSize(1);
assertThat(((BsonObjectId) files.get(0).getId()).getValue()).isEqualTo(reference);
}
@@ -120,7 +143,7 @@ public class GridFsTemplateIntegrationTests {
GridFsResource[] resources = operations.getResources("*.xml");
assertThat(resources.length).isEqualTo(1);
assertThat(resources).hasSize(1);
assertThat(((BsonObjectId) resources[0].getId()).getValue()).isEqualTo(reference);
assertThat(resources[0].contentLength()).isEqualTo(resource.contentLength());
}
@@ -131,7 +154,7 @@ public class GridFsTemplateIntegrationTests {
ObjectId reference = operations.store(resource.getInputStream(), "foo.xml");
GridFsResource[] resources = operations.getResources("foo.xml");
assertThat(resources.length).isEqualTo(1);
assertThat(resources).hasSize(1);
assertThat(((BsonObjectId) resources[0].getId()).getValue()).isEqualTo(reference);
assertThat(resources[0].contentLength()).isEqualTo(resource.contentLength());
}
@@ -141,11 +164,11 @@ public class GridFsTemplateIntegrationTests {
ObjectId reference = operations.store(resource.getInputStream(), "foo2.xml", "application/xml");
List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<com.mongodb.client.gridfs.model.GridFSFile>();
List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<>();
GridFSFindIterable result = operations.find(query(whereContentType().is("application/xml")));
result.into(files);
assertThat(files.size()).isEqualTo(1);
assertThat(files).hasSize(1);
assertThat(((BsonObjectId) files.get(0).getId()).getValue()).isEqualTo(reference);
}
@@ -158,7 +181,7 @@ public class GridFsTemplateIntegrationTests {
Query query = new Query().with(Sort.by(Direction.ASC, "filename"));
List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<com.mongodb.client.gridfs.model.GridFSFile>();
List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<>();
GridFSFindIterable result = operations.find(query);
result.into(files);
@@ -194,7 +217,7 @@ public class GridFsTemplateIntegrationTests {
Document metadata = new Document("key", "value");
ObjectId reference = operations.store(resource.getInputStream(), "foobar", metadata);
List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<com.mongodb.client.gridfs.model.GridFSFile>();
List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<>();
GridFSFindIterable result = operations.find(query(whereMetaData("key").is("value")));
result.into(files);
@@ -208,7 +231,7 @@ public class GridFsTemplateIntegrationTests {
metadata.version = "1.0";
ObjectId reference = operations.store(resource.getInputStream(), "foobar", metadata);
List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<com.mongodb.client.gridfs.model.GridFSFile>();
List<com.mongodb.client.gridfs.model.GridFSFile> files = new ArrayList<>();
GridFSFindIterable result = operations.find(query(whereMetaData("version").is("1.0")));
result.into(files);
@@ -229,7 +252,7 @@ public class GridFsTemplateIntegrationTests {
operations.store(resource.getInputStream(), "no-content-type", (String) null);
GridFsResource result = operations.getResource("no-content-type");
assertThatThrownBy(() -> result.getContentType()).isInstanceOf(MongoGridFSException.class);
assertThatThrownBy(result::getContentType).isInstanceOf(MongoGridFSException.class);
}
@Test // DATAMONGO-1813


@@ -24,6 +24,8 @@ import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.UUID;
import org.bson.BsonObjectId;
import org.bson.Document;
@@ -31,7 +33,6 @@ import org.bson.types.ObjectId;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
@@ -39,11 +40,17 @@ import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.dao.IncorrectResultSizeDataAccessException;
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.core.SimpleMongoDbFactory;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.StreamUtils;
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSInputFile;
import com.mongodb.internal.HexUtils;
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper;
@@ -52,6 +59,7 @@ import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper;
*
* @author Mark Paluch
* @author Christoph Strobl
* @author Nick Stolwijk
*/
@RunWith(SpringRunner.class)
@ContextConfiguration("classpath:gridfs/reactive-gridfs.xml")
@@ -60,6 +68,9 @@ public class ReactiveGridFsTemplateTests {
Resource resource = new ClassPathResource("gridfs/gridfs.xml");
@Autowired ReactiveGridFsOperations operations;
@Autowired SimpleMongoDbFactory mongoClient;
@Autowired ReactiveMongoDatabaseFactory dbFactory;
@Autowired MongoConverter mongoConverter;
@Before
public void setUp() {
@@ -86,6 +97,64 @@ public class ReactiveGridFsTemplateTests {
.verifyComplete();
}
@Test // DATAMONGO-1855
public void storesAndLoadsLargeFileCorrectly() {
ByteBuffer buffer = ByteBuffer.allocate(1000 * 1000); // 1 mb
int i = 0;
while (buffer.remaining() != 0) {
buffer.put(HexUtils.toHex(new byte[] { (byte) (i++ % 16) }).getBytes());
}
buffer.flip();
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
ObjectId reference = operations.store(Flux.just(factory.wrap(buffer)), "large.txt").block();
buffer.clear();
// default chunk size
operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource)
.flatMapMany(ReactiveGridFsResource::getDownloadStream) //
.transform(DataBufferUtils::join) //
.as(StepVerifier::create) //
.consumeNextWith(dataBuffer -> {
assertThat(dataBuffer.readableByteCount()).isEqualTo(buffer.remaining());
assertThat(dataBuffer.asByteBuffer()).isEqualTo(buffer);
}).verifyComplete();
// small chunk size
operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource)
.flatMapMany(reactiveGridFsResource -> reactiveGridFsResource.getDownloadStream(256)) //
.transform(DataBufferUtils::join) //
.as(StepVerifier::create) //
.consumeNextWith(dataBuffer -> {
assertThat(dataBuffer.readableByteCount()).isEqualTo(buffer.remaining());
assertThat(dataBuffer.asByteBuffer()).isEqualTo(buffer);
}).verifyComplete();
}
@Test // DATAMONGO-2392
public void storesAndFindsByUUID() throws IOException {
UUID uuid = UUID.randomUUID();
GridFS fs = new GridFS(mongoClient.getLegacyDb());
GridFSInputFile in = fs.createFile(resource.getInputStream(), "gridfs.xml");
in.put("_id", uuid);
in.put("contentType", "application/octet-stream");
in.save();
operations.findOne(query(where("_id").is(uuid))).flatMap(operations::getResource)
.flatMapMany(ReactiveGridFsResource::getDownloadStream) //
.transform(DataBufferUtils::join) //
.doOnNext(DataBufferUtils::release).as(StepVerifier::create) //
.expectNextCount(1).verifyComplete();
}
@Test // DATAMONGO-1855
public void writesMetadataCorrectly() throws IOException {
@@ -148,7 +217,8 @@ public class ReactiveGridFsTemplateTests {
public void shouldEmitFirstEntryWhenFindFirstRetrievesMoreThanOneResult() throws IOException {
AsyncInputStream upload1 = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
AsyncInputStream upload2 = AsyncStreamHelper.toAsyncInputStream(new ClassPathResource("gridfs/another-resource.xml").getInputStream());
AsyncInputStream upload2 = AsyncStreamHelper
.toAsyncInputStream(new ClassPathResource("gridfs/another-resource.xml").getInputStream());
operations.store(upload1, "foo.xml", null, null).block();
operations.store(upload2, "foo2.xml", null, null).block();
@@ -159,8 +229,7 @@ public class ReactiveGridFsTemplateTests {
.assertNext(actual -> {
assertThat(actual.getGridFSFile()).isNotNull();
})
.verifyComplete();
}).verifyComplete();
}
@Test // DATAMONGO-2240
@@ -179,7 +248,8 @@ public class ReactiveGridFsTemplateTests {
public void shouldEmitErrorWhenFindOneRetrievesMoreThanOneResult() throws IOException {
AsyncInputStream upload1 = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
AsyncInputStream upload2 = AsyncStreamHelper.toAsyncInputStream(new ClassPathResource("gridfs/another-resource.xml").getInputStream());
AsyncInputStream upload2 = AsyncStreamHelper
.toAsyncInputStream(new ClassPathResource("gridfs/another-resource.xml").getInputStream());
operations.store(upload1, "foo.xml", null, null).block();
operations.store(upload2, "foo2.xml", null, null).block();


@@ -652,6 +652,17 @@ public class MongoQueryCreatorUnitTests {
assertThat(creator.createQuery()).isEqualTo(query(where("age").gt(10).lt(11)));
}
@Test // DATAMONGO-2394
public void nearShouldUseMetricDistanceForGeoJsonTypes() {
GeoJsonPoint point = new GeoJsonPoint(27.987901, 86.9165379);
PartTree tree = new PartTree("findByLocationNear", User.class);
MongoQueryCreator creator = new MongoQueryCreator(tree,
getAccessor(converter, point, new Distance(1, Metrics.KILOMETERS)), context);
assertThat(creator.createQuery()).isEqualTo(query(where("location").nearSphere(point).maxDistance(1000.0D)));
}
interface PersonRepository extends Repository<Person, Long> {
List<Person> findByLocationNearAndFirstname(Point location, Distance maxDistance, String firstname);


@@ -3,13 +3,13 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/util https://www.springframework.org/schema/util/spring-util.xsd">
<bean id="mongoClient" class="org.springframework.data.mongodb.core.ReactiveMongoClientFactoryBean">
<bean id="reactiveMongoClient" class="org.springframework.data.mongodb.core.ReactiveMongoClientFactoryBean">
<property name="host" value="127.0.0.1"/>
<property name="port" value="27017"/>
</bean>
<bean id="reactiveMongoDbFactory" class="org.springframework.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
<constructor-arg name="mongoClient" ref="mongoClient"/>
<constructor-arg name="mongoClient" ref="reactiveMongoClient" type="com.mongodb.reactivestreams.client.MongoClient"/>
<constructor-arg name="databaseName" value="reactive_gridfs"/>
</bean>
@@ -27,4 +27,13 @@
<constructor-arg ref="converter"/>
</bean>
<bean id="mongoClient" class="org.springframework.data.mongodb.core.MongoClientFactoryBean">
<property name="host" value="127.0.0.1"/>
<property name="port" value="27017"/>
</bean>
<bean id="mongoDbFactory" class="org.springframework.data.mongodb.core.SimpleMongoDbFactory">
<constructor-arg name="mongoClient" ref="mongoClient"/>
<constructor-arg name="databaseName" value="reactive_gridfs"/>
</bean>
</beans>


@@ -1,6 +1,22 @@
Spring Data MongoDB Changelog
=============================
Changes in version 2.2.1.RELEASE (2019-11-04)
---------------------------------------------
* DATAMONGO-2399 - Upgrade to mongo-java-driver 3.11.1.
* DATAMONGO-2394 - nearSphere query wrongly generated with radian parameter instead of meters.
* DATAMONGO-2393 - Reading large file from ReactiveGridFsTemplate causes a stackoverflow and the code to hang.
* DATAMONGO-2392 - Reading GridFS files written with old api and custom id fails on ReactiveGridFsTemplate.
* DATAMONGO-2388 - IndexOperations.getIndexInfo() fails for index that has partialFilterExpression containing DBRef.
* DATAMONGO-2382 - Release 2.2.1 (Moore SR1).
Changes in version 2.1.12.RELEASE (2019-11-04)
----------------------------------------------
* DATAMONGO-2388 - IndexOperations.getIndexInfo() fails for index that has partialFilterExpression containing DBRef.
* DATAMONGO-2381 - Release 2.1.12 (Lovelace SR12).
Changes in version 2.2.0.RELEASE (2019-09-30)
---------------------------------------------
* DATAMONGO-2380 - Remove @ExperimentalCoroutinesApi annotations.


@@ -1,4 +1,4 @@
Spring Data MongoDB 2.2 GA
Spring Data MongoDB 2.2.1
Copyright (c) [2010-2019] Pivotal Software, Inc.
This product is licensed to you under the Apache License, Version 2.0 (the "License").