DATAMONGO-2080 - Polishing.
Remove obsolete classes, update Javadoc and fix tests calling all() instead of tail(). Original Pull Request: #608
This commit is contained in:
@@ -86,16 +86,18 @@ public interface ReactiveFindOperation {
|
||||
Flux<T> all();
|
||||
|
||||
/**
|
||||
* Get all matching elements using a tailable cursor.
|
||||
* Get all matching elements using a {@link com.mongodb.CursorType#TailableAwait tailable cursor}. The stream will
|
||||
* not be completed unless the {@link org.reactivestreams.Subscription} is
|
||||
* {@link org.reactivestreams.Subscription#cancel() canceled}.
|
||||
* <p />
|
||||
* However, the stream may become dead, or invalid, if either the query returns no match or the cursor returns the
|
||||
* document at the "end" of the collection and then the application deletes that document.
|
||||
* <p />
|
||||
* The stream may become dead, or invalid, if either the query returns no match or the cursor returns the document
|
||||
* at the "end" of the collection and then the application deletes that document.
|
||||
* <p/>
|
||||
* A stream that is no longer in use must be {@link reactor.core.Disposable#dispose()} disposed} otherwise the
|
||||
* streams will linger and exhaust resources. <br/>
|
||||
* <strong>NOTE:</strong> Requires a capped collection.
|
||||
*
|
||||
* @return never {@literal null}.
|
||||
* @return the {@link Flux} emitting converted objects.
|
||||
* @since 2.1
|
||||
*/
|
||||
Flux<T> tail();
|
||||
|
||||
@@ -119,7 +119,7 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery {
|
||||
|
||||
String collection = method.getEntityInformation().getCollectionName();
|
||||
|
||||
ReactiveMongoQueryExecution execution = getExecution(query, parameterAccessor,
|
||||
ReactiveMongoQueryExecution execution = getExecution(parameterAccessor,
|
||||
new ResultProcessingConverter(processor, operations, instantiators), find);
|
||||
|
||||
return execution.execute(query, processor.getReturnedType().getDomainType(), collection);
|
||||
@@ -128,12 +128,11 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery {
|
||||
/**
|
||||
* Returns the execution instance to use.
|
||||
*
|
||||
* @param query must not be {@literal null}.
|
||||
* @param accessor must not be {@literal null}.
|
||||
* @param resultProcessing must not be {@literal null}.
|
||||
* @return
|
||||
*/
|
||||
private ReactiveMongoQueryExecution getExecution(Query query, MongoParameterAccessor accessor,
|
||||
private ReactiveMongoQueryExecution getExecution(MongoParameterAccessor accessor,
|
||||
Converter<Object, Object> resultProcessing, FindWithQuery<?> operation) {
|
||||
return new ResultProcessingExecution(getExecutionToWrap(accessor, operation), resultProcessing);
|
||||
}
|
||||
|
||||
@@ -51,23 +51,6 @@ interface ReactiveMongoQueryExecution {
|
||||
|
||||
Object execute(Query query, Class<?> type, String collection);
|
||||
|
||||
/**
|
||||
* {@link ReactiveMongoQueryExecution} for collection returning queries using tailable cursors.
|
||||
*
|
||||
* @author Mark Paluch
|
||||
*/
|
||||
@RequiredArgsConstructor
|
||||
final class TailExecution implements ReactiveMongoQueryExecution {
|
||||
|
||||
private final @NonNull ReactiveMongoOperations operations;
|
||||
private final Pageable pageable;
|
||||
|
||||
@Override
|
||||
public Object execute(Query query, Class<?> type, String collection) {
|
||||
return operations.tail(query.with(pageable), type, collection);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link MongoQueryExecution} to execute geo-near queries.
|
||||
*
|
||||
|
||||
@@ -71,7 +71,7 @@ public class ReactiveFindOperationSupportTests {
|
||||
public void setUp() {
|
||||
|
||||
blocking = new MongoTemplate(new SimpleMongoDbFactory(new MongoClient(), "ExecutableFindOperationSupportTests"));
|
||||
blocking.dropCollection(STAR_WARS);
|
||||
recreateCollection(STAR_WARS, false);
|
||||
|
||||
insertObjects();
|
||||
|
||||
@@ -79,6 +79,7 @@ public class ReactiveFindOperationSupportTests {
|
||||
}
|
||||
|
||||
void insertObjects() {
|
||||
|
||||
han = new Person();
|
||||
han.firstname = "han";
|
||||
han.lastname = "solo";
|
||||
@@ -93,6 +94,18 @@ public class ReactiveFindOperationSupportTests {
|
||||
blocking.save(luke);
|
||||
}
|
||||
|
||||
void recreateCollection(String collectionName, boolean capped) {
|
||||
|
||||
blocking.dropCollection(STAR_WARS);
|
||||
|
||||
CollectionOptions options = CollectionOptions.empty();
|
||||
if (capped) {
|
||||
options = options.capped().size(1024 * 1024);
|
||||
}
|
||||
|
||||
blocking.createCollection(STAR_WARS, options);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class) // DATAMONGO-1719
|
||||
public void domainTypeIsRequired() {
|
||||
template.query(null);
|
||||
@@ -304,8 +317,7 @@ public class ReactiveFindOperationSupportTests {
|
||||
@Test // DATAMONGO-2080
|
||||
public void tail() throws InterruptedException {
|
||||
|
||||
blocking.dropCollection(STAR_WARS);
|
||||
blocking.createCollection(STAR_WARS, CollectionOptions.empty().capped().size(1024 * 1024));
|
||||
recreateCollection(STAR_WARS, true);
|
||||
insertObjects();
|
||||
|
||||
BlockingQueue<Person> collector = new LinkedBlockingQueue<>();
|
||||
@@ -325,7 +337,7 @@ public class ReactiveFindOperationSupportTests {
|
||||
|
||||
blocking.save(chewbacca);
|
||||
|
||||
assertThat(collector.poll(10, TimeUnit.SECONDS)).isEqualTo(chewbacca);
|
||||
assertThat(collector.poll(1, TimeUnit.SECONDS)).isEqualTo(chewbacca);
|
||||
|
||||
subscription.dispose();
|
||||
}
|
||||
@@ -333,12 +345,11 @@ public class ReactiveFindOperationSupportTests {
|
||||
@Test // DATAMONGO-2080
|
||||
public void tailWithProjection() {
|
||||
|
||||
blocking.dropCollection(STAR_WARS);
|
||||
blocking.createCollection(STAR_WARS, CollectionOptions.empty().capped().size(1024 * 1024));
|
||||
recreateCollection(STAR_WARS, true);
|
||||
insertObjects();
|
||||
|
||||
StepVerifier
|
||||
.create(template.query(Person.class).as(Jedi.class).matching(query(where("firstname").is("luke"))).tail())
|
||||
template.query(Person.class).as(Jedi.class).matching(query(where("firstname").is("luke"))).tail()
|
||||
.as(StepVerifier::create) //
|
||||
.consumeNextWith(it -> assertThat(it).isInstanceOf(Jedi.class)) //
|
||||
.thenCancel() //
|
||||
.verify();
|
||||
@@ -347,12 +358,11 @@ public class ReactiveFindOperationSupportTests {
|
||||
@Test // DATAMONGO-2080
|
||||
public void tailWithClosedInterfaceProjection() {
|
||||
|
||||
blocking.dropCollection(STAR_WARS);
|
||||
blocking.createCollection(STAR_WARS, CollectionOptions.empty().capped().size(1024 * 1024));
|
||||
recreateCollection(STAR_WARS, true);
|
||||
insertObjects();
|
||||
|
||||
StepVerifier.create(
|
||||
template.query(Person.class).as(PersonProjection.class).matching(query(where("firstname").is("luke"))).all())
|
||||
template.query(Person.class).as(PersonProjection.class).matching(query(where("firstname").is("luke"))).tail()
|
||||
.as(StepVerifier::create) //
|
||||
.consumeNextWith(it -> {
|
||||
|
||||
assertThat(it).isInstanceOf(PersonProjection.class);
|
||||
@@ -365,12 +375,12 @@ public class ReactiveFindOperationSupportTests {
|
||||
@Test // DATAMONGO-2080
|
||||
public void tailWithOpenInterfaceProjection() {
|
||||
|
||||
blocking.dropCollection(STAR_WARS);
|
||||
blocking.createCollection(STAR_WARS, CollectionOptions.empty().capped().size(1024 * 1024));
|
||||
recreateCollection(STAR_WARS, true);
|
||||
insertObjects();
|
||||
|
||||
StepVerifier.create(template.query(Person.class).as(PersonSpELProjection.class)
|
||||
.matching(query(where("firstname").is("luke"))).tail()).consumeNextWith(it -> {
|
||||
template.query(Person.class).as(PersonSpELProjection.class).matching(query(where("firstname").is("luke"))).tail()
|
||||
.as(StepVerifier::create) //
|
||||
.consumeNextWith(it -> {
|
||||
|
||||
assertThat(it).isInstanceOf(PersonSpELProjection.class);
|
||||
assertThat(it.getName()).isEqualTo("luke");
|
||||
|
||||
@@ -231,14 +231,14 @@ public class ReactiveMongoRepositoryTests implements BeanClassLoaderAware, BeanF
|
||||
@Test // DATAMONGO-2080
|
||||
public void shouldUseTailableCursorWithDtoProjection() {
|
||||
|
||||
StepVerifier.create(template.dropCollection(Capped.class) //
|
||||
template.dropCollection(Capped.class) //
|
||||
.then(template.createCollection(Capped.class, //
|
||||
CollectionOptions.empty().size(1000).maxDocuments(100).capped()))) //
|
||||
.expectNextCount(1) //
|
||||
CollectionOptions.empty().size(1000).maxDocuments(100).capped())) //
|
||||
.as(StepVerifier::create).expectNextCount(1) //
|
||||
.verifyComplete();
|
||||
|
||||
StepVerifier.create(template.insert(new Capped("value", Math.random()))).expectNextCount(1).verifyComplete();
|
||||
StepVerifier.create(cappedRepository.findDtoProjectionByKey("value")).expectNextCount(1).thenCancel().verify();
|
||||
template.insert(new Capped("value", Math.random())).as(StepVerifier::create).expectNextCount(1).verifyComplete();
|
||||
cappedRepository.findDtoProjectionByKey("value").as(StepVerifier::create).expectNextCount(1).thenCancel().verify();
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-1444
|
||||
|
||||
Reference in New Issue
Block a user