DATAMONGO-1667 - Rename @InfiniteStream to @Tailable.
Rename InfiniteStream annotation to Tailable to reflect the related MongoDB approach used for repository query methods returning an infinite stream. InfiniteStream is the high-level concept that is achieved by using tailable cursors. Original Pull Request: #458
This commit is contained in:
committed by
Christoph Strobl
parent
42672a6df9
commit
b22fd056aa
@@ -24,10 +24,10 @@ import java.lang.annotation.Target;
|
||||
import org.springframework.data.annotation.QueryAnnotation;
|
||||
|
||||
/**
|
||||
* Annotation to declare an infinite stream using repository query methods. An infinite stream uses MongoDB's
|
||||
* {@link com.mongodb.CursorType#TailableAwait tailable} cursors to retrieve data from a capped collection and stream
|
||||
* data as it is inserted into the collection. An infinite stream can only be used with streams that emit more than one
|
||||
* element, such as {@link reactor.core.publisher.Flux} or {@link rx.Observable}.
|
||||
* Annotation to declare an infinite stream using MongoDB's {@link com.mongodb.CursorType#TailableAwait tailable}
|
||||
* cursors. An infinite stream can only be used with capped collections. Objects are emitted through the stream as data
|
||||
* is inserted into the collection. An infinite stream can only be used with streams that emit more than one element,
|
||||
* such as {@link reactor.core.publisher.Flux}.
|
||||
* <p>
|
||||
* The stream may become dead, or invalid, if either the query returns no match or the cursor returns the document at
|
||||
* the "end" of the collection and then the application deletes that document.
|
||||
@@ -37,11 +37,12 @@ import org.springframework.data.annotation.QueryAnnotation;
|
||||
*
|
||||
* @author Mark Paluch
|
||||
* @see <a href="https://docs.mongodb.com/manual/core/tailable-cursors/">Tailable Cursors</a>
|
||||
* @since 2.0
|
||||
*/
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
|
||||
@Documented
|
||||
@QueryAnnotation
|
||||
public @interface InfiniteStream {
|
||||
public @interface Tailable {
|
||||
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2016 the original author or authors.
|
||||
* Copyright 2016-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -130,7 +130,7 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery {
|
||||
return new DeleteExecution(operations, method);
|
||||
} else if (method.isGeoNearQuery()) {
|
||||
return new GeoNearExecution(operations, accessor, method.getReturnType());
|
||||
} else if (isInfiniteStream(method)) {
|
||||
} else if (isTailable(method)) {
|
||||
return new TailExecution(operations, accessor.getPageable());
|
||||
} else if (method.isCollectionQuery()) {
|
||||
return new CollectionExecution(operations, accessor.getPageable());
|
||||
@@ -139,8 +139,8 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery {
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isInfiniteStream(MongoQueryMethod method) {
|
||||
return method.getInfiniteStreamAnnotation() != null;
|
||||
private boolean isTailable(MongoQueryMethod method) {
|
||||
return method.getTailableAnnotation() != null;
|
||||
}
|
||||
|
||||
Query applyQueryMetaAttributesWhenPresent(Query query) {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2011-2016 the original author or authors.
|
||||
* Copyright 2011-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -29,9 +29,9 @@ import org.springframework.data.geo.GeoResults;
|
||||
import org.springframework.data.mapping.context.MappingContext;
|
||||
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
|
||||
import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty;
|
||||
import org.springframework.data.mongodb.repository.InfiniteStream;
|
||||
import org.springframework.data.mongodb.repository.Meta;
|
||||
import org.springframework.data.mongodb.repository.Query;
|
||||
import org.springframework.data.mongodb.repository.Tailable;
|
||||
import org.springframework.data.projection.ProjectionFactory;
|
||||
import org.springframework.data.repository.core.RepositoryMetadata;
|
||||
import org.springframework.data.repository.query.QueryMethod;
|
||||
@@ -44,7 +44,7 @@ import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Mongo specific implementation of {@link QueryMethod}.
|
||||
*
|
||||
*
|
||||
* @author Oliver Gierke
|
||||
* @author Christoph Strobl
|
||||
* @author Mark Paluch
|
||||
@@ -61,7 +61,7 @@ public class MongoQueryMethod extends QueryMethod {
|
||||
|
||||
/**
|
||||
* Creates a new {@link MongoQueryMethod} from the given {@link Method}.
|
||||
*
|
||||
*
|
||||
* @param method must not be {@literal null}.
|
||||
* @param metadata must not be {@literal null}.
|
||||
* @param projectionFactory must not be {@literal null}.
|
||||
@@ -89,7 +89,7 @@ public class MongoQueryMethod extends QueryMethod {
|
||||
|
||||
/**
|
||||
* Returns whether the method has an annotated query.
|
||||
*
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public boolean hasAnnotatedQuery() {
|
||||
@@ -99,7 +99,7 @@ public class MongoQueryMethod extends QueryMethod {
|
||||
/**
|
||||
* Returns the query string declared in a {@link Query} annotation or {@literal null} if neither the annotation found
|
||||
* nor the attribute was specified.
|
||||
*
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
String getAnnotatedQuery() {
|
||||
@@ -110,7 +110,7 @@ public class MongoQueryMethod extends QueryMethod {
|
||||
|
||||
/**
|
||||
* Returns the field specification to be used for the query.
|
||||
*
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
String getFieldSpecification() {
|
||||
@@ -119,7 +119,7 @@ public class MongoQueryMethod extends QueryMethod {
|
||||
return StringUtils.hasText(value) ? value : null;
|
||||
}
|
||||
|
||||
/*
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.data.repository.query.QueryMethod#getEntityInformation()
|
||||
*/
|
||||
@@ -165,7 +165,7 @@ public class MongoQueryMethod extends QueryMethod {
|
||||
|
||||
/**
|
||||
* Returns whether the query is a geo near query.
|
||||
*
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public boolean isGeoNearQuery() {
|
||||
@@ -192,7 +192,7 @@ public class MongoQueryMethod extends QueryMethod {
|
||||
|
||||
/**
|
||||
* Returns the {@link Query} annotation that is applied to the method or {@code null} if none available.
|
||||
*
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
Query getQueryAnnotation() {
|
||||
@@ -213,7 +213,7 @@ public class MongoQueryMethod extends QueryMethod {
|
||||
|
||||
/**
|
||||
* Returns the {@link Meta} annotation that is applied to the method or {@code null} if not available.
|
||||
*
|
||||
*
|
||||
* @return
|
||||
* @since 1.6
|
||||
*/
|
||||
@@ -222,18 +222,18 @@ public class MongoQueryMethod extends QueryMethod {
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link InfiniteStream} annotation that is applied to the method or {@code null} if not available.
|
||||
* Returns the {@link Tailable} annotation that is applied to the method or {@code null} if not available.
|
||||
*
|
||||
* @return
|
||||
* @since 2.0
|
||||
*/
|
||||
InfiniteStream getInfiniteStreamAnnotation() {
|
||||
return AnnotatedElementUtils.findMergedAnnotation(method, InfiniteStream.class);
|
||||
Tailable getTailableAnnotation() {
|
||||
return AnnotatedElementUtils.findMergedAnnotation(method, Tailable.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the {@link org.springframework.data.mongodb.core.query.Meta} attributes to be applied.
|
||||
*
|
||||
*
|
||||
* @return never {@literal null}.
|
||||
* @since 1.6
|
||||
*/
|
||||
|
||||
@@ -170,7 +170,7 @@ public class ReactiveMongoRepositoryTests implements BeanClassLoaderAware, BeanF
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-1444
|
||||
public void shouldUseInfiniteStream() throws Exception {
|
||||
public void shouldUseTailableCursor() throws Exception {
|
||||
|
||||
StepVerifier
|
||||
.create(template.dropCollection(Capped.class) //
|
||||
@@ -195,7 +195,7 @@ public class ReactiveMongoRepositoryTests implements BeanClassLoaderAware, BeanF
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-1444
|
||||
public void shouldUseInfiniteStreamWithProjection() throws Exception {
|
||||
public void shouldUseTailableCursorWithProjection() throws Exception {
|
||||
|
||||
StepVerifier
|
||||
.create(template.dropCollection(Capped.class) //
|
||||
@@ -329,10 +329,10 @@ public class ReactiveMongoRepositoryTests implements BeanClassLoaderAware, BeanF
|
||||
|
||||
interface ReactiveCappedCollectionRepository extends Repository<Capped, String> {
|
||||
|
||||
@InfiniteStream
|
||||
@Tailable
|
||||
Flux<Capped> findByKey(String key);
|
||||
|
||||
@InfiniteStream
|
||||
@Tailable
|
||||
Flux<CappedProjection> findProjectionByKey(String key);
|
||||
}
|
||||
|
||||
|
||||
@@ -188,25 +188,25 @@ public interface PersonRepository extends ReactiveMongoRepository<Person, String
|
||||
----
|
||||
|
||||
[[mongo.reactive.repositories.infinite-streams]]
|
||||
== Infinite Streams
|
||||
== Infinite Streams with Tailable Cursors
|
||||
|
||||
By default, MongoDB will automatically close a cursor when the client has exhausted all results in the cursor. Closing a cursors turns a Stream into a finite stream. However, for capped collections you may use a https://docs.mongodb.com/manual/core/tailable-cursors/[Tailable Cursor] that remains open after the client exhausts the results in the initial cursor. Using Tailable Cursors with a reactive approach allows construction of infinite streams. A Tailable Cursor remains open until it's closed. It emits data as data arrives in a capped collection. Using Tailable Cursors with Collections is not possible as its result would never complete.
|
||||
By default, MongoDB will automatically close a cursor when the client has exhausted all results in the cursor. Closing a cursors turns a Stream into a finite stream. However, for capped collections you may use a https://docs.mongodb.com/manual/core/tailable-cursors/[Tailable Cursor] that remains open after the client exhausts the results in the initial cursor. Using tailable Cursors with a reactive approach allows construction of infinite streams. A tailable Cursor remains open until it's closed. It emits data as data arrives in a capped collection. Using Tailable Cursors with Collections is not possible as its result would never complete.
|
||||
|
||||
Spring Data MongoDB Reactive Repository support supports infinite streams by annotating a query method with `@InfiniteStream`. This works for methods returning `Flux` or `Observable` wrapper types.
|
||||
Spring Data MongoDB Reactive Repository support supports infinite streams by annotating a query method with `@TailableCursor`. This works for methods returning `Flux` or `Observable` wrapper types.
|
||||
|
||||
[source,java]
|
||||
----
|
||||
|
||||
public interface PersonRepository extends ReactiveMongoRepository<Person, String> {
|
||||
|
||||
@InfiniteStream
|
||||
@Tailable
|
||||
Flux<Person> findByFirstname(String firstname);
|
||||
|
||||
}
|
||||
|
||||
Flux<Person> stream = repository.findByFirstname("Joe");
|
||||
|
||||
Disposable subscription = stream.doOnNext(person -> System.out.println(person)).subscribe();
|
||||
Disposable subscription = stream.doOnNext(System.out::println).subscribe();
|
||||
|
||||
// …
|
||||
|
||||
|
||||
Reference in New Issue
Block a user