@@ -663,7 +663,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<MongoCollection<Document>> createView(String name, Class<?> source, AggregationPipeline pipeline, @Nullable ViewOptions options) {
|
||||
public Mono<MongoCollection<Document>> createView(String name, Class<?> source, AggregationPipeline pipeline,
|
||||
@Nullable ViewOptions options) {
|
||||
|
||||
return createView(name, getCollectionName(source),
|
||||
queryOperations.createAggregation(Aggregation.newAggregation(source, pipeline.getOperations()), source),
|
||||
@@ -1894,10 +1895,12 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
publisher = filter.isEmpty() ? db.watch(Document.class) : db.watch(filter, Document.class);
|
||||
}
|
||||
|
||||
if(options.isResumeAfter()) {
|
||||
publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher);
|
||||
if (options.isResumeAfter()) {
|
||||
publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter)
|
||||
.orElse(publisher);
|
||||
} else if (options.isStartAfter()) {
|
||||
publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::startAfter).orElse(publisher);
|
||||
publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::startAfter)
|
||||
.orElse(publisher);
|
||||
}
|
||||
publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation)
|
||||
.orElse(publisher);
|
||||
|
||||
@@ -1500,7 +1500,9 @@ public class ReactiveMongoTemplateUnitTests {
|
||||
when(changeStreamPublisher.fullDocument(any())).thenReturn(changeStreamPublisher);
|
||||
|
||||
BsonDocument token = new BsonDocument("token", new BsonString("id"));
|
||||
template.changeStream("database", "collection", ChangeStreamOptions.builder().startAfter(token).build(), Object.class).subscribe();
|
||||
template
|
||||
.changeStream("database", "collection", ChangeStreamOptions.builder().startAfter(token).build(), Object.class)
|
||||
.subscribe();
|
||||
|
||||
verify(changeStreamPublisher).startAfter(eq(token));
|
||||
}
|
||||
@@ -1530,6 +1532,7 @@ public class ReactiveMongoTemplateUnitTests {
|
||||
|
||||
AutogenerateableId foo;
|
||||
}
|
||||
|
||||
static class PersonExtended extends Person {
|
||||
|
||||
String lastname;
|
||||
|
||||
Reference in New Issue
Block a user