Add option to configure change stream behaviour at collection creation time.
Introduce CollectionChangeStreamOptions which allows to define the changeStreamPreAndPostImages of the createCollection command. Original Pull Request: #4193
This commit is contained in:
@@ -46,10 +46,11 @@ public class CollectionOptions {
|
|||||||
private @Nullable Collation collation;
|
private @Nullable Collation collation;
|
||||||
private ValidationOptions validationOptions;
|
private ValidationOptions validationOptions;
|
||||||
private @Nullable TimeSeriesOptions timeSeriesOptions;
|
private @Nullable TimeSeriesOptions timeSeriesOptions;
|
||||||
|
private @Nullable CollectionChangeStreamOptions changeStreamOptions;
|
||||||
|
|
||||||
private CollectionOptions(@Nullable Long size, @Nullable Long maxDocuments, @Nullable Boolean capped,
|
private CollectionOptions(@Nullable Long size, @Nullable Long maxDocuments, @Nullable Boolean capped,
|
||||||
@Nullable Collation collation, ValidationOptions validationOptions,
|
@Nullable Collation collation, ValidationOptions validationOptions,
|
||||||
@Nullable TimeSeriesOptions timeSeriesOptions) {
|
@Nullable TimeSeriesOptions timeSeriesOptions, @Nullable CollectionChangeStreamOptions changeStreamOptions) {
|
||||||
|
|
||||||
this.maxDocuments = maxDocuments;
|
this.maxDocuments = maxDocuments;
|
||||||
this.size = size;
|
this.size = size;
|
||||||
@@ -57,6 +58,7 @@ public class CollectionOptions {
|
|||||||
this.collation = collation;
|
this.collation = collation;
|
||||||
this.validationOptions = validationOptions;
|
this.validationOptions = validationOptions;
|
||||||
this.timeSeriesOptions = timeSeriesOptions;
|
this.timeSeriesOptions = timeSeriesOptions;
|
||||||
|
this.changeStreamOptions = changeStreamOptions;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -70,7 +72,7 @@ public class CollectionOptions {
|
|||||||
|
|
||||||
Assert.notNull(collation, "Collation must not be null");
|
Assert.notNull(collation, "Collation must not be null");
|
||||||
|
|
||||||
return new CollectionOptions(null, null, null, collation, ValidationOptions.none(), null);
|
return new CollectionOptions(null, null, null, collation, ValidationOptions.none(), null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -80,7 +82,7 @@ public class CollectionOptions {
|
|||||||
* @since 2.0
|
* @since 2.0
|
||||||
*/
|
*/
|
||||||
public static CollectionOptions empty() {
|
public static CollectionOptions empty() {
|
||||||
return new CollectionOptions(null, null, null, null, ValidationOptions.none(), null);
|
return new CollectionOptions(null, null, null, null, ValidationOptions.none(), null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -97,6 +99,18 @@ public class CollectionOptions {
|
|||||||
return empty().timeSeries(TimeSeriesOptions.timeSeries(timeField));
|
return empty().timeSeries(TimeSeriesOptions.timeSeries(timeField));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Quick way to set up {@link CollectionOptions} for emitting (pre & post) change events.
|
||||||
|
*
|
||||||
|
* @return new instance of {@link CollectionOptions}.
|
||||||
|
* @see #changeStream(CollectionChangeStreamOptions)
|
||||||
|
* @see CollectionChangeStreamOptions#preAndPostImages(boolean)
|
||||||
|
* @since 4.0
|
||||||
|
*/
|
||||||
|
public static CollectionOptions emitChangedRevisions() {
|
||||||
|
return empty().changeStream(CollectionChangeStreamOptions.preAndPostImages(true));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create new {@link CollectionOptions} with already given settings and capped set to {@literal true}. <br />
|
* Create new {@link CollectionOptions} with already given settings and capped set to {@literal true}. <br />
|
||||||
* <strong>NOTE</strong> Using capped collections requires defining {@link #size(long)}.
|
* <strong>NOTE</strong> Using capped collections requires defining {@link #size(long)}.
|
||||||
@@ -105,7 +119,7 @@ public class CollectionOptions {
|
|||||||
* @since 2.0
|
* @since 2.0
|
||||||
*/
|
*/
|
||||||
public CollectionOptions capped() {
|
public CollectionOptions capped() {
|
||||||
return new CollectionOptions(size, maxDocuments, true, collation, validationOptions, null);
|
return new CollectionOptions(size, maxDocuments, true, collation, validationOptions, timeSeriesOptions, changeStreamOptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -116,7 +130,7 @@ public class CollectionOptions {
|
|||||||
* @since 2.0
|
* @since 2.0
|
||||||
*/
|
*/
|
||||||
public CollectionOptions maxDocuments(long maxDocuments) {
|
public CollectionOptions maxDocuments(long maxDocuments) {
|
||||||
return new CollectionOptions(size, maxDocuments, capped, collation, validationOptions, timeSeriesOptions);
|
return new CollectionOptions(size, maxDocuments, capped, collation, validationOptions, timeSeriesOptions, changeStreamOptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -127,7 +141,7 @@ public class CollectionOptions {
|
|||||||
* @since 2.0
|
* @since 2.0
|
||||||
*/
|
*/
|
||||||
public CollectionOptions size(long size) {
|
public CollectionOptions size(long size) {
|
||||||
return new CollectionOptions(size, maxDocuments, capped, collation, validationOptions, timeSeriesOptions);
|
return new CollectionOptions(size, maxDocuments, capped, collation, validationOptions, timeSeriesOptions, changeStreamOptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -138,7 +152,7 @@ public class CollectionOptions {
|
|||||||
* @since 2.0
|
* @since 2.0
|
||||||
*/
|
*/
|
||||||
public CollectionOptions collation(@Nullable Collation collation) {
|
public CollectionOptions collation(@Nullable Collation collation) {
|
||||||
return new CollectionOptions(size, maxDocuments, capped, collation, validationOptions, timeSeriesOptions);
|
return new CollectionOptions(size, maxDocuments, capped, collation, validationOptions, timeSeriesOptions, changeStreamOptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -258,7 +272,7 @@ public class CollectionOptions {
|
|||||||
public CollectionOptions validation(ValidationOptions validationOptions) {
|
public CollectionOptions validation(ValidationOptions validationOptions) {
|
||||||
|
|
||||||
Assert.notNull(validationOptions, "ValidationOptions must not be null");
|
Assert.notNull(validationOptions, "ValidationOptions must not be null");
|
||||||
return new CollectionOptions(size, maxDocuments, capped, collation, validationOptions, timeSeriesOptions);
|
return new CollectionOptions(size, maxDocuments, capped, collation, validationOptions, timeSeriesOptions, changeStreamOptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -271,7 +285,20 @@ public class CollectionOptions {
|
|||||||
public CollectionOptions timeSeries(TimeSeriesOptions timeSeriesOptions) {
|
public CollectionOptions timeSeries(TimeSeriesOptions timeSeriesOptions) {
|
||||||
|
|
||||||
Assert.notNull(timeSeriesOptions, "TimeSeriesOptions must not be null");
|
Assert.notNull(timeSeriesOptions, "TimeSeriesOptions must not be null");
|
||||||
return new CollectionOptions(size, maxDocuments, capped, collation, validationOptions, timeSeriesOptions);
|
return new CollectionOptions(size, maxDocuments, capped, collation, validationOptions, timeSeriesOptions, changeStreamOptions);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create new {@link CollectionOptions} with the given {@link TimeSeriesOptions}.
|
||||||
|
*
|
||||||
|
* @param changeStreamOptions must not be {@literal null}.
|
||||||
|
* @return new instance of {@link CollectionOptions}.
|
||||||
|
* @since 3.3
|
||||||
|
*/
|
||||||
|
public CollectionOptions changeStream(CollectionChangeStreamOptions changeStreamOptions) {
|
||||||
|
|
||||||
|
Assert.notNull(changeStreamOptions, "ChangeStreamOptions must not be null");
|
||||||
|
return new CollectionOptions(size, maxDocuments, capped, collation, validationOptions, timeSeriesOptions, changeStreamOptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -332,6 +359,16 @@ public class CollectionOptions {
|
|||||||
return Optional.ofNullable(timeSeriesOptions);
|
return Optional.ofNullable(timeSeriesOptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the {@link CollectionChangeStreamOptions} if available.
|
||||||
|
*
|
||||||
|
* @return {@link Optional#empty()} if not specified.
|
||||||
|
* @since 4.0
|
||||||
|
*/
|
||||||
|
public Optional<CollectionChangeStreamOptions> getChangeStreamOptions() {
|
||||||
|
return Optional.ofNullable(changeStreamOptions);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Encapsulation of ValidationOptions options.
|
* Encapsulation of ValidationOptions options.
|
||||||
*
|
*
|
||||||
@@ -428,6 +465,34 @@ public class CollectionOptions {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Encapsulation of options applied to define collections change stream behaviour.
|
||||||
|
*
|
||||||
|
* @author Christoph Strobl
|
||||||
|
* @since 4.0
|
||||||
|
*/
|
||||||
|
public static class CollectionChangeStreamOptions {
|
||||||
|
|
||||||
|
private final boolean preAndPostImages;
|
||||||
|
|
||||||
|
private CollectionChangeStreamOptions(boolean emitChangedRevisions) {
|
||||||
|
this.preAndPostImages = emitChangedRevisions;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Output the version of a document before and after changes (the document pre- and post-images).
|
||||||
|
*
|
||||||
|
* @return new instance of {@link CollectionChangeStreamOptions}.
|
||||||
|
*/
|
||||||
|
public static CollectionChangeStreamOptions preAndPostImages(boolean emitChangedRevisions) {
|
||||||
|
return new CollectionChangeStreamOptions(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean getPreAndPostImages() {
|
||||||
|
return preAndPostImages;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Options applicable to Time Series collections.
|
* Options applicable to Time Series collections.
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -21,7 +21,6 @@ import java.util.Map;
|
|||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
import org.bson.Document;
|
import org.bson.Document;
|
||||||
|
|
||||||
import org.springframework.core.convert.ConversionService;
|
import org.springframework.core.convert.ConversionService;
|
||||||
import org.springframework.dao.InvalidDataAccessApiUsageException;
|
import org.springframework.dao.InvalidDataAccessApiUsageException;
|
||||||
import org.springframework.data.convert.CustomConversions;
|
import org.springframework.data.convert.CustomConversions;
|
||||||
@@ -57,6 +56,7 @@ import org.springframework.util.MultiValueMap;
|
|||||||
import org.springframework.util.ObjectUtils;
|
import org.springframework.util.ObjectUtils;
|
||||||
import org.springframework.util.StringUtils;
|
import org.springframework.util.StringUtils;
|
||||||
|
|
||||||
|
import com.mongodb.client.model.ChangeStreamPreAndPostImagesOptions;
|
||||||
import com.mongodb.client.model.CreateCollectionOptions;
|
import com.mongodb.client.model.CreateCollectionOptions;
|
||||||
import com.mongodb.client.model.TimeSeriesGranularity;
|
import com.mongodb.client.model.TimeSeriesGranularity;
|
||||||
import com.mongodb.client.model.ValidationOptions;
|
import com.mongodb.client.model.ValidationOptions;
|
||||||
@@ -341,6 +341,9 @@ class EntityOperations {
|
|||||||
result.timeSeriesOptions(options);
|
result.timeSeriesOptions(options);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
collectionOptions.getChangeStreamOptions().ifPresent(it -> result
|
||||||
|
.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(it.getPreAndPostImages())));
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2511,6 +2511,11 @@ public class MongoTemplate implements MongoOperations, ApplicationContextAware,
|
|||||||
doc.put("timeseries", timeseries);
|
doc.put("timeseries", timeseries);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
collectionOptions.getChangeStreamOptions().map(it -> new Document("enabled", it.getPreAndPostImages()))
|
||||||
|
.ifPresent(it -> {
|
||||||
|
doc.put("changeStreamPreAndPostImages", it);
|
||||||
|
});
|
||||||
|
|
||||||
return doc;
|
return doc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -46,6 +46,7 @@ import org.junit.jupiter.api.Test;
|
|||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
import org.springframework.data.annotation.Id;
|
import org.springframework.data.annotation.Id;
|
||||||
import org.springframework.data.mongodb.core.ChangeStreamOptions;
|
import org.springframework.data.mongodb.core.ChangeStreamOptions;
|
||||||
|
import org.springframework.data.mongodb.core.CollectionOptions;
|
||||||
import org.springframework.data.mongodb.core.mapping.Field;
|
import org.springframework.data.mongodb.core.mapping.Field;
|
||||||
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions;
|
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions;
|
||||||
import org.springframework.data.mongodb.core.messaging.ChangeStreamTask.ChangeStreamEventMessage;
|
import org.springframework.data.mongodb.core.messaging.ChangeStreamTask.ChangeStreamEventMessage;
|
||||||
@@ -724,9 +725,7 @@ class ChangeStreamTests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void createUserCollectionWithChangeStreamPreAndPostImagesEnabled() {
|
private void createUserCollectionWithChangeStreamPreAndPostImagesEnabled() {
|
||||||
CreateCollectionOptions createCollectionOptions = new CreateCollectionOptions();
|
template.createCollection(User.class, CollectionOptions.emitChangedRevisions());
|
||||||
createCollectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
|
|
||||||
template.getDb().createCollection("user", createCollectionOptions);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
|
|||||||
Reference in New Issue
Block a user