diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml index 388375a2a..0e7d7aa0c 100644 --- a/spring-data-mongodb/pom.xml +++ b/spring-data-mongodb/pom.xml @@ -111,6 +111,13 @@ true + + io.projectreactor.addons + reactor-test + ${reactor} + true + + io.reactivex rxjava diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/InfiniteStream.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/InfiniteStream.java index 1448450c7..ceda10437 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/InfiniteStream.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/InfiniteStream.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.springframework.data.mongodb.repository; import java.lang.annotation.Documented; @@ -24,8 +23,6 @@ import java.lang.annotation.Target; import org.springframework.data.annotation.QueryAnnotation; -import reactor.core.Cancellation; - /** * Annotation to declare an infinite stream using repository query methods. An infinite stream uses MongoDB's * {@link com.mongodb.CursorType#TailableAwait tailable} cursors to retrieve data from a capped collection and stream @@ -35,8 +32,8 @@ import reactor.core.Cancellation; * The stream may become dead, or invalid, if either the query returns no match or the cursor returns the document at * the "end" of the collection and then the application deletes that document. *

- * A stream that is no longer in use must be {@link Cancellation#dispose()} disposed} otherwise the streams will linger - * and exhaust resources. + * A stream that is no longer in use must be {@link reactor.core.Disposable#dispose()} disposed} otherwise the streams + * will linger and exhaust resources. * * @author Mark Paluch * @see Tailable Cursors diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java index 36d690c87..c06121319 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java @@ -21,7 +21,7 @@ import static org.springframework.data.mongodb.core.query.Criteria.*; import static org.springframework.data.mongodb.core.query.Query.*; import lombok.Data; -import reactor.core.Cancellation; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -812,12 +812,12 @@ public class ReactiveMongoTemplateTests { Flux capped = template.tail(null, Document.class, "capped"); - Cancellation cancellation = capped.doOnNext(documents::add).subscribe(); + Disposable disposable = capped.doOnNext(documents::add).subscribe(); assertThat(documents.poll(5, TimeUnit.SECONDS), is(notNullValue())); assertThat(documents.isEmpty(), is(true)); - cancellation.dispose(); + disposable.dispose(); } @Test // DATAMONGO-1444 @@ -834,7 +834,7 @@ public class ReactiveMongoTemplateTests { Flux capped = template.tail(null, Document.class, "capped"); - Cancellation cancellation = capped.doOnNext(documents::add).subscribe(); + Disposable disposable = capped.doOnNext(documents::add).subscribe(); assertThat(documents.poll(5, TimeUnit.SECONDS), is(notNullValue())); assertThat(documents.isEmpty(), is(true)); @@ -845,7 +845,7 @@ public class ReactiveMongoTemplateTests { assertThat(documents.poll(5, TimeUnit.SECONDS), is(notNullValue())); - cancellation.dispose(); + disposable.dispose(); StepVerifier.create(template.insert(new Document("random", Math.random()).append("key", "value"), "capped")) // .expectNextCount(1) // diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactiveMongoRepositoryTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactiveMongoRepositoryTests.java index e87f8d90d..602b1c2fe 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactiveMongoRepositoryTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/ReactiveMongoRepositoryTests.java @@ -20,7 +20,7 @@ import static org.junit.Assert.*; import static org.springframework.data.domain.Sort.Direction.*; import lombok.NoArgsConstructor; -import reactor.core.Cancellation; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -183,7 +183,7 @@ public class ReactiveMongoRepositoryTests implements BeanClassLoaderAware, BeanF BlockingQueue documents = new LinkedBlockingDeque<>(100); - Cancellation cancellation = cappedRepository.findByKey("value").doOnNext(documents::add).subscribe(); + Disposable disposable = cappedRepository.findByKey("value").doOnNext(documents::add).subscribe(); assertThat(documents.poll(5, TimeUnit.SECONDS), is(notNullValue())); @@ -191,7 +191,7 @@ public class ReactiveMongoRepositoryTests implements BeanClassLoaderAware, BeanF assertThat(documents.poll(5, TimeUnit.SECONDS), is(notNullValue())); assertThat(documents.isEmpty(), is(true)); - cancellation.dispose(); + disposable.dispose(); } @Test // DATAMONGO-1444 @@ -208,7 +208,7 @@ public class ReactiveMongoRepositoryTests implements BeanClassLoaderAware, BeanF BlockingQueue documents = new LinkedBlockingDeque<>(100); - Cancellation cancellation = cappedRepository.findProjectionByKey("value").doOnNext(documents::add).subscribe(); + Disposable disposable = cappedRepository.findProjectionByKey("value").doOnNext(documents::add).subscribe(); CappedProjection projection1 = documents.poll(5, TimeUnit.SECONDS); assertThat(projection1, is(notNullValue())); @@ -222,7 +222,7 @@ public class ReactiveMongoRepositoryTests implements BeanClassLoaderAware, BeanF assertThat(documents.isEmpty(), is(true)); - cancellation.dispose(); + disposable.dispose(); } @Test // DATAMONGO-1444 diff --git a/src/main/asciidoc/reference/reactive-mongo-repositories.adoc b/src/main/asciidoc/reference/reactive-mongo-repositories.adoc index b1b046b64..c33b6511a 100644 --- a/src/main/asciidoc/reference/reactive-mongo-repositories.adoc +++ b/src/main/asciidoc/reference/reactive-mongo-repositories.adoc @@ -206,10 +206,10 @@ public interface PersonRepository extends ReactiveMongoRepository stream = repository.findByFirstname("Joe"); -Cancellation cancellation = stream.doOnNext(person -> System.out.println(person)).subscribe(); +Disposable subscription = stream.doOnNext(person -> System.out.println(person)).subscribe(); // … // Later: Dispose the stream -cancellation.dispose(); +subscription.dispose(); ---- diff --git a/src/main/asciidoc/reference/reactive-mongodb.adoc b/src/main/asciidoc/reference/reactive-mongodb.adoc index 76ea7e168..2e9a3720d 100644 --- a/src/main/asciidoc/reference/reactive-mongodb.adoc +++ b/src/main/asciidoc/reference/reactive-mongodb.adoc @@ -466,12 +466,12 @@ By default, MongoDB will automatically close a cursor when the client has exhaus ---- Flux stream = template.tail(query(where("name").is("Joe")), Person.class); -Cancellation cancellation = stream.doOnNext(person -> System.out.println(person)).subscribe(); +Disposable subscription = stream.doOnNext(person -> System.out.println(person)).subscribe(); // … // Later: Dispose the stream -cancellation.dispose(); +subscription.dispose(); ----