DATAMONGO-1444 - Adopt changes in Spring Data Commons.

- Adopt RxJava to RxJava1 repository interface renaming.
- Remove ReactiveChunk, Slice and Page.
- Update documentation.
- Prevent sliced/paged query execution.
This commit is contained in:
Christoph Strobl
2016-11-07 10:18:54 +01:00
committed by Oliver Gierke
parent e0f371f648
commit df859d0f3a
24 changed files with 49 additions and 967 deletions

View File

@@ -58,14 +58,14 @@ public interface ReactiveMongoOperations {
*
* @return index operations on the named collection
*/
ReactiveIndexOperations reactiveIndexOps(String collectionName);
ReactiveIndexOperations indexOps(String collectionName);
/**
* Returns the reactive operations that can be performed on indexes
*
* @return index operations on the named collection associated with the given entity class
*/
ReactiveIndexOperations reactiveIndexOps(Class<?> entityClass);
ReactiveIndexOperations indexOps(Class<?> entityClass);
/**
* Execute the a MongoDB command expressed as a JSON string. This will call the method JSON.parse that is part of the

View File

@@ -209,7 +209,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
if (null != mappingContext && mappingContext instanceof MongoMappingContext) {
indexCreator = new MongoPersistentEntityIndexCreator((MongoMappingContext) mappingContext,
(collectionName) -> IndexOperationsAdapter.blocking(reactiveIndexOps(collectionName)));
(collectionName) -> IndexOperationsAdapter.blocking(indexOps(collectionName)));
eventPublisher = new MongoMappingEventPublisher(indexCreator);
if (mappingContext instanceof ApplicationEventPublisherAware) {
((ApplicationEventPublisherAware) mappingContext).setApplicationEventPublisher(eventPublisher);
@@ -316,14 +316,14 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
/* (non-Javadoc)
* @see org.springframework.data.mongodb.core.ReactiveMongoOperations#reactiveIndexOps(java.lang.String)
*/
public ReactiveIndexOperations reactiveIndexOps(String collectionName) {
public ReactiveIndexOperations indexOps(String collectionName) {
return new DefaultReactiveIndexOperations(this, collectionName);
}
/* (non-Javadoc)
* @see org.springframework.data.mongodb.core.ReactiveMongoOperations#reactiveIndexOps(java.lang.Class)
*/
public ReactiveIndexOperations reactiveIndexOps(Class<?> entityClass) {
public ReactiveIndexOperations indexOps(Class<?> entityClass) {
return new DefaultReactiveIndexOperations(this, determineCollectionName(entityClass));
}

View File

@@ -21,8 +21,8 @@ import org.reactivestreams.Publisher;
import org.springframework.data.domain.Example;
import org.springframework.data.domain.Sort;
import org.springframework.data.repository.NoRepositoryBean;
import org.springframework.data.repository.reactive.ReactivePagingAndSortingRepository;
import org.springframework.data.repository.reactive.ReactiveSortingRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -33,7 +33,7 @@ import reactor.core.publisher.Mono;
* @since 2.0
*/
@NoRepositoryBean
public interface ReactiveMongoRepository<T, ID extends Serializable> extends ReactivePagingAndSortingRepository<T, ID> {
public interface ReactiveMongoRepository<T, ID extends Serializable> extends ReactiveSortingRepository<T, ID> {
/**
* Inserts the given entity. Assumes the instance to be new to be able to apply insertion optimizations. Use

View File

@@ -25,11 +25,9 @@ import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.repository.query.ReactiveMongoQueryExecution.CollectionExecution;
import org.springframework.data.mongodb.repository.query.ReactiveMongoQueryExecution.DeleteExecution;
import org.springframework.data.mongodb.repository.query.ReactiveMongoQueryExecution.GeoNearExecution;
import org.springframework.data.mongodb.repository.query.ReactiveMongoQueryExecution.PagedExecution;
import org.springframework.data.mongodb.repository.query.ReactiveMongoQueryExecution.ResultProcessingConverter;
import org.springframework.data.mongodb.repository.query.ReactiveMongoQueryExecution.ResultProcessingExecution;
import org.springframework.data.mongodb.repository.query.ReactiveMongoQueryExecution.SingleEntityExecution;
import org.springframework.data.mongodb.repository.query.ReactiveMongoQueryExecution.SlicedExecution;
import org.springframework.data.mongodb.repository.query.ReactiveMongoQueryExecution.TailExecution;
import org.springframework.data.repository.query.ParameterAccessor;
import org.springframework.data.repository.query.RepositoryQuery;
@@ -136,14 +134,10 @@ public abstract class AbstractReactiveMongoQuery implements RepositoryQuery {
return new DeleteExecution(operations, method);
} else if (method.isGeoNearQuery()) {
return new GeoNearExecution(operations, accessor, method.getReturnType());
} else if (method.isSliceQuery()) {
return new SlicedExecution(operations, accessor.getPageable());
} else if (isInfiniteStream(method)) {
return new TailExecution(operations, accessor.getPageable());
} else if (method.isCollectionQuery()) {
return new CollectionExecution(operations, accessor.getPageable());
} else if (method.isPageQuery()) {
return new PagedExecution(operations, accessor.getPageable());
} else {
return new SingleEntityExecution(operations, isCountQuery());
}

View File

@@ -26,8 +26,6 @@ import org.springframework.data.geo.Point;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.repository.support.ReactivePageImpl;
import org.springframework.data.mongodb.repository.support.ReactiveSliceImpl;
import org.springframework.data.repository.query.ResultProcessor;
import org.springframework.data.repository.query.ReturnedType;
import org.springframework.data.repository.util.ReactiveWrappers;
@@ -87,61 +85,6 @@ interface ReactiveMongoQueryExecution {
}
}
/**
* {@link ReactiveMongoQueryExecution} for {@link Slice} query methods.
*
* @author Mark Paluch
*/
@RequiredArgsConstructor
final class SlicedExecution implements ReactiveMongoQueryExecution {
private final @NonNull ReactiveMongoOperations operations;
private final @NonNull Pageable pageable;
@Override
public Object execute(Query query, Class<?> type, String collection) {
int pageSize = pageable.getPageSize();
// Apply Pageable but tweak limit to peek into next page
Query modifiedQuery = query.with(pageable).limit(pageSize + 1);
Flux<?> flux = operations.find(modifiedQuery, type, collection);
return Mono.fromSupplier(() -> new ReactiveSliceImpl<>(flux, pageable));
}
}
/**
* {@link ReactiveMongoQueryExecution} for pagination queries.
*
* @author Mark Paluch
*/
@RequiredArgsConstructor
final class PagedExecution implements ReactiveMongoQueryExecution {
private final @NonNull ReactiveMongoOperations operations;
private final @NonNull Pageable pageable;
@Override
public Object execute(Query query, Class<?> type, String collection) {
int overallLimit = query.getLimit();
Mono<Long> count = operations.count(query, type, collection);
// Apply raw pagination
query = query.with(pageable);
// Adjust limit if page would exceed the overall limit
if (overallLimit != 0 && pageable.getOffset() + pageable.getPageSize() > overallLimit) {
query.limit(overallLimit - pageable.getOffset());
}
Flux<?> flux = operations.find(query, type, collection);
return Mono.fromSupplier(() -> new ReactivePageImpl<>(flux, pageable, count));
}
}
/**
* {@link ReactiveMongoQueryExecution} to return a single entity.
*

View File

@@ -19,6 +19,7 @@ import static org.springframework.data.repository.util.ClassUtils.*;
import java.lang.reflect.Method;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
@@ -34,6 +35,7 @@ import org.springframework.data.repository.util.ReactiveWrapperConverters;
import org.springframework.data.repository.util.ReactiveWrappers;
import org.springframework.data.util.ClassTypeInformation;
import org.springframework.data.util.TypeInformation;
import org.springframework.util.ClassUtils;
/**
* Reactive specific implementation of {@link MongoQueryMethod}.
@@ -71,6 +73,12 @@ public class ReactiveMongoQueryMethod extends MongoQueryMethod {
&& (PAGE_TYPE.isAssignableFrom(returnType.getComponentType())
|| SLICE_TYPE.isAssignableFrom(returnType.getComponentType()));
if (singleWrapperWithWrappedPageableResult) {
throw new InvalidDataAccessApiUsageException(
String.format("'%s.%s' must not use sliced or paged execution. Please use Flux.buffer(size, skip).",
ClassUtils.getShortName(method.getDeclaringClass()), method.getName()));
}
if (!multiWrapper && !singleWrapperWithWrappedPageableResult) {
throw new IllegalStateException(String.format(
"Method has to use a either multi-item reactive wrapper return type or a wrapped Page/Slice type. Offending method: %s",

View File

@@ -42,7 +42,7 @@ import org.springframework.data.repository.query.QueryLookupStrategy;
import org.springframework.data.repository.query.QueryLookupStrategy.Key;
import org.springframework.data.repository.query.RepositoryQuery;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.data.repository.reactive.RxJavaCrudRepository;
import org.springframework.data.repository.reactive.RxJava1CrudRepository;
import org.springframework.data.repository.util.QueryExecutionConverters;
import org.springframework.data.repository.util.ReactiveWrappers;
import org.springframework.expression.spel.standard.SpelExpressionParser;
@@ -92,7 +92,7 @@ public class MongoRepositoryFactory extends RepositoryFactorySupport {
boolean isReactiveRepository = (PROJECT_REACTOR_PRESENT && ReactiveCrudRepository.class.isAssignableFrom(metadata.getRepositoryInterface())) || (
RXJAVA_OBSERVABLE_PRESENT && RxJavaCrudRepository.class.isAssignableFrom(metadata.getRepositoryInterface()));
RXJAVA_OBSERVABLE_PRESENT && RxJava1CrudRepository.class.isAssignableFrom(metadata.getRepositoryInterface()));
boolean isQueryDslRepository = QUERY_DSL_PRESENT
&& QueryDslPredicateExecutor.class.isAssignableFrom(metadata.getRepositoryInterface());

View File

@@ -1,252 +0,0 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.repository.support;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.springframework.core.convert.converter.Converter;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.Sort;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.MonoProcessor;
/**
* A reactive chunk of data restricted by the configured {@link Pageable}.
*
* @author Mark Paluch
* @author Christoph Strobl
* @since 2.0
*/
//TODO: should that one move to SD Commons
abstract class ReactiveChunk<T> implements Slice<T>, Serializable {
private static final long serialVersionUID = 867755909294344406L;
private final Flux<T> content;
private final MonoProcessor<List<T>> processor;
private volatile List<T> contentCache;
private final Pageable pageable;
/**
* Creates a new {@link ReactiveChunk} with the given content and the given governing {@link Pageable}.
*
* @param content must not be {@literal null}.
* @param pageable can be {@literal null}.
*/
public ReactiveChunk(Flux<? extends T> content, Pageable pageable) {
Assert.notNull(content, "Content must not be null!");
this.content = (Flux) content;
this.pageable = pageable;
this.processor = this.content.collectList().doOnSuccess(list -> {
if (list.size() > pageable.getPageSize()) {
contentCache = list.subList(0, pageable.getPageSize());
} else {
contentCache = list;
}
}).subscribe();
}
/*
* (non-Javadoc)
* @see org.springframework.data.domain.Slice#getNumber()
*/
public int getNumber() {
return pageable == null ? 0 : pageable.getPageNumber();
}
/*
* (non-Javadoc)
* @see org.springframework.data.domain.Slice#getSize()
*/
public int getSize() {
return pageable == null ? 0 : pageable.getPageSize();
}
/*
* (non-Javadoc)
* @see org.springframework.data.domain.Slice#getNumberOfElements()
*/
public int getNumberOfElements() {
return getContent().size();
}
/*
* (non-Javadoc)
* @see org.springframework.data.domain.Slice#hasPrevious()
*/
public boolean hasPrevious() {
return getNumber() > 0;
}
/*
* (non-Javadoc)
* @see org.springframework.data.domain.Slice#isFirst()
*/
public boolean isFirst() {
return !hasPrevious();
}
/*
* (non-Javadoc)
* @see org.springframework.data.domain.Slice#isLast()
*/
public boolean isLast() {
return !hasNext();
}
/*
* (non-Javadoc)
* @see org.springframework.data.domain.Slice#nextPageable()
*/
public Pageable nextPageable() {
return hasNext() ? pageable.next() : null;
}
/*
* (non-Javadoc)
* @see org.springframework.data.domain.Slice#previousPageable()
*/
public Pageable previousPageable() {
if (hasPrevious()) {
return pageable.previousOrFirst();
}
return null;
}
/*
* (non-Javadoc)
* @see org.springframework.data.domain.Slice#hasContent()
*/
public boolean hasContent() {
return !getContent().isEmpty();
}
/*
* (non-Javadoc)
* @see org.springframework.data.domain.Slice#getContent()
*/
public List<T> getContent() {
if (contentCache != null) {
return Collections.unmodifiableList(contentCache);
}
List<T> list = processor.block();
if (list.size() > pageable.getPageSize()) {
return list.subList(0, pageable.getPageSize());
}
return Collections.unmodifiableList(list);
}
/*
* (non-Javadoc)
* @see org.springframework.data.domain.Slice#getSort()
*/
public Sort getSort() {
return pageable == null ? null : pageable.getSort();
}
/*
* (non-Javadoc)
* @see java.lang.Iterable#iterator()
*/
public Iterator<T> iterator() {
return getContent().iterator();
}
/**
* Applies the given {@link Converter} to the content of the {@link ReactiveChunk}.
*
* @param converter must not be {@literal null}.
* @return
*/
protected <S> List<S> getConvertedContent(Converter<? super T, ? extends S> converter) {
Assert.notNull(converter, "Converter must not be null!");
List<S> result = new ArrayList<S>(getContent().size());
for (T element : this) {
result.add(converter.convert(element));
}
return result;
}
/**
* Returns whether the returned list contains more elements than specified by {@link Pageable#getPageSize()}.
*
* @return
*/
protected boolean containsMore() {
List<T> list = processor.block();
return list.size() > pageable.getPageSize();
}
/*
* (non-Javadoc)
* @see java.lang.Object#equals(java.lang.Object)
*/
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof ReactiveChunk<?>)) {
return false;
}
ReactiveChunk<?> that = (ReactiveChunk<?>) obj;
boolean pageableEqual = this.pageable == null ? that.pageable == null : this.pageable.equals(that.pageable);
return pageableEqual;
}
/*
* (non-Javadoc)
* @see java.lang.Object#hashCode()
*/
@Override
public int hashCode() {
int result = 17;
result += 31 * (pageable == null ? 0 : pageable.hashCode());
return result;
}
}

View File

@@ -86,7 +86,7 @@ public class ReactiveMongoRepositoryFactoryBean<T extends Repository<S, ID>, S,
if (createIndexesForQueryMethods) {
factory.addQueryCreationListener(
new IndexEnsuringQueryCreationListener(collectionName -> IndexOperationsAdapter.blocking(operations.reactiveIndexOps(collectionName))));
new IndexEnsuringQueryCreationListener(collectionName -> IndexOperationsAdapter.blocking(operations.indexOps(collectionName))));
}
return factory;

View File

@@ -1,170 +0,0 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.repository.support;
import java.util.List;
import org.springframework.core.convert.converter.Converter;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
/**
* Reactive {@code Page} implementation.
*
* @param <T> the type of which the page consists.
* @author Mark Paluch
* @since 2.0
*/
//TODO: should that one move to SD-Commons?
public class ReactivePageImpl<T> extends ReactiveChunk<T> implements Page<T> {
private static final long serialVersionUID = 867755909294344406L;
private final MonoProcessor<Long> totalMono;
private volatile Long totalValueCache;
private final Pageable pageable;
/**
* Constructor of {@code PageImpl}.
*
* @param content the content of this page, must not be {@literal null}.
* @param pageable the paging information, can be {@literal null}.
* @param totalMono the total amount of items available. The total might be adapted considering the length of the
* content given, if it is going to be the content of the last page. This is in place to mitigate
* inconsistencies
*/
public ReactivePageImpl(Flux<? extends T> content, Pageable pageable, Mono<Long> totalMono) {
super(content, pageable);
this.pageable = pageable;
this.totalMono = totalMono.subscribe();
}
/*
* (non-Javadoc)
* @see org.springframework.data.domain.Page#getTotalPages()
*/
@Override
public int getTotalPages() {
return getSize() == 0 ? 1 : (int) Math.ceil((double) getTotal0() / (double) getSize());
}
private long getTotal0() {
if (totalValueCache == null) {
long total = totalMono.block();
List<T> content = getContent();
this.totalValueCache = !content.isEmpty() && pageable != null
&& pageable.getOffset() + pageable.getPageSize() > total ? pageable.getOffset() + content.size() : total;
}
return totalValueCache;
}
/*
* (non-Javadoc)
* @see org.springframework.data.domain.Page#getTotalElements()
*/
@Override
public long getTotalElements() {
return getTotal0();
}
/*
* (non-Javadoc)
* @see org.springframework.data.domain.Slice#hasNext()
*/
@Override
public boolean hasNext() {
return getNumber() + 1 < getTotalPages();
}
/*
* (non-Javadoc)
* @see org.springframework.data.domain.Slice#isLast()
*/
@Override
public boolean isLast() {
return !hasNext();
}
/*
* (non-Javadoc)
* @see org.springframework.data.domain.Slice#transform(org.springframework.core.convert.converter.Converter)
*/
@Override
public <S> Page<S> map(Converter<? super T, ? extends S> converter) {
return new ReactivePageImpl<S>(Flux.fromIterable(getConvertedContent(converter)), pageable, Mono.just(getTotal0()));
}
/*
* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
String contentType = "UNKNOWN";
List<T> content = getContent();
if (content.size() > 0) {
contentType = content.get(0).getClass().getName();
}
return String.format("Page %s of %d containing %s instances", getNumber() + 1, getTotalPages(), contentType);
}
/*
* (non-Javadoc)
* @see java.lang.Object#equals(java.lang.Object)
*/
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof ReactivePageImpl<?>)) {
return false;
}
ReactivePageImpl<?> that = (ReactivePageImpl<?>) obj;
return getTotal0() == that.getTotal0() && super.equals(obj);
}
/*
* (non-Javadoc)
* @see java.lang.Object#hashCode()
*/
@Override
public int hashCode() {
int result = 17;
result += 31 * (int) (getTotal0() ^ getTotal0() >>> 32);
result += 31 * super.hashCode();
return result;
}
}

View File

@@ -1,66 +0,0 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.repository.support;
import java.util.List;
import org.springframework.core.convert.converter.Converter;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.SliceImpl;
import reactor.core.publisher.Flux;
/**
* Reactive {@code Page} implementation.
*
* @param <T> the type of which the page consists.
* @author Mark Paluch
* @since 2.0
*/
public class ReactiveSliceImpl<T> extends ReactiveChunk<T> {
private static final long serialVersionUID = 867755909294344406L;
private final Pageable pageable;
public ReactiveSliceImpl(Flux<T> content, Pageable pageable) {
super(content, pageable);
this.pageable = pageable;
}
public boolean hasNext() {
return containsMore();
}
public <S> Slice<S> map(Converter<? super T, ? extends S> converter) {
return new SliceImpl<>(this.getConvertedContent(converter), pageable, this.hasNext());
}
public String toString() {
String contentType = "UNKNOWN";
List content = this.getContent();
if (content.size() > 0) {
contentType = content.get(0).getClass().getName();
}
return String.format("Slice %d containing %s instances",
this.getNumber(), contentType);
}
}

View File

@@ -142,17 +142,6 @@ public class SimpleReactiveMongoRepository<T, ID extends Serializable> implement
return Flux.from(idStream).buffer().flatMap(this::findAll);
}
@Override
public Mono<Page<T>> findAll(Pageable pageable) {
Assert.notNull(pageable, "The given Pageable must not be null!");
Mono<Long> count = count();
Flux<T> content = findAll(new Query().with(pageable));
return Mono.fromCallable(() -> new ReactivePageImpl<>(content, pageable, count));
}
@Override
public Flux<T> findAll(Sort sort) {
return findAll(new Query().with(sort));

View File

@@ -87,7 +87,7 @@ public class ReactiveMongoTemplateIndexTests {
p2.setAge(40);
template.insert(p2);
template.reactiveIndexOps(Person.class).ensureIndex(new Index().on("age", Direction.DESC).unique()).block();
template.indexOps(Person.class).ensureIndex(new Index().on("age", Direction.DESC).unique()).block();
MongoCollection<Document> coll = template.getCollection(template.getCollectionName(Person.class));
List<Document> indexInfo = Flux.from(coll.listIndexes()).collectList().block();
@@ -116,9 +116,9 @@ public class ReactiveMongoTemplateIndexTests {
p1.setAge(25);
template.insert(p1).block();
template.reactiveIndexOps(Person.class).ensureIndex(new Index().on("age", Direction.DESC).unique()).block();
template.indexOps(Person.class).ensureIndex(new Index().on("age", Direction.DESC).unique()).block();
List<IndexInfo> indexInfoList = Flux.from(template.reactiveIndexOps(Person.class).getIndexInfo()).collectList()
List<IndexInfo> indexInfoList = Flux.from(template.indexOps(Person.class).getIndexInfo()).collectList()
.block();
assertThat(indexInfoList.size(), is(2));
@@ -140,10 +140,10 @@ public class ReactiveMongoTemplateIndexTests {
String command = "db." + template.getCollectionName(Person.class)
+ ".createIndex({'age':-1}, {'unique':true, 'sparse':true}), 1";
template.reactiveIndexOps(Person.class).dropAllIndexes().block();
template.indexOps(Person.class).dropAllIndexes().block();
TestSubscriber<IndexInfo> subscriber = TestSubscriber
.subscribe(template.reactiveIndexOps(Person.class).getIndexInfo());
.subscribe(template.indexOps(Person.class).getIndexInfo());
subscriber.await().assertComplete().assertNoValues();
Mono.from(factory.getMongoDatabase().runCommand(new org.bson.Document("eval", command))).block();
@@ -165,7 +165,7 @@ public class ReactiveMongoTemplateIndexTests {
assertThat(indexKey, hasEntry("age", -1D));
assertThat(unique, is(true));
List<IndexInfo> indexInfos = template.reactiveIndexOps(Person.class).getIndexInfo().collectList().block();
List<IndexInfo> indexInfos = template.indexOps(Person.class).getIndexInfo().collectList().block();
IndexInfo info = indexInfos.get(1);
assertThat(info.isUnique(), is(true));

View File

@@ -719,7 +719,7 @@ public class ReactiveMongoTemplateTests {
new Venue("Maplewood, NJ", -74.2713, 40.73137));
template.insertAll(venues).blockLast();
IndexOperationsAdapter.blocking(template.reactiveIndexOps(Venue.class))
IndexOperationsAdapter.blocking(template.indexOps(Venue.class))
.ensureIndex(new GeospatialIndex("location").typed(GeoSpatialIndexType.GEO_2D));
NearQuery geoFar = NearQuery.near(-73, 40, Metrics.KILOMETERS).num(10).maxDistance(150, Metrics.KILOMETERS);
@@ -828,7 +828,7 @@ public class ReactiveMongoTemplateTests {
ReactiveMongoTemplate template = new ReactiveMongoTemplate(factory);
template.setWriteResultChecking(WriteResultChecking.EXCEPTION);
template.reactiveIndexOps(Person.class).ensureIndex(new Index().on("firstName", Direction.DESC).unique()).block();
template.indexOps(Person.class).ensureIndex(new Index().on("firstName", Direction.DESC).unique()).block();
Person person = new Person(new ObjectId(), "Amol");
person.setAge(28);

View File

@@ -36,8 +36,8 @@ import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
import org.springframework.data.repository.RepositoryDefinition;
import org.springframework.data.repository.reactive.ReactivePagingAndSortingRepository;
import org.springframework.data.repository.reactive.RxJavaPagingAndSortingRepository;
import org.springframework.data.repository.reactive.ReactiveSortingRepository;
import org.springframework.data.repository.reactive.RxJava1SortingRepository;
import org.springframework.stereotype.Repository;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@@ -236,13 +236,13 @@ public class ConvertingReactiveMongoRepositoryTests {
}
@Repository
interface ReactivePersonRepostitory extends ReactivePagingAndSortingRepository<ReactivePerson, String> {
interface ReactivePersonRepostitory extends ReactiveSortingRepository<ReactivePerson, String> {
Publisher<ReactivePerson> findByLastname(String lastname);
}
@Repository
interface RxJavaPersonRepostitory extends RxJavaPagingAndSortingRepository<ReactivePerson, String> {
interface RxJavaPersonRepostitory extends RxJava1SortingRepository<ReactivePerson, String> {
Observable<ReactivePerson> findByFirstnameAndLastname(String firstname, String lastname);

View File

@@ -135,48 +135,6 @@ public class ReactiveMongoRepositoryTests implements BeanClassLoaderAware, BeanF
assertThat(list, hasSize(2));
}
/**
* @see DATAMONGO-1444
*/
@Test
public void shouldFindMonoOfPage() {
Mono<Page<Person>> pageMono = repository.findMonoPageByLastname("Matthews", new PageRequest(0, 1));
Page<Person> persons = pageMono.block();
assertThat(persons.getContent(), hasSize(1));
assertThat(persons.getTotalPages(), is(2));
pageMono = repository.findMonoPageByLastname("Matthews", new PageRequest(0, 100));
persons = pageMono.block();
assertThat(persons.getContent(), hasSize(2));
assertThat(persons.getTotalPages(), is(1));
}
/**
* @see DATAMONGO-1444
*/
@Test
public void shouldFindMonoOfSlice() {
Mono<Slice<Person>> pageMono = repository.findMonoSliceByLastname("Matthews", new PageRequest(0, 1));
Slice<Person> persons = pageMono.block();
assertThat(persons.getContent(), hasSize(1));
assertThat(persons.hasNext(), is(true));
pageMono = repository.findMonoSliceByLastname("Matthews", new PageRequest(0, 100));
persons = pageMono.block();
assertThat(persons.getContent(), hasSize(2));
assertThat(persons.hasNext(), is(false));
}
/**
* @see DATAMONGO-1444
*/
@@ -388,10 +346,6 @@ public class ReactiveMongoRepositoryTests implements BeanClassLoaderAware, BeanF
Mono<Person> findOneByLastname(String lastname);
Mono<Page<Person>> findMonoPageByLastname(String lastname, Pageable pageRequest);
Mono<Slice<Person>> findMonoSliceByLastname(String lastname, Pageable pageRequest);
Mono<Person> findByLastname(Publisher<String> lastname);
Flux<Person> findByLastnameIn(Publisher<String> lastname);

View File

@@ -249,36 +249,6 @@ public class SimpleReactiveMongoRepositoryTests implements BeanClassLoaderAware,
assertThat(persons.get(0).getId(), is(equalTo(oliver.getId())));
}
/**
* @see DATAMONGO-1444
*/
@Test
public void findAllWithPageRequestShouldReturnPage() {
Page<ReactivePerson> people = repository.findAll(new PageRequest(0, 10)).block();
assertThat(people.getTotalPages(), is(1));
List<String> ids = people.getContent().stream().map(ReactivePerson::getId).collect(Collectors.toList());
assertThat(ids, hasSize(7));
assertThat(ids, hasItems(dave.id, carter.id));
}
/**
* @see DATAMONGO-1444
*/
@Test
public void findAllWithPageRequestOfPageSize1ShouldReturnPage() {
Page<ReactivePerson> people = repository.findAll(new PageRequest(1, 1)).block();
List<String> ids = people.getContent().stream().map(ReactivePerson::getId).collect(Collectors.toList());
assertThat(people.getTotalPages(), is(7));
assertThat(ids, hasSize(1));
}
/**
* @see DATAMONGO-1444
*/

View File

@@ -34,7 +34,7 @@ import org.springframework.data.repository.config.AnnotationRepositoryConfigurat
import org.springframework.data.repository.config.RepositoryConfiguration;
import org.springframework.data.repository.config.RepositoryConfigurationSource;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import org.springframework.data.repository.reactive.RxJavaCrudRepository;
import org.springframework.data.repository.reactive.RxJava1CrudRepository;
/**
* Unit tests for {@link ReactiveMongoRepositoryConfigurationExtension}.
@@ -115,7 +115,7 @@ public class ReactiveMongoRepositoryConfigurationExtensionUnitTests {
interface SampleRepository extends ReactiveCrudRepository<Sample, Long> {}
interface UnannotatedRepository extends RxJavaCrudRepository<Store, Long> {}
interface UnannotatedRepository extends RxJava1CrudRepository<Store, Long> {}
interface StoreRepository extends ReactiveMongoRepository<Store, Long> {}
}

View File

@@ -17,12 +17,12 @@
package org.springframework.data.mongodb.repository.custom;
import org.springframework.data.mongodb.repository.User;
import org.springframework.data.repository.reactive.RxJavaCrudRepository;
import org.springframework.data.repository.reactive.RxJava1CrudRepository;
/**
* @author Mark Paluch
*/
public interface CustomReactiveMongoRepository
extends RxJavaCrudRepository<User, String>, CustomReactiveMongoRepositoryCustom {
extends RxJava1CrudRepository<User, String>, CustomReactiveMongoRepositoryCustom {
}

View File

@@ -37,8 +37,6 @@ import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.repository.Person;
import org.springframework.data.mongodb.repository.query.ReactiveMongoQueryExecution.GeoNearExecution;
import org.springframework.data.mongodb.repository.query.ReactiveMongoQueryExecution.PagedExecution;
import org.springframework.data.mongodb.repository.query.ReactiveMongoQueryExecution.SlicedExecution;
import org.springframework.data.util.ClassTypeInformation;
import org.springframework.util.ClassUtils;
@@ -55,38 +53,6 @@ public class ReactiveMongoQueryExecutionUnitTests {
@Mock private ReactiveMongoOperations operations;
@Mock private MongoParameterAccessor parameterAccessor;
/**
* @see DATAMONGO-1444
*/
@Test
public void slicedExecutionShouldApplyQuerySettings() throws Exception {
Query query = new Query();
new SlicedExecution(operations, new PageRequest(1, 10)).execute(query, Person.class, "person");
assertThat(query.getLimit(), is(equalTo(11)));
assertThat(query.getSkip(), is(equalTo(10)));
verify(operations).find(query, Person.class, "person");
}
/**
* @see DATAMONGO-1444
*/
@Test
public void pagedExecutionShouldApplyQuerySettings() throws Exception {
Query query = new Query();
new PagedExecution(operations, new PageRequest(1, 10)).execute(query, Person.class, "person");
assertThat(query.getLimit(), is(equalTo(10)));
assertThat(query.getSkip(), is(equalTo(10)));
verify(operations).find(query, Person.class, "person");
verify(operations).count(query, Person.class, "person");
}
/**
* @see DATAMONGO-1444
*/

View File

@@ -23,6 +23,7 @@ import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
@@ -162,27 +163,17 @@ public class ReactiveMongoQueryMethodUnitTests {
/**
* @see DATAMONGO-1444
*/
@Test
public void acceptsPageableMethodsUsingWrappedPage() throws Exception {
MongoQueryMethod method = queryMethod(PersonRepository.class, "findMonoPageByLastname", String.class,
Pageable.class);
assertThat(method.isPageQuery(), is(true));
assertThat(method.isSliceQuery(), is(false));
@Test(expected = InvalidDataAccessApiUsageException.class)
public void throwsExceptionOnWrappedPage() throws Exception {
queryMethod(PersonRepository.class, "findMonoPageByLastname", String.class, Pageable.class);
}
/**
* @see DATAMONGO-1444
*/
@Test
public void acceptsPageableMethodsUsingWrappedSlice() throws Exception {
MongoQueryMethod method = queryMethod(PersonRepository.class, "findMonoSliceByLastname", String.class,
Pageable.class);
assertThat(method.isPageQuery(), is(false));
assertThat(method.isSliceQuery(), is(true));
@Test(expected = InvalidDataAccessApiUsageException.class)
public void throwsExceptionOnWrappedSlice() throws Exception {
queryMethod(PersonRepository.class, "findMonoSliceByLastname", String.class, Pageable.class);
}
/**

View File

@@ -1,150 +0,0 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.repository.support;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import org.junit.Test;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Unit tests for {@link ReactivePageImpl}.
*
* @author Mark Paluch
*/
public class ReactivePageImplUnitTests {
/**
* @see DATAMONGO-1444
*/
@Test(expected = IllegalArgumentException.class)
public void preventsNullContentForAdvancedSetup() throws Exception {
new ReactivePageImpl<Object>(null, null, Mono.just(0L));
}
/**
* @see DATAMONGO-1444
*/
@Test
public void returnsNextPageable() {
Page<Object> page = new ReactivePageImpl<>(Flux.just(new Object()), new PageRequest(0, 1), Mono.just(10L));
assertThat(page.isFirst(), is(true));
assertThat(page.hasPrevious(), is(false));
assertThat(page.previousPageable(), is(nullValue()));
assertThat(page.isLast(), is(false));
assertThat(page.hasNext(), is(true));
assertThat(page.nextPageable(), is(new PageRequest(1, 1)));
}
/**
* @see DATAMONGO-1444
*/
@Test
public void returnsContentBoundedByPageSize() {
Page<Object> page = new ReactivePageImpl<>(Flux.just(new Object(), new Object()), new PageRequest(0, 1),
Mono.just(10L));
assertThat(page.getContent(), hasSize(1));
assertThat(page.hasNext(), is(true));
}
/**
* @see DATAMONGO-1444
*/
@Test
public void returnsPreviousPageable() {
Page<Object> page = new ReactivePageImpl<>(Flux.just(new Object()), new PageRequest(1, 1), Mono.just(2L));
assertThat(page.isFirst(), is(false));
assertThat(page.hasPrevious(), is(true));
assertThat(page.previousPageable(), is(new PageRequest(0, 1)));
assertThat(page.isLast(), is(true));
assertThat(page.hasNext(), is(false));
assertThat(page.nextPageable(), is(nullValue()));
}
/**
* @see DATAMONGO-1444
*/
@Test
public void transformsPageCorrectly() {
Page<Integer> transformed = new ReactivePageImpl<>(Flux.just("foo", "bar"), new PageRequest(0, 2), Mono.just(10L))
.map(String::length);
assertThat(transformed.getContent(), hasSize(2));
assertThat(transformed.getContent(), contains(3, 3));
}
/**
* @see DATAMONGO-1444
*/
@Test
public void adaptsTotalForLastPageOnIntermediateDeletion() {
assertThat(new ReactivePageImpl<>(Flux.just("foo", "bar"), new PageRequest(0, 5), Mono.just(3L)).getTotalElements(),
is(2L));
}
/**
* @see DATAMONGO-1444
*/
@Test
public void adaptsTotalForLastPageOnIntermediateInsertion() {
assertThat(new ReactivePageImpl<>(Flux.just("foo", "bar"), new PageRequest(0, 5), Mono.just(1L)).getTotalElements(),
is(2L));
}
/**
* @see DATAMONGO-1444
*/
@Test
public void adaptsTotalForLastPageOnIntermediateDeletionOnLastPate() {
assertThat(
new ReactivePageImpl<>(Flux.just("foo", "bar"), new PageRequest(1, 10), Mono.just(13L)).getTotalElements(),
is(12L));
}
/**
* @see DATAMONGO-1444
*/
@Test
public void adaptsTotalForLastPageOnIntermediateInsertionOnLastPate() {
assertThat(
new ReactivePageImpl<>(Flux.just("foo", "bar"), new PageRequest(1, 10), Mono.just(11L)).getTotalElements(),
is(12L));
}
/**
* @see DATAMONGO-1444
*/
@Test
public void doesNotAdapttotalIfPageIsEmpty() {
assertThat(new ReactivePageImpl<String>(Flux.empty(), new PageRequest(1, 10), Mono.just(0L)).getTotalElements(),
is(0L));
}
}

View File

@@ -1,88 +0,0 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.repository.support;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import org.junit.Test;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Slice;
import reactor.core.publisher.Flux;
/**
* Unit tests for {@link ReactiveSliceImpl}.
*
* @author Mark Paluch
*/
public class ReactiveSliceImplUnitTests {
/**
* @see DATAMONGO-1444
*/
@Test(expected = IllegalArgumentException.class)
public void preventsNullContentForAdvancedSetup() throws Exception {
new ReactiveSliceImpl<Object>(null, null);
}
/**
* @see DATAMONGO-1444
*/
@Test
public void returnsNextPageable() {
Slice<Object> page = new ReactiveSliceImpl<>(Flux.just(new Object(), new Object()), new PageRequest(0, 1));
assertThat(page.isFirst(), is(true));
assertThat(page.hasPrevious(), is(false));
assertThat(page.previousPageable(), is(nullValue()));
assertThat(page.isLast(), is(false));
assertThat(page.hasNext(), is(true));
assertThat(page.nextPageable(), is(new PageRequest(1, 1)));
}
/**
* @see DATAMONGO-1444
*/
@Test
public void returnsPreviousPageable() {
Slice<Object> page = new ReactiveSliceImpl<>(Flux.just(new Object()), new PageRequest(1, 1));
assertThat(page.isFirst(), is(false));
assertThat(page.hasPrevious(), is(true));
assertThat(page.previousPageable(), is(new PageRequest(0, 1)));
assertThat(page.isLast(), is(true));
assertThat(page.hasNext(), is(false));
assertThat(page.nextPageable(), is(nullValue()));
}
/**
* @see DATAMONGO-1444
*/
@Test
public void transformsPageCorrectly() {
Slice<Integer> transformed = new ReactiveSliceImpl<>(Flux.just("foo", "bar"), new PageRequest(0, 2))
.map(String::length);
assertThat(transformed.getContent(), hasSize(2));
assertThat(transformed.getContent(), contains(3, 3));
}
}

View File

@@ -16,9 +16,9 @@ Spring Data MongoDB is built on top of the https://mongodb.github.io/mongo-java-
Spring Data's Repository abstraction is a dynamic API, mostly defined by you and your requirements, as you're declaring query methods. Reactive MongoDB repositories can be either implemented using RxJava or Project Reactor wrapper types by simply extending from one of the library-specific repository interfaces:
* `ReactiveCrudRepository`
* `ReactivePagingAndSortingRepository`
* `RxJavaCrudRepository`
* `RxJavaPagingAndSortingRepository`
* `ReactiveSortingRepository`
* `RxJava1CrudRepository`
* `RxJava1SortingRepository`
Spring Data converts reactive wrapper types behind the scenes so that you can stick to your favorite composition library.
@@ -50,7 +50,7 @@ We have a quite simple domain object here. Note that it has a property named `id
====
[source]
----
public interface ReactivePersonRepository extends ReactivePagingAndSortingRepository<Person, Long> {
public interface ReactivePersonRepository extends ReactiveSortingRepository<Person, Long> {
Flux<Person> findByFirstname(String firstname);
@@ -93,9 +93,9 @@ class ApplicationConfig extends AbstractReactiveMongoConfiguration {
----
====
As our domain repository extends `ReactivePagingAndSortingRepository` it provides you with CRUD operations as well as methods for paginated and sorted access to the entities. Working with the repository instance is just a matter of dependency injecting it into a client. So accessing the second page of `Person` s at a page size of 10 would simply look something like this:
As our domain repository extends `ReactiveSortingRepository` it provides you with CRUD operations as well as methods for sorted access to the entities. Working with the repository instance is just a matter of dependency injecting it into a client.
.Paging access to Person entities
.Sorted access to Person entities
====
[source,java]
----
@@ -104,20 +104,13 @@ public class PersonRepositoryTests {
@Autowired ReactivePersonRepository repository;
@Test
public void readsFirstPageCorrectly() {
Mono<Page<Person>> persons = repository.findAll(new PageRequest(0, 10));
}
@Test
public void readsFirstPageAsStream() {
Flux<Person> persons = repository.findAll(new PageRequest(0, 10));
public void sortsElementsCorrectly() {
Flux<Person> persons = repository.findAll(new Sort(new Order(ASC, "lastname")));
}
}
----
====
The sample creates an application context with Spring's unit test support which will perform annotation based dependency injection into test cases. Inside the test method we simply use the repository to query the datastore. We hand the repository a `PageRequest` instance that requests the first page of persons at a page size of 10.
[[mongo.reactive.repositories.features]]
== Features