Add fullDocumentBeforeChange support for change streams.

Closes: #4187
Original Pull Request: #4193
This commit is contained in:
myroslav.kosinskyi
2022-10-05 17:39:29 +03:00
committed by Christoph Strobl
parent a5725806f5
commit aa35aaeb70
7 changed files with 306 additions and 16 deletions

View File

@@ -36,21 +36,29 @@ import com.mongodb.client.model.changestream.OperationType;
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Myroslav Kosinskyi
* @since 2.1
*/
public class ChangeStreamEvent<T> {
@SuppressWarnings("rawtypes") //
private static final AtomicReferenceFieldUpdater<ChangeStreamEvent, Object> CONVERTED_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(ChangeStreamEvent.class, Object.class, "converted");
private static final AtomicReferenceFieldUpdater<ChangeStreamEvent, Object> CONVERTED_FULL_DOCUMENT_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(ChangeStreamEvent.class, Object.class, "convertedFullDocument");
@SuppressWarnings("rawtypes") //
private static final AtomicReferenceFieldUpdater<ChangeStreamEvent, Object> CONVERTED_FULL_DOCUMENT_BEFORE_CHANGE_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(ChangeStreamEvent.class, Object.class, "convertedFullDocumentBeforeChange");
private final @Nullable ChangeStreamDocument<Document> raw;
private final Class<T> targetType;
private final MongoConverter converter;
// accessed through CONVERTED_UPDATER.
private volatile @Nullable T converted;
// accessed through CONVERTED_FULL_DOCUMENT_UPDATER.
private volatile @Nullable T convertedFullDocument;
// accessed through CONVERTED_FULL_DOCUMENT_BEFORE_CHANGE_UPDATER.
private volatile @Nullable T convertedFullDocumentBeforeChange;
/**
* @param raw can be {@literal null}.
@@ -147,27 +155,36 @@ public class ChangeStreamEvent<T> {
@Nullable
public T getBody() {
if (raw == null) {
if (raw == null || raw.getFullDocument() == null) {
return null;
}
Document fullDocument = raw.getFullDocument();
return getConvertedFullDocument(raw.getFullDocument());
}
if (fullDocument == null) {
return targetType.cast(fullDocument);
@Nullable
public T getBodyBeforeChange() {
if (raw == null || raw.getFullDocumentBeforeChange() == null) {
return null;
}
return getConverted(fullDocument);
return getConvertedFullDocumentBeforeChange(raw.getFullDocumentBeforeChange());
}
@SuppressWarnings("unchecked")
private T getConverted(Document fullDocument) {
return (T) doGetConverted(fullDocument);
private T getConvertedFullDocumentBeforeChange(Document fullDocument) {
return (T) doGetConverted(fullDocument, CONVERTED_FULL_DOCUMENT_BEFORE_CHANGE_UPDATER);
}
private Object doGetConverted(Document fullDocument) {
@SuppressWarnings("unchecked")
private T getConvertedFullDocument(Document fullDocument) {
return (T) doGetConverted(fullDocument, CONVERTED_FULL_DOCUMENT_UPDATER);
}
Object result = CONVERTED_UPDATER.get(this);
private Object doGetConverted(Document fullDocument, AtomicReferenceFieldUpdater<ChangeStreamEvent, Object> updater) {
Object result = updater.get(this);
if (result != null) {
return result;
@@ -176,13 +193,13 @@ public class ChangeStreamEvent<T> {
if (ClassUtils.isAssignable(Document.class, fullDocument.getClass())) {
result = converter.read(targetType, fullDocument);
return CONVERTED_UPDATER.compareAndSet(this, null, result) ? result : CONVERTED_UPDATER.get(this);
return updater.compareAndSet(this, null, result) ? result : updater.get(this);
}
if (converter.getConversionService().canConvert(fullDocument.getClass(), targetType)) {
result = converter.getConversionService().convert(fullDocument, targetType);
return CONVERTED_UPDATER.compareAndSet(this, null, result) ? result : CONVERTED_UPDATER.get(this);
return updater.compareAndSet(this, null, result) ? result : updater.get(this);
}
throw new IllegalArgumentException(

View File

@@ -19,6 +19,7 @@ import java.time.Instant;
import java.util.Arrays;
import java.util.Optional;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
@@ -40,6 +41,7 @@ import com.mongodb.client.model.changestream.FullDocument;
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Myroslav Kosinskyi
* @since 2.1
*/
public class ChangeStreamOptions {
@@ -47,6 +49,7 @@ public class ChangeStreamOptions {
private @Nullable Object filter;
private @Nullable BsonValue resumeToken;
private @Nullable FullDocument fullDocumentLookup;
private @Nullable FullDocumentBeforeChange fullDocumentBeforeChangeLookup;
private @Nullable Collation collation;
private @Nullable Object resumeTimestamp;
private Resume resume = Resume.UNDEFINED;
@@ -74,6 +77,13 @@ public class ChangeStreamOptions {
return Optional.ofNullable(fullDocumentLookup);
}
/**
* @return {@link Optional#empty()} if not set.
*/
public Optional<FullDocumentBeforeChange> getFullDocumentBeforeChangeLookup() {
return Optional.ofNullable(fullDocumentBeforeChangeLookup);
}
/**
* @return {@link Optional#empty()} if not set.
*/
@@ -170,6 +180,9 @@ public class ChangeStreamOptions {
if (!ObjectUtils.nullSafeEquals(this.fullDocumentLookup, that.fullDocumentLookup)) {
return false;
}
if (!ObjectUtils.nullSafeEquals(this.fullDocumentBeforeChangeLookup, that.fullDocumentBeforeChangeLookup)) {
return false;
}
if (!ObjectUtils.nullSafeEquals(this.collation, that.collation)) {
return false;
}
@@ -184,6 +197,7 @@ public class ChangeStreamOptions {
int result = ObjectUtils.nullSafeHashCode(filter);
result = 31 * result + ObjectUtils.nullSafeHashCode(resumeToken);
result = 31 * result + ObjectUtils.nullSafeHashCode(fullDocumentLookup);
result = 31 * result + ObjectUtils.nullSafeHashCode(fullDocumentBeforeChangeLookup);
result = 31 * result + ObjectUtils.nullSafeHashCode(collation);
result = 31 * result + ObjectUtils.nullSafeHashCode(resumeTimestamp);
result = 31 * result + ObjectUtils.nullSafeHashCode(resume);
@@ -220,6 +234,7 @@ public class ChangeStreamOptions {
private @Nullable Object filter;
private @Nullable BsonValue resumeToken;
private @Nullable FullDocument fullDocumentLookup;
private @Nullable FullDocumentBeforeChange fullDocumentBeforeChangeLookup;
private @Nullable Collation collation;
private @Nullable Object resumeTimestamp;
private Resume resume = Resume.UNDEFINED;
@@ -322,6 +337,20 @@ public class ChangeStreamOptions {
return this;
}
/**
* Set the {@link FullDocumentBeforeChange} lookup to use.
*
* @param lookup must not be {@literal null}.
* @return this.
*/
public ChangeStreamOptionsBuilder fullDocumentBeforeChangeLookup(FullDocumentBeforeChange lookup) {
Assert.notNull(lookup, "Lookup must not be null");
this.fullDocumentBeforeChangeLookup = lookup;
return this;
}
/**
* Set the cluster time to resume from.
*
@@ -391,6 +420,7 @@ public class ChangeStreamOptions {
options.filter = this.filter;
options.resumeToken = this.resumeToken;
options.fullDocumentLookup = this.fullDocumentLookup;
options.fullDocumentBeforeChangeLookup = this.fullDocumentBeforeChangeLookup;
options.collation = this.collation;
options.resumeTimestamp = this.resumeTimestamp;
options.resume = this.resume;

View File

@@ -18,6 +18,7 @@ package org.springframework.data.mongodb.core.messaging;
import java.time.Duration;
import java.time.Instant;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import org.bson.BsonValue;
import org.bson.Document;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
@@ -90,6 +91,7 @@ import com.mongodb.client.model.changestream.FullDocument;
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Myroslav Kosinskyi
* @since 2.1
*/
public class ChangeStreamRequest<T>
@@ -425,6 +427,20 @@ public class ChangeStreamRequest<T>
return this;
}
/**
* @return this.
* @see #fullDocumentBeforeChangeLookup(FullDocumentBeforeChange) (FullDocumentBeforeChange)
* @see ChangeStreamOptions#getFullDocumentBeforeChangeLookup()
* @see ChangeStreamOptionsBuilder#fullDocumentBeforeChangeLookup(FullDocumentBeforeChange)
*/
public ChangeStreamRequestBuilder<T> fullDocumentBeforeChangeLookup(FullDocumentBeforeChange lookup) {
Assert.notNull(lookup, "FullDocumentBeforeChange not be null");
this.delegate.fullDocumentBeforeChangeLookup(lookup);
return this;
}
/**
* Set the cursors maximum wait time on the server (for a new Document to be emitted).
*

View File

@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
@@ -58,6 +59,7 @@ import com.mongodb.client.model.changestream.FullDocument;
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Myroslav Kosinskyi
* @since 2.1
*/
class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>, Object> {
@@ -86,6 +88,7 @@ class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>,
Collation collation = null;
FullDocument fullDocument = ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
: FullDocument.UPDATE_LOOKUP;
FullDocumentBeforeChange fullDocumentBeforeChange = FullDocumentBeforeChange.DEFAULT;
BsonTimestamp startAt = null;
boolean resumeAfter = true;
@@ -113,6 +116,9 @@ class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>,
.orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
: FullDocument.UPDATE_LOOKUP);
fullDocumentBeforeChange = changeStreamOptions.getFullDocumentBeforeChangeLookup()
.orElse(FullDocumentBeforeChange.DEFAULT);
startAt = changeStreamOptions.getResumeBsonTimestamp().orElse(null);
}
@@ -152,6 +158,7 @@ class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>,
}
iterable = iterable.fullDocument(fullDocument);
iterable = iterable.fullDocumentBeforeChange(fullDocumentBeforeChange);
return iterable.iterator();
}
@@ -230,6 +237,12 @@ class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>,
return delegate.getBody();
}
@Nullable
@Override
public T getBodyBeforeChange() {
return delegate.getBodyBeforeChange();
}
@Override
public MessageProperties getProperties() {
return this.messageProperties;

View File

@@ -31,6 +31,7 @@ import org.springframework.util.ObjectUtils;
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Myroslav Kosinskyi
* @see MessageProperties
* @since 2.1
*/
@@ -52,6 +53,16 @@ public interface Message<S, T> {
@Nullable
T getBody();
/**
* The converted message body before change if available.
*
* @return can be {@literal null}.
*/
@Nullable
default T getBodyBeforeChange() {
return null;
}
/**
* {@link MessageProperties} containing information about the {@link Message} origin and other metadata.
*

View File

@@ -42,6 +42,7 @@ import com.mongodb.client.model.changestream.ChangeStreamDocument;
/**
* @author Christoph Strobl
* @author Myroslav Kosinskyi
*/
@ExtendWith(MockitoExtension.class)
class ChangeStreamTaskUnitTests {
@@ -67,6 +68,8 @@ class ChangeStreamTaskUnitTests {
when(mongoCollection.watch(eq(Document.class))).thenReturn(changeStreamIterable);
when(changeStreamIterable.fullDocument(any())).thenReturn(changeStreamIterable);
when(changeStreamIterable.fullDocumentBeforeChange(any())).thenReturn(changeStreamIterable);
}
@Test // DATAMONGO-2258

View File

@@ -21,6 +21,9 @@ import static org.springframework.data.mongodb.core.messaging.SubscriptionUtils.
import static org.springframework.data.mongodb.core.query.Criteria.*;
import static org.springframework.data.mongodb.core.query.Query.*;
import com.mongodb.client.model.ChangeStreamPreAndPostImagesOptions;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -47,7 +50,6 @@ import org.springframework.data.mongodb.core.mapping.Field;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions;
import org.springframework.data.mongodb.core.messaging.ChangeStreamTask.ChangeStreamEventMessage;
import org.springframework.data.mongodb.core.messaging.Message.MessageProperties;
import org.springframework.data.mongodb.core.messaging.SubscriptionUtils.*;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.mongodb.test.util.EnableIfMongoServerVersion;
@@ -67,6 +69,7 @@ import org.junitpioneer.jupiter.RepeatFailedTest;
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Myroslav Kosinskyi
*/
@ExtendWith({ MongoTemplateExtension.class })
@EnableIfReplicaSetAvailable
@@ -538,6 +541,194 @@ class ChangeStreamTests {
assertThat(messageBodies).hasSize(2);
}
@Test // issue/41087
@EnableIfMongoServerVersion(isGreaterThanEqual = "6.0")
void readsFullDocumentBeforeChangeWhenOptionDeclaredWhenAvailable() throws InterruptedException {
createUserCollectionWithChangeStreamPreAndPostImagesEnabled();
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<User> request = ChangeStreamRequest.builder() //
.collection("user") //
.fullDocumentLookup(FullDocument.WHEN_AVAILABLE) //
.fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.WHEN_AVAILABLE) //
.maxAwaitTime(Duration.ofMillis(10)) //
.publishTo(messageListener).build();
Subscription subscription = container.register(request, User.class);
awaitSubscription(subscription);
template.save(jellyBelly);
template.update(User.class).matching(query(where("id").is(jellyBelly.id))).apply(Update.update("age", 8)).first();
awaitMessages(messageListener, 2);
assertThat(messageListener.getFirstMessage().getBodyBeforeChange()).isNull();
assertThat(messageListener.getFirstMessage().getBody()).isEqualTo(jellyBelly);
assertThat(messageListener.getLastMessage().getBodyBeforeChange()).isEqualTo(jellyBelly);
assertThat(messageListener.getLastMessage().getBody()).isEqualTo(jellyBelly.withAge(8));
}
@Test // issue/41087
@EnableIfMongoServerVersion(isGreaterThanEqual = "6.0")
void readsFullDocumentBeforeChangeWhenOptionDeclaredRequired() throws InterruptedException {
createUserCollectionWithChangeStreamPreAndPostImagesEnabled();
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<User> request = ChangeStreamRequest.builder() //
.collection("user") //
.fullDocumentLookup(FullDocument.WHEN_AVAILABLE) //
.fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.REQUIRED) //
.maxAwaitTime(Duration.ofMillis(10)) //
.publishTo(messageListener).build();
Subscription subscription = container.register(request, User.class);
awaitSubscription(subscription);
template.save(jellyBelly);
template.update(User.class).matching(query(where("id").is(jellyBelly.id))).apply(Update.update("age", 8)).first();
awaitMessages(messageListener, 2);
assertThat(messageListener.getFirstMessage().getBodyBeforeChange()).isNull();
assertThat(messageListener.getFirstMessage().getBody()).isEqualTo(jellyBelly);
assertThat(messageListener.getLastMessage().getBodyBeforeChange()).isEqualTo(jellyBelly);
assertThat(messageListener.getLastMessage().getBody()).isEqualTo(jellyBelly.withAge(8));
}
@Test // issue/41087
@EnableIfMongoServerVersion(isGreaterThanEqual = "6.0")
void readsFullDocumentBeforeChangeWhenOptionIsNotDeclared() throws InterruptedException {
createUserCollectionWithChangeStreamPreAndPostImagesEnabled();
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<User> request = ChangeStreamRequest.builder() //
.collection("user") //
.maxAwaitTime(Duration.ofMillis(10)) //
.publishTo(messageListener).build();
Subscription subscription = container.register(request, User.class);
awaitSubscription(subscription);
template.save(jellyBelly);
template.update(User.class).matching(query(where("id").is(jellyBelly.id))).apply(Update.update("age", 8)).first();
awaitMessages(messageListener, 2);
assertThat(messageListener.getFirstMessage().getBodyBeforeChange()).isNull();
assertThat(messageListener.getLastMessage().getBodyBeforeChange()).isNull();
}
@Test // issue/41087
@EnableIfMongoServerVersion(isGreaterThanEqual = "6.0")
void readsFullDocumentBeforeChangeWhenOptionDeclaredDefault() throws InterruptedException {
createUserCollectionWithChangeStreamPreAndPostImagesEnabled();
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<User> request = ChangeStreamRequest.builder() //
.collection("user") //
.fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.DEFAULT).maxAwaitTime(Duration.ofMillis(10)) //
.publishTo(messageListener).build();
Subscription subscription = container.register(request, User.class);
awaitSubscription(subscription);
template.save(jellyBelly);
template.update(User.class).matching(query(where("id").is(jellyBelly.id))).apply(Update.update("age", 8)).first();
awaitMessages(messageListener, 2);
assertThat(messageListener.getFirstMessage().getBodyBeforeChange()).isNull();
assertThat(messageListener.getLastMessage().getBodyBeforeChange()).isNull();
}
@Test // issue/41087
@EnableIfMongoServerVersion(isGreaterThanEqual = "6.0")
void readsFullDocumentBeforeChangeWhenOptionDeclaredOff() throws InterruptedException {
createUserCollectionWithChangeStreamPreAndPostImagesEnabled();
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<User> request = ChangeStreamRequest.builder() //
.collection("user") //
.fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.OFF).maxAwaitTime(Duration.ofMillis(10)) //
.publishTo(messageListener).build();
Subscription subscription = container.register(request, User.class);
awaitSubscription(subscription);
template.save(jellyBelly);
template.update(User.class).matching(query(where("id").is(jellyBelly.id))).apply(Update.update("age", 8)).first();
awaitMessages(messageListener, 2);
assertThat(messageListener.getFirstMessage().getBodyBeforeChange()).isNull();
assertThat(messageListener.getLastMessage().getBodyBeforeChange()).isNull();
}
@Test // issue/41087
@EnableIfMongoServerVersion(isGreaterThanEqual = "6.0")
void readsFullDocumentBeforeChangeWhenOptionDeclaredWhenAvailableAndChangeStreamPreAndPostImagesDisabled()
throws InterruptedException {
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<User> request = ChangeStreamRequest.builder() //
.collection("user") //
.fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.WHEN_AVAILABLE).maxAwaitTime(Duration.ofMillis(10)) //
.publishTo(messageListener).build();
Subscription subscription = container.register(request, User.class);
awaitSubscription(subscription);
template.save(jellyBelly);
template.update(User.class).matching(query(where("id").is(jellyBelly.id))).apply(Update.update("age", 8)).first();
awaitMessages(messageListener, 2);
assertThat(messageListener.getFirstMessage().getBodyBeforeChange()).isNull();
assertThat(messageListener.getLastMessage().getBodyBeforeChange()).isNull();
}
@Test // issue/41087
@EnableIfMongoServerVersion(isLessThan = "6.0")
void readsFullDocumentBeforeChangeWhenOptionDeclaredRequiredAndMongoVersionIsLessThan6() throws InterruptedException {
CollectingMessageListener<ChangeStreamDocument<Document>, User> messageListener = new CollectingMessageListener<>();
ChangeStreamRequest<User> request = ChangeStreamRequest.builder() //
.collection("user") //
.fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.REQUIRED).maxAwaitTime(Duration.ofMillis(10)) //
.publishTo(messageListener).build();
Subscription subscription = container.register(request, User.class);
awaitSubscription(subscription);
template.save(jellyBelly);
template.update(User.class).matching(query(where("id").is(jellyBelly.id))).apply(Update.update("age", 8)).first();
awaitMessages(messageListener, 2);
assertThat(messageListener.getFirstMessage().getBodyBeforeChange()).isNull();
assertThat(messageListener.getLastMessage().getBodyBeforeChange()).isNull();
}
private void createUserCollectionWithChangeStreamPreAndPostImagesEnabled() {
CreateCollectionOptions createCollectionOptions = new CreateCollectionOptions();
createCollectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
template.getDb().createCollection("user", createCollectionOptions);
}
@Data
static class User {
@@ -546,6 +737,15 @@ class ChangeStreamTests {
int age;
Address address;
User withAge(int age) {
User user = new User();
user.id = id;
user.userName = userName;
user.age = age;
return user;
}
}
@Data