Polishing.
Extract common code into BulkOperationsSupport. Reorder methods. Add missing verifyComplete to tests. See #2821 Original pull request: #4342
This commit is contained in:
@@ -31,9 +31,9 @@ import com.mongodb.bulk.BulkWriteResult;
|
||||
* {@link #execute()}.
|
||||
*
|
||||
* <pre class="code">
|
||||
* MongoTemplate template = …;
|
||||
* MongoOperations ops = …;
|
||||
*
|
||||
* template.bulkOps(BulkMode.UNORDERED, Person.class)
|
||||
* ops.bulkOps(BulkMode.UNORDERED, Person.class)
|
||||
* .insert(newPerson)
|
||||
* .updateOne(where("firstname").is("Joe"), Update.update("lastname", "Doe"))
|
||||
* .execute();
|
||||
|
||||
@@ -0,0 +1,221 @@
|
||||
/*
|
||||
* Copyright 2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.data.mongodb.core;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.bson.Document;
|
||||
import org.bson.conversions.Bson;
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
import org.springframework.data.mapping.PersistentEntity;
|
||||
import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
|
||||
import org.springframework.data.mongodb.core.aggregation.RelaxedTypeBasedAggregationOperationContext;
|
||||
import org.springframework.data.mongodb.core.convert.QueryMapper;
|
||||
import org.springframework.data.mongodb.core.convert.UpdateMapper;
|
||||
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
|
||||
import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
|
||||
import org.springframework.data.mongodb.core.mapping.event.BeforeSaveEvent;
|
||||
import org.springframework.data.mongodb.core.query.Collation;
|
||||
import org.springframework.data.mongodb.core.query.Query;
|
||||
import org.springframework.data.mongodb.core.query.Update;
|
||||
import org.springframework.data.mongodb.core.query.UpdateDefinition;
|
||||
import org.springframework.data.mongodb.core.query.UpdateDefinition.ArrayFilter;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import com.mongodb.client.model.BulkWriteOptions;
|
||||
import com.mongodb.client.model.DeleteManyModel;
|
||||
import com.mongodb.client.model.DeleteOneModel;
|
||||
import com.mongodb.client.model.InsertOneModel;
|
||||
import com.mongodb.client.model.ReplaceOneModel;
|
||||
import com.mongodb.client.model.UpdateManyModel;
|
||||
import com.mongodb.client.model.UpdateOneModel;
|
||||
import com.mongodb.client.model.UpdateOptions;
|
||||
import com.mongodb.client.model.WriteModel;
|
||||
|
||||
/**
|
||||
* Support class for bulk operations.
|
||||
*
|
||||
* @author Mark Paluch
|
||||
* @since 4.1
|
||||
*/
|
||||
abstract class BulkOperationsSupport {
|
||||
|
||||
private final String collectionName;
|
||||
|
||||
BulkOperationsSupport(String collectionName) {
|
||||
|
||||
Assert.hasText(collectionName, "CollectionName must not be null nor empty");
|
||||
|
||||
this.collectionName = collectionName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Emit a {@link BeforeSaveEvent}.
|
||||
*
|
||||
* @param holder
|
||||
*/
|
||||
void maybeEmitBeforeSaveEvent(SourceAwareWriteModelHolder holder) {
|
||||
|
||||
if (holder.model() instanceof InsertOneModel) {
|
||||
|
||||
Document target = ((InsertOneModel<Document>) holder.model()).getDocument();
|
||||
maybeEmitEvent(new BeforeSaveEvent<>(holder.source(), target, collectionName));
|
||||
} else if (holder.model() instanceof ReplaceOneModel) {
|
||||
|
||||
Document target = ((ReplaceOneModel<Document>) holder.model()).getReplacement();
|
||||
maybeEmitEvent(new BeforeSaveEvent<>(holder.source(), target, collectionName));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Emit a {@link AfterSaveEvent}.
|
||||
*
|
||||
* @param holder
|
||||
*/
|
||||
void maybeEmitAfterSaveEvent(SourceAwareWriteModelHolder holder) {
|
||||
|
||||
if (holder.model() instanceof InsertOneModel) {
|
||||
|
||||
Document target = ((InsertOneModel<Document>) holder.model()).getDocument();
|
||||
maybeEmitEvent(new AfterSaveEvent<>(holder.source(), target, collectionName));
|
||||
} else if (holder.model() instanceof ReplaceOneModel) {
|
||||
|
||||
Document target = ((ReplaceOneModel<Document>) holder.model()).getReplacement();
|
||||
maybeEmitEvent(new AfterSaveEvent<>(holder.source(), target, collectionName));
|
||||
}
|
||||
}
|
||||
|
||||
WriteModel<Document> mapWriteModel(Object source, WriteModel<Document> writeModel) {
|
||||
|
||||
if (writeModel instanceof UpdateOneModel<Document> model) {
|
||||
|
||||
if (source instanceof AggregationUpdate aggregationUpdate) {
|
||||
|
||||
List<Document> pipeline = mapUpdatePipeline(aggregationUpdate);
|
||||
return new UpdateOneModel<>(getMappedQuery(model.getFilter()), pipeline, model.getOptions());
|
||||
}
|
||||
|
||||
return new UpdateOneModel<>(getMappedQuery(model.getFilter()), getMappedUpdate(model.getUpdate()),
|
||||
model.getOptions());
|
||||
}
|
||||
|
||||
if (writeModel instanceof UpdateManyModel<Document> model) {
|
||||
|
||||
if (source instanceof AggregationUpdate aggregationUpdate) {
|
||||
|
||||
List<Document> pipeline = mapUpdatePipeline(aggregationUpdate);
|
||||
return new UpdateManyModel<>(getMappedQuery(model.getFilter()), pipeline, model.getOptions());
|
||||
}
|
||||
|
||||
return new UpdateManyModel<>(getMappedQuery(model.getFilter()), getMappedUpdate(model.getUpdate()),
|
||||
model.getOptions());
|
||||
}
|
||||
|
||||
if (writeModel instanceof DeleteOneModel<Document> model) {
|
||||
return new DeleteOneModel<>(getMappedQuery(model.getFilter()), model.getOptions());
|
||||
}
|
||||
|
||||
if (writeModel instanceof DeleteManyModel<Document> model) {
|
||||
return new DeleteManyModel<>(getMappedQuery(model.getFilter()), model.getOptions());
|
||||
}
|
||||
|
||||
return writeModel;
|
||||
}
|
||||
|
||||
private List<Document> mapUpdatePipeline(AggregationUpdate source) {
|
||||
|
||||
Class<?> type = entity().isPresent() ? entity().map(PersistentEntity::getType).get() : Object.class;
|
||||
AggregationOperationContext context = new RelaxedTypeBasedAggregationOperationContext(type,
|
||||
updateMapper().getMappingContext(), queryMapper());
|
||||
|
||||
return new AggregationUtil(queryMapper(), queryMapper().getMappingContext()).createPipeline(source, context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Emit a {@link ApplicationEvent} if event multicasting is enabled.
|
||||
*
|
||||
* @param event
|
||||
*/
|
||||
protected abstract void maybeEmitEvent(ApplicationEvent event);
|
||||
|
||||
/**
|
||||
* @return the {@link UpdateMapper} to use.
|
||||
*/
|
||||
protected abstract UpdateMapper updateMapper();
|
||||
|
||||
/**
|
||||
* @return the {@link QueryMapper} to use.
|
||||
*/
|
||||
protected abstract QueryMapper queryMapper();
|
||||
|
||||
/**
|
||||
* @return the associated {@link PersistentEntity}. Can be {@link Optional#empty()}.
|
||||
*/
|
||||
protected abstract Optional<? extends MongoPersistentEntity<?>> entity();
|
||||
|
||||
protected Bson getMappedUpdate(Bson update) {
|
||||
return updateMapper().getMappedObject(update, entity());
|
||||
}
|
||||
|
||||
protected Bson getMappedQuery(Bson query) {
|
||||
return queryMapper().getMappedObject(query, entity());
|
||||
}
|
||||
|
||||
protected static BulkWriteOptions getBulkWriteOptions(BulkMode bulkMode) {
|
||||
|
||||
BulkWriteOptions options = new BulkWriteOptions();
|
||||
|
||||
return switch (bulkMode) {
|
||||
case ORDERED -> options.ordered(true);
|
||||
case UNORDERED -> options.ordered(false);
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* @param filterQuery The {@link Query} to read a potential {@link Collation} from. Must not be {@literal null}.
|
||||
* @param update The {@link Update} to apply
|
||||
* @param upsert flag to indicate if document should be upserted.
|
||||
* @return new instance of {@link UpdateOptions}.
|
||||
*/
|
||||
protected static UpdateOptions computeUpdateOptions(Query filterQuery, UpdateDefinition update, boolean upsert) {
|
||||
|
||||
UpdateOptions options = new UpdateOptions();
|
||||
options.upsert(upsert);
|
||||
|
||||
if (update.hasArrayFilters()) {
|
||||
List<Document> list = new ArrayList<>(update.getArrayFilters().size());
|
||||
for (ArrayFilter arrayFilter : update.getArrayFilters()) {
|
||||
list.add(arrayFilter.asDocument());
|
||||
}
|
||||
options.arrayFilters(list);
|
||||
}
|
||||
|
||||
filterQuery.getCollation().map(Collation::toMongoCollation).ifPresent(options::collation);
|
||||
return options;
|
||||
}
|
||||
|
||||
/**
|
||||
* Value object chaining together an actual source with its {@link WriteModel} representation.
|
||||
*
|
||||
* @author Christoph Strobl
|
||||
*/
|
||||
record SourceAwareWriteModelHolder(Object source, WriteModel<Document> model) {
|
||||
}
|
||||
}
|
||||
@@ -21,32 +21,24 @@ import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.bson.Document;
|
||||
import org.bson.conversions.Bson;
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.dao.DataIntegrityViolationException;
|
||||
import org.springframework.data.mapping.PersistentEntity;
|
||||
import org.springframework.data.mapping.callback.EntityCallback;
|
||||
import org.springframework.data.mapping.callback.EntityCallbacks;
|
||||
import org.springframework.data.mongodb.BulkOperationException;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
|
||||
import org.springframework.data.mongodb.core.aggregation.AggregationUpdate;
|
||||
import org.springframework.data.mongodb.core.aggregation.RelaxedTypeBasedAggregationOperationContext;
|
||||
import org.springframework.data.mongodb.core.convert.QueryMapper;
|
||||
import org.springframework.data.mongodb.core.convert.UpdateMapper;
|
||||
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
|
||||
import org.springframework.data.mongodb.core.mapping.event.AfterSaveCallback;
|
||||
import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
|
||||
import org.springframework.data.mongodb.core.mapping.event.BeforeConvertCallback;
|
||||
import org.springframework.data.mongodb.core.mapping.event.BeforeConvertEvent;
|
||||
import org.springframework.data.mongodb.core.mapping.event.BeforeSaveCallback;
|
||||
import org.springframework.data.mongodb.core.mapping.event.BeforeSaveEvent;
|
||||
import org.springframework.data.mongodb.core.mapping.event.MongoMappingEvent;
|
||||
import org.springframework.data.mongodb.core.query.Collation;
|
||||
import org.springframework.data.mongodb.core.query.Query;
|
||||
import org.springframework.data.mongodb.core.query.Update;
|
||||
import org.springframework.data.mongodb.core.query.UpdateDefinition;
|
||||
import org.springframework.data.mongodb.core.query.UpdateDefinition.ArrayFilter;
|
||||
import org.springframework.data.util.Pair;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
@@ -55,7 +47,16 @@ import com.mongodb.MongoBulkWriteException;
|
||||
import com.mongodb.WriteConcern;
|
||||
import com.mongodb.bulk.BulkWriteResult;
|
||||
import com.mongodb.client.MongoCollection;
|
||||
import com.mongodb.client.model.*;
|
||||
import com.mongodb.client.model.BulkWriteOptions;
|
||||
import com.mongodb.client.model.DeleteManyModel;
|
||||
import com.mongodb.client.model.DeleteOptions;
|
||||
import com.mongodb.client.model.InsertOneModel;
|
||||
import com.mongodb.client.model.ReplaceOneModel;
|
||||
import com.mongodb.client.model.ReplaceOptions;
|
||||
import com.mongodb.client.model.UpdateManyModel;
|
||||
import com.mongodb.client.model.UpdateOneModel;
|
||||
import com.mongodb.client.model.UpdateOptions;
|
||||
import com.mongodb.client.model.WriteModel;
|
||||
|
||||
/**
|
||||
* Default implementation for {@link BulkOperations}.
|
||||
@@ -71,7 +72,7 @@ import com.mongodb.client.model.*;
|
||||
* @author Jacob Botuck
|
||||
* @since 1.9
|
||||
*/
|
||||
class DefaultBulkOperations implements BulkOperations {
|
||||
class DefaultBulkOperations extends BulkOperationsSupport implements BulkOperations {
|
||||
|
||||
private final MongoOperations mongoOperations;
|
||||
private final String collectionName;
|
||||
@@ -93,6 +94,7 @@ class DefaultBulkOperations implements BulkOperations {
|
||||
DefaultBulkOperations(MongoOperations mongoOperations, String collectionName,
|
||||
BulkOperationContext bulkOperationContext) {
|
||||
|
||||
super(collectionName);
|
||||
Assert.notNull(mongoOperations, "MongoOperations must not be null");
|
||||
Assert.hasText(collectionName, "CollectionName must not be null nor empty");
|
||||
Assert.notNull(bulkOperationContext, "BulkOperationContext must not be null");
|
||||
@@ -135,7 +137,6 @@ class DefaultBulkOperations implements BulkOperations {
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public BulkOperations updateOne(Query query, UpdateDefinition update) {
|
||||
|
||||
Assert.notNull(query, "Query must not be null");
|
||||
@@ -157,7 +158,6 @@ class DefaultBulkOperations implements BulkOperations {
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public BulkOperations updateMulti(Query query, UpdateDefinition update) {
|
||||
|
||||
Assert.notNull(query, "Query must not be null");
|
||||
@@ -326,61 +326,24 @@ class DefaultBulkOperations implements BulkOperations {
|
||||
return this;
|
||||
}
|
||||
|
||||
private WriteModel<Document> mapWriteModel(Object source, WriteModel<Document> writeModel) {
|
||||
|
||||
if (writeModel instanceof UpdateOneModel<Document> model) {
|
||||
|
||||
if (source instanceof AggregationUpdate aggregationUpdate) {
|
||||
|
||||
List<Document> pipeline = mapUpdatePipeline(aggregationUpdate);
|
||||
return new UpdateOneModel<>(getMappedQuery(model.getFilter()), pipeline, model.getOptions());
|
||||
}
|
||||
|
||||
return new UpdateOneModel<>(getMappedQuery(model.getFilter()), getMappedUpdate(model.getUpdate()),
|
||||
model.getOptions());
|
||||
}
|
||||
|
||||
if (writeModel instanceof UpdateManyModel<Document> model) {
|
||||
|
||||
if (source instanceof AggregationUpdate aggregationUpdate) {
|
||||
|
||||
List<Document> pipeline = mapUpdatePipeline(aggregationUpdate);
|
||||
return new UpdateManyModel<>(getMappedQuery(model.getFilter()), pipeline, model.getOptions());
|
||||
}
|
||||
|
||||
return new UpdateManyModel<>(getMappedQuery(model.getFilter()), getMappedUpdate(model.getUpdate()),
|
||||
model.getOptions());
|
||||
}
|
||||
|
||||
if (writeModel instanceof DeleteOneModel<Document> model) {
|
||||
return new DeleteOneModel<>(getMappedQuery(model.getFilter()), model.getOptions());
|
||||
}
|
||||
|
||||
if (writeModel instanceof DeleteManyModel<Document> model) {
|
||||
return new DeleteManyModel<>(getMappedQuery(model.getFilter()), model.getOptions());
|
||||
}
|
||||
|
||||
return writeModel;
|
||||
@Override
|
||||
protected void maybeEmitEvent(ApplicationEvent event) {
|
||||
bulkOperationContext.publishEvent(event);
|
||||
}
|
||||
|
||||
private List<Document> mapUpdatePipeline(AggregationUpdate source) {
|
||||
|
||||
Class<?> type = bulkOperationContext.entity().isPresent()
|
||||
? bulkOperationContext.entity().map(PersistentEntity::getType).get()
|
||||
: Object.class;
|
||||
AggregationOperationContext context = new RelaxedTypeBasedAggregationOperationContext(type,
|
||||
bulkOperationContext.updateMapper().getMappingContext(), bulkOperationContext.queryMapper());
|
||||
|
||||
return new AggregationUtil(bulkOperationContext.queryMapper(),
|
||||
bulkOperationContext.queryMapper().getMappingContext()).createPipeline(source, context);
|
||||
@Override
|
||||
protected UpdateMapper updateMapper() {
|
||||
return bulkOperationContext.updateMapper();
|
||||
}
|
||||
|
||||
private Bson getMappedUpdate(Bson update) {
|
||||
return bulkOperationContext.updateMapper().getMappedObject(update, bulkOperationContext.entity());
|
||||
@Override
|
||||
protected QueryMapper queryMapper() {
|
||||
return bulkOperationContext.queryMapper();
|
||||
}
|
||||
|
||||
private Bson getMappedQuery(Bson query) {
|
||||
return bulkOperationContext.queryMapper().getMappedObject(query, bulkOperationContext.entity());
|
||||
@Override
|
||||
protected Optional<? extends MongoPersistentEntity<?>> entity() {
|
||||
return bulkOperationContext.entity();
|
||||
}
|
||||
|
||||
private Document getMappedObject(Object source) {
|
||||
@@ -399,32 +362,6 @@ class DefaultBulkOperations implements BulkOperations {
|
||||
models.add(new SourceAwareWriteModelHolder(source, model));
|
||||
}
|
||||
|
||||
private void maybeEmitBeforeSaveEvent(SourceAwareWriteModelHolder holder) {
|
||||
|
||||
if (holder.model() instanceof InsertOneModel) {
|
||||
|
||||
Document target = ((InsertOneModel<Document>) holder.model()).getDocument();
|
||||
maybeEmitEvent(new BeforeSaveEvent<>(holder.source(), target, collectionName));
|
||||
} else if (holder.model() instanceof ReplaceOneModel) {
|
||||
|
||||
Document target = ((ReplaceOneModel<Document>) holder.model()).getReplacement();
|
||||
maybeEmitEvent(new BeforeSaveEvent<>(holder.source(), target, collectionName));
|
||||
}
|
||||
}
|
||||
|
||||
private void maybeEmitAfterSaveEvent(SourceAwareWriteModelHolder holder) {
|
||||
|
||||
if (holder.model() instanceof InsertOneModel) {
|
||||
|
||||
Document target = ((InsertOneModel<Document>) holder.model()).getDocument();
|
||||
maybeEmitEvent(new AfterSaveEvent<>(holder.source(), target, collectionName));
|
||||
} else if (holder.model() instanceof ReplaceOneModel) {
|
||||
|
||||
Document target = ((ReplaceOneModel<Document>) holder.model()).getReplacement();
|
||||
maybeEmitEvent(new AfterSaveEvent<>(holder.source(), target, collectionName));
|
||||
}
|
||||
}
|
||||
|
||||
private void maybeInvokeAfterSaveCallback(SourceAwareWriteModelHolder holder) {
|
||||
|
||||
if (holder.model() instanceof InsertOneModel) {
|
||||
@@ -438,7 +375,7 @@ class DefaultBulkOperations implements BulkOperations {
|
||||
}
|
||||
}
|
||||
|
||||
private void maybeEmitEvent(MongoMappingEvent<?> event) {
|
||||
private void publishEvent(MongoMappingEvent<?> event) {
|
||||
bulkOperationContext.publishEvent(event);
|
||||
}
|
||||
|
||||
@@ -454,43 +391,6 @@ class DefaultBulkOperations implements BulkOperations {
|
||||
return bulkOperationContext.callback(AfterSaveCallback.class, value, mappedDocument, collectionName);
|
||||
}
|
||||
|
||||
private static BulkWriteOptions getBulkWriteOptions(BulkMode bulkMode) {
|
||||
|
||||
BulkWriteOptions options = new BulkWriteOptions();
|
||||
|
||||
switch (bulkMode) {
|
||||
case ORDERED:
|
||||
return options.ordered(true);
|
||||
case UNORDERED:
|
||||
return options.ordered(false);
|
||||
}
|
||||
|
||||
throw new IllegalStateException("BulkMode was null");
|
||||
}
|
||||
|
||||
/**
|
||||
* @param filterQuery The {@link Query} to read a potential {@link Collation} from. Must not be {@literal null}.
|
||||
* @param update The {@link Update} to apply
|
||||
* @param upsert flag to indicate if document should be upserted.
|
||||
* @return new instance of {@link UpdateOptions}.
|
||||
*/
|
||||
private static UpdateOptions computeUpdateOptions(Query filterQuery, UpdateDefinition update, boolean upsert) {
|
||||
|
||||
UpdateOptions options = new UpdateOptions();
|
||||
options.upsert(upsert);
|
||||
|
||||
if (update.hasArrayFilters()) {
|
||||
List<Document> list = new ArrayList<>(update.getArrayFilters().size());
|
||||
for (ArrayFilter arrayFilter : update.getArrayFilters()) {
|
||||
list.add(arrayFilter.asDocument());
|
||||
}
|
||||
options.arrayFilters(list);
|
||||
}
|
||||
|
||||
filterQuery.getCollation().map(Collation::toMongoCollation).ifPresent(options::collation);
|
||||
return options;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link BulkOperationContext} holds information about {@link BulkMode} the entity in use as well as references to
|
||||
* {@link QueryMapper} and {@link UpdateMapper}.
|
||||
@@ -502,14 +402,14 @@ class DefaultBulkOperations implements BulkOperations {
|
||||
QueryMapper queryMapper, UpdateMapper updateMapper, @Nullable ApplicationEventPublisher eventPublisher,
|
||||
@Nullable EntityCallbacks entityCallbacks) {
|
||||
|
||||
public boolean skipEventPublishing() {
|
||||
return eventPublisher == null;
|
||||
}
|
||||
|
||||
public boolean skipEntityCallbacks() {
|
||||
return entityCallbacks == null;
|
||||
}
|
||||
|
||||
public boolean skipEventPublishing() {
|
||||
return eventPublisher == null;
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
public <T> T callback(Class<? extends EntityCallback> callbackType, T entity, String collectionName) {
|
||||
|
||||
@@ -541,13 +441,4 @@ class DefaultBulkOperations implements BulkOperations {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Value object chaining together an actual source with its {@link WriteModel} representation.
|
||||
*
|
||||
* @author Christoph Strobl
|
||||
* @since 2.2
|
||||
*/
|
||||
record SourceAwareWriteModelHolder(Object source, WriteModel<Document> model) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,19 +24,15 @@ import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.bson.Document;
|
||||
import org.bson.conversions.Bson;
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.dao.DataIntegrityViolationException;
|
||||
import org.springframework.data.mapping.callback.EntityCallback;
|
||||
import org.springframework.data.mapping.callback.ReactiveEntityCallbacks;
|
||||
import org.springframework.data.mongodb.BulkOperationException;
|
||||
import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
|
||||
import org.springframework.data.mongodb.core.convert.QueryMapper;
|
||||
import org.springframework.data.mongodb.core.convert.UpdateMapper;
|
||||
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
|
||||
import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
|
||||
import org.springframework.data.mongodb.core.mapping.event.BeforeConvertEvent;
|
||||
import org.springframework.data.mongodb.core.mapping.event.BeforeSaveEvent;
|
||||
import org.springframework.data.mongodb.core.mapping.event.MongoMappingEvent;
|
||||
import org.springframework.data.mongodb.core.mapping.event.ReactiveAfterSaveCallback;
|
||||
import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeConvertCallback;
|
||||
import org.springframework.data.mongodb.core.mapping.event.ReactiveBeforeSaveCallback;
|
||||
@@ -44,24 +40,30 @@ import org.springframework.data.mongodb.core.query.Collation;
|
||||
import org.springframework.data.mongodb.core.query.Query;
|
||||
import org.springframework.data.mongodb.core.query.Update;
|
||||
import org.springframework.data.mongodb.core.query.UpdateDefinition;
|
||||
import org.springframework.data.mongodb.core.query.UpdateDefinition.ArrayFilter;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
import com.mongodb.MongoBulkWriteException;
|
||||
import com.mongodb.WriteConcern;
|
||||
import com.mongodb.bulk.BulkWriteResult;
|
||||
import com.mongodb.client.model.*;
|
||||
import com.mongodb.client.model.BulkWriteOptions;
|
||||
import com.mongodb.client.model.DeleteManyModel;
|
||||
import com.mongodb.client.model.DeleteOptions;
|
||||
import com.mongodb.client.model.InsertOneModel;
|
||||
import com.mongodb.client.model.ReplaceOneModel;
|
||||
import com.mongodb.client.model.ReplaceOptions;
|
||||
import com.mongodb.client.model.UpdateManyModel;
|
||||
import com.mongodb.client.model.UpdateOneModel;
|
||||
import com.mongodb.client.model.UpdateOptions;
|
||||
import com.mongodb.reactivestreams.client.MongoCollection;
|
||||
|
||||
/**
|
||||
* Default implementation for {@link ReactiveBulkOperations}.
|
||||
*
|
||||
* @author Christoph Strobl
|
||||
* @author Mark Paluch
|
||||
* @since 4.1
|
||||
*/
|
||||
class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
|
||||
class DefaultReactiveBulkOperations extends BulkOperationsSupport implements ReactiveBulkOperations {
|
||||
|
||||
private final ReactiveMongoOperations mongoOperations;
|
||||
private final String collectionName;
|
||||
@@ -83,6 +85,8 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
|
||||
DefaultReactiveBulkOperations(ReactiveMongoOperations mongoOperations, String collectionName,
|
||||
ReactiveBulkOperationContext bulkOperationContext) {
|
||||
|
||||
super(collectionName);
|
||||
|
||||
Assert.notNull(mongoOperations, "MongoOperations must not be null");
|
||||
Assert.hasText(collectionName, "CollectionName must not be null nor empty");
|
||||
Assert.notNull(bulkOperationContext, "BulkOperationContext must not be null");
|
||||
@@ -90,7 +94,7 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
|
||||
this.mongoOperations = mongoOperations;
|
||||
this.collectionName = collectionName;
|
||||
this.bulkOperationContext = bulkOperationContext;
|
||||
this.bulkOptions = getBulkWriteOptions(bulkOperationContext.getBulkMode());
|
||||
this.bulkOptions = getBulkWriteOptions(bulkOperationContext.bulkMode());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -110,7 +114,7 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
|
||||
this.models.add(Mono.just(document).flatMap(it -> {
|
||||
maybeEmitEvent(new BeforeConvertEvent<>(it, collectionName));
|
||||
return maybeInvokeBeforeConvertCallback(it);
|
||||
}).map(it -> new SourceAwareWriteModelHolder(it, new InsertOneModel(getMappedObject(it)))));
|
||||
}).map(it -> new SourceAwareWriteModelHolder(it, new InsertOneModel<>(getMappedObject(it)))));
|
||||
|
||||
return this;
|
||||
}
|
||||
@@ -126,7 +130,6 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public ReactiveBulkOperations updateOne(Query query, UpdateDefinition update) {
|
||||
|
||||
Assert.notNull(query, "Query must not be null");
|
||||
@@ -137,7 +140,6 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public ReactiveBulkOperations updateMulti(Query query, UpdateDefinition update) {
|
||||
|
||||
Assert.notNull(query, "Query must not be null");
|
||||
@@ -202,11 +204,9 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
|
||||
public Mono<BulkWriteResult> execute() {
|
||||
|
||||
try {
|
||||
|
||||
Mono<BulkWriteResult> result = mongoOperations.execute(collectionName, this::bulkWriteTo).next();
|
||||
return result;
|
||||
return mongoOperations.execute(collectionName, this::bulkWriteTo).next();
|
||||
} finally {
|
||||
this.bulkOptions = getBulkWriteOptions(bulkOperationContext.getBulkMode());
|
||||
this.bulkOptions = getBulkWriteOptions(bulkOperationContext.bulkMode());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -216,55 +216,39 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
|
||||
collection = collection.withWriteConcern(defaultWriteConcern);
|
||||
}
|
||||
|
||||
try {
|
||||
Flux<SourceAwareWriteModelHolder> concat = Flux.concat(models).flatMap(it -> {
|
||||
|
||||
Flux<SourceAwareWriteModelHolder> concat = Flux.concat(models).flatMap(it -> {
|
||||
if (it.getModel()instanceof InsertOneModel<Document> insertOneModel) {
|
||||
if (it.model()instanceof InsertOneModel<Document> iom) {
|
||||
|
||||
Document target = insertOneModel.getDocument();
|
||||
maybeEmitBeforeSaveEvent(it);
|
||||
return maybeInvokeBeforeSaveCallback(it.getSource(), target)
|
||||
.map(afterCallback -> new SourceAwareWriteModelHolder(afterCallback, mapWriteModel(insertOneModel)));
|
||||
} else if (it.getModel()instanceof ReplaceOneModel<Document> replaceOneModel) {
|
||||
Document target = iom.getDocument();
|
||||
maybeEmitBeforeSaveEvent(it);
|
||||
return maybeInvokeBeforeSaveCallback(it.source(), target)
|
||||
.map(afterCallback -> new SourceAwareWriteModelHolder(afterCallback, mapWriteModel(afterCallback, iom)));
|
||||
} else if (it.model()instanceof ReplaceOneModel<Document> rom) {
|
||||
|
||||
Document target = replaceOneModel.getReplacement();
|
||||
maybeEmitBeforeSaveEvent(it);
|
||||
return maybeInvokeBeforeSaveCallback(it.getSource(), target)
|
||||
.map(afterCallback -> new SourceAwareWriteModelHolder(afterCallback, mapWriteModel(replaceOneModel)));
|
||||
}
|
||||
return Mono.just(new SourceAwareWriteModelHolder(it.getSource(), mapWriteModel(it.getModel())));
|
||||
});
|
||||
MongoCollection theCollection = collection;
|
||||
return concat.collectList().flatMap(it -> {
|
||||
|
||||
return Mono
|
||||
.from(theCollection.bulkWrite(
|
||||
it.stream().map(SourceAwareWriteModelHolder::getModel).collect(Collectors.toList()), bulkOptions))
|
||||
.doOnSuccess(state -> {
|
||||
it.forEach(saved -> {
|
||||
maybeEmitAfterSaveEvent(saved);
|
||||
});
|
||||
}).flatMap(state -> {
|
||||
List<Mono<Object>> monos = it.stream().map(saved -> {
|
||||
return maybeInvokeAfterSaveCallback(saved);
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
return Flux.concat(monos).then(Mono.just(state));
|
||||
});
|
||||
});
|
||||
} catch (RuntimeException ex) {
|
||||
|
||||
if (ex instanceof MongoBulkWriteException) {
|
||||
|
||||
MongoBulkWriteException mongoBulkWriteException = (MongoBulkWriteException) ex;
|
||||
if (mongoBulkWriteException.getWriteConcernError() != null) {
|
||||
throw new DataIntegrityViolationException(ex.getMessage(), ex);
|
||||
}
|
||||
throw new BulkOperationException(ex.getMessage(), mongoBulkWriteException);
|
||||
Document target = rom.getReplacement();
|
||||
maybeEmitBeforeSaveEvent(it);
|
||||
return maybeInvokeBeforeSaveCallback(it.source(), target)
|
||||
.map(afterCallback -> new SourceAwareWriteModelHolder(afterCallback, mapWriteModel(afterCallback, rom)));
|
||||
}
|
||||
|
||||
throw ex;
|
||||
}
|
||||
return Mono.just(new SourceAwareWriteModelHolder(it.source(), mapWriteModel(it.source(), it.model())));
|
||||
});
|
||||
|
||||
MongoCollection<Document> theCollection = collection;
|
||||
return concat.collectList().flatMap(it -> {
|
||||
|
||||
return Mono
|
||||
.from(theCollection
|
||||
.bulkWrite(it.stream().map(SourceAwareWriteModelHolder::model).collect(Collectors.toList()), bulkOptions))
|
||||
.doOnSuccess(state -> {
|
||||
it.forEach(this::maybeEmitAfterSaveEvent);
|
||||
}).flatMap(state -> {
|
||||
List<Mono<Object>> monos = it.stream().map(this::maybeInvokeAfterSaveCallback).collect(Collectors.toList());
|
||||
|
||||
return Flux.concat(monos).then(Mono.just(state));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -295,47 +279,24 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
|
||||
return this;
|
||||
}
|
||||
|
||||
private WriteModel<Document> mapWriteModel(WriteModel<Document> writeModel) {
|
||||
|
||||
if (writeModel instanceof UpdateOneModel) {
|
||||
|
||||
UpdateOneModel<Document> model = (UpdateOneModel<Document>) writeModel;
|
||||
|
||||
return new UpdateOneModel<>(getMappedQuery(model.getFilter()), getMappedUpdate(model.getUpdate()),
|
||||
model.getOptions());
|
||||
}
|
||||
|
||||
if (writeModel instanceof UpdateManyModel) {
|
||||
|
||||
UpdateManyModel<Document> model = (UpdateManyModel<Document>) writeModel;
|
||||
|
||||
return new UpdateManyModel<>(getMappedQuery(model.getFilter()), getMappedUpdate(model.getUpdate()),
|
||||
model.getOptions());
|
||||
}
|
||||
|
||||
if (writeModel instanceof DeleteOneModel) {
|
||||
|
||||
DeleteOneModel<Document> model = (DeleteOneModel<Document>) writeModel;
|
||||
|
||||
return new DeleteOneModel<>(getMappedQuery(model.getFilter()), model.getOptions());
|
||||
}
|
||||
|
||||
if (writeModel instanceof DeleteManyModel) {
|
||||
|
||||
DeleteManyModel<Document> model = (DeleteManyModel<Document>) writeModel;
|
||||
|
||||
return new DeleteManyModel<>(getMappedQuery(model.getFilter()), model.getOptions());
|
||||
}
|
||||
|
||||
return writeModel;
|
||||
@Override
|
||||
protected void maybeEmitEvent(ApplicationEvent event) {
|
||||
bulkOperationContext.publishEvent(event);
|
||||
}
|
||||
|
||||
private Bson getMappedUpdate(Bson update) {
|
||||
return bulkOperationContext.getUpdateMapper().getMappedObject(update, bulkOperationContext.getEntity());
|
||||
@Override
|
||||
protected UpdateMapper updateMapper() {
|
||||
return bulkOperationContext.updateMapper();
|
||||
}
|
||||
|
||||
private Bson getMappedQuery(Bson query) {
|
||||
return bulkOperationContext.getQueryMapper().getMappedObject(query, bulkOperationContext.getEntity());
|
||||
@Override
|
||||
protected QueryMapper queryMapper() {
|
||||
return bulkOperationContext.queryMapper();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Optional<? extends MongoPersistentEntity<?>> entity() {
|
||||
return bulkOperationContext.entity();
|
||||
}
|
||||
|
||||
private Document getMappedObject(Object source) {
|
||||
@@ -350,121 +311,30 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
|
||||
return sink;
|
||||
}
|
||||
|
||||
private void maybeEmitBeforeSaveEvent(SourceAwareWriteModelHolder holder) {
|
||||
|
||||
if (holder.getModel() instanceof InsertOneModel) {
|
||||
|
||||
Document target = ((InsertOneModel<Document>) holder.getModel()).getDocument();
|
||||
maybeEmitEvent(new BeforeSaveEvent<>(holder.getSource(), target, collectionName));
|
||||
} else if (holder.getModel() instanceof ReplaceOneModel) {
|
||||
|
||||
Document target = ((ReplaceOneModel<Document>) holder.getModel()).getReplacement();
|
||||
maybeEmitEvent(new BeforeSaveEvent<>(holder.getSource(), target, collectionName));
|
||||
}
|
||||
}
|
||||
|
||||
private void maybeEmitAfterSaveEvent(SourceAwareWriteModelHolder holder) {
|
||||
|
||||
if (holder.getModel() instanceof InsertOneModel) {
|
||||
|
||||
Document target = ((InsertOneModel<Document>) holder.getModel()).getDocument();
|
||||
maybeEmitEvent(new AfterSaveEvent<>(holder.getSource(), target, collectionName));
|
||||
} else if (holder.getModel() instanceof ReplaceOneModel) {
|
||||
|
||||
Document target = ((ReplaceOneModel<Document>) holder.getModel()).getReplacement();
|
||||
maybeEmitEvent(new AfterSaveEvent<>(holder.getSource(), target, collectionName));
|
||||
}
|
||||
}
|
||||
|
||||
private Mono<Object> maybeInvokeAfterSaveCallback(SourceAwareWriteModelHolder holder) {
|
||||
|
||||
if (holder.getModel() instanceof InsertOneModel) {
|
||||
if (holder.model() instanceof InsertOneModel) {
|
||||
|
||||
Document target = ((InsertOneModel<Document>) holder.getModel()).getDocument();
|
||||
return maybeInvokeAfterSaveCallback(holder.getSource(), target);
|
||||
} else if (holder.getModel() instanceof ReplaceOneModel) {
|
||||
Document target = ((InsertOneModel<Document>) holder.model()).getDocument();
|
||||
return maybeInvokeAfterSaveCallback(holder.source(), target);
|
||||
} else if (holder.model() instanceof ReplaceOneModel) {
|
||||
|
||||
Document target = ((ReplaceOneModel<Document>) holder.getModel()).getReplacement();
|
||||
return maybeInvokeAfterSaveCallback(holder.getSource(), target);
|
||||
Document target = ((ReplaceOneModel<Document>) holder.model()).getReplacement();
|
||||
return maybeInvokeAfterSaveCallback(holder.source(), target);
|
||||
}
|
||||
return Mono.just(holder.getSource());
|
||||
}
|
||||
|
||||
private <E extends MongoMappingEvent<T>, T> E maybeEmitEvent(E event) {
|
||||
|
||||
if (bulkOperationContext.getEventPublisher() == null) {
|
||||
return event;
|
||||
}
|
||||
|
||||
bulkOperationContext.getEventPublisher().publishEvent(event);
|
||||
return event;
|
||||
return Mono.just(holder.source());
|
||||
}
|
||||
|
||||
private Mono<Object> maybeInvokeBeforeConvertCallback(Object value) {
|
||||
|
||||
if (bulkOperationContext.getEntityCallbacks() == null) {
|
||||
return Mono.just(value);
|
||||
}
|
||||
|
||||
return bulkOperationContext.getEntityCallbacks().callback(ReactiveBeforeConvertCallback.class, value,
|
||||
collectionName);
|
||||
return bulkOperationContext.callback(ReactiveBeforeConvertCallback.class, value, collectionName);
|
||||
}
|
||||
|
||||
private Mono<Object> maybeInvokeBeforeSaveCallback(Object value, Document mappedDocument) {
|
||||
|
||||
if (bulkOperationContext.getEntityCallbacks() == null) {
|
||||
return Mono.just(value);
|
||||
}
|
||||
|
||||
return bulkOperationContext.getEntityCallbacks().callback(ReactiveBeforeSaveCallback.class, value, mappedDocument,
|
||||
collectionName);
|
||||
return bulkOperationContext.callback(ReactiveBeforeSaveCallback.class, value, mappedDocument, collectionName);
|
||||
}
|
||||
|
||||
private Mono<Object> maybeInvokeAfterSaveCallback(Object value, Document mappedDocument) {
|
||||
|
||||
if (bulkOperationContext.getEntityCallbacks() == null) {
|
||||
return Mono.just(value);
|
||||
}
|
||||
|
||||
return bulkOperationContext.getEntityCallbacks().callback(ReactiveAfterSaveCallback.class, value, mappedDocument,
|
||||
collectionName);
|
||||
}
|
||||
|
||||
private static BulkWriteOptions getBulkWriteOptions(BulkMode bulkMode) {
|
||||
|
||||
BulkWriteOptions options = new BulkWriteOptions();
|
||||
|
||||
switch (bulkMode) {
|
||||
case ORDERED:
|
||||
return options.ordered(true);
|
||||
case UNORDERED:
|
||||
return options.ordered(false);
|
||||
}
|
||||
|
||||
throw new IllegalStateException("BulkMode was null");
|
||||
}
|
||||
|
||||
/**
|
||||
* @param filterQuery The {@link Query} to read a potential {@link Collation} from. Must not be {@literal null}.
|
||||
* @param update The {@link Update} to apply
|
||||
* @param upsert flag to indicate if document should be upserted.
|
||||
* @return new instance of {@link UpdateOptions}.
|
||||
*/
|
||||
private static UpdateOptions computeUpdateOptions(Query filterQuery, UpdateDefinition update, boolean upsert) {
|
||||
|
||||
UpdateOptions options = new UpdateOptions();
|
||||
options.upsert(upsert);
|
||||
|
||||
if (update.hasArrayFilters()) {
|
||||
List<Document> list = new ArrayList<>(update.getArrayFilters().size());
|
||||
for (ArrayFilter arrayFilter : update.getArrayFilters()) {
|
||||
list.add(arrayFilter.asDocument());
|
||||
}
|
||||
options.arrayFilters(list);
|
||||
}
|
||||
|
||||
filterQuery.getCollation().map(Collation::toMongoCollation).ifPresent(options::collation);
|
||||
return options;
|
||||
return bulkOperationContext.callback(ReactiveAfterSaveCallback.class, value, mappedDocument, collectionName);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -474,118 +344,47 @@ class DefaultReactiveBulkOperations implements ReactiveBulkOperations {
|
||||
* @author Christoph Strobl
|
||||
* @since 2.0
|
||||
*/
|
||||
static final class ReactiveBulkOperationContext {
|
||||
record ReactiveBulkOperationContext(BulkMode bulkMode, Optional<? extends MongoPersistentEntity<?>> entity,
|
||||
QueryMapper queryMapper, UpdateMapper updateMapper, @Nullable ApplicationEventPublisher eventPublisher,
|
||||
@Nullable ReactiveEntityCallbacks entityCallbacks) {
|
||||
|
||||
private final BulkMode bulkMode;
|
||||
private final Optional<? extends MongoPersistentEntity<?>> entity;
|
||||
private final QueryMapper queryMapper;
|
||||
private final UpdateMapper updateMapper;
|
||||
private final ApplicationEventPublisher eventPublisher;
|
||||
private final ReactiveEntityCallbacks entityCallbacks;
|
||||
|
||||
ReactiveBulkOperationContext(BulkMode bulkMode, Optional<? extends MongoPersistentEntity<?>> entity,
|
||||
QueryMapper queryMapper, UpdateMapper updateMapper, ApplicationEventPublisher eventPublisher,
|
||||
ReactiveEntityCallbacks entityCallbacks) {
|
||||
|
||||
this.bulkMode = bulkMode;
|
||||
this.entity = entity;
|
||||
this.queryMapper = queryMapper;
|
||||
this.updateMapper = updateMapper;
|
||||
this.eventPublisher = eventPublisher;
|
||||
this.entityCallbacks = entityCallbacks;
|
||||
public boolean skipEntityCallbacks() {
|
||||
return entityCallbacks == null;
|
||||
}
|
||||
|
||||
public BulkMode getBulkMode() {
|
||||
return this.bulkMode;
|
||||
public boolean skipEventPublishing() {
|
||||
return eventPublisher == null;
|
||||
}
|
||||
|
||||
public Optional<? extends MongoPersistentEntity<?>> getEntity() {
|
||||
return this.entity;
|
||||
}
|
||||
@SuppressWarnings("rawtypes")
|
||||
public <T> Mono<T> callback(Class<? extends EntityCallback> callbackType, T entity, String collectionName) {
|
||||
|
||||
public QueryMapper getQueryMapper() {
|
||||
return this.queryMapper;
|
||||
}
|
||||
|
||||
public UpdateMapper getUpdateMapper() {
|
||||
return this.updateMapper;
|
||||
}
|
||||
|
||||
public ApplicationEventPublisher getEventPublisher() {
|
||||
return this.eventPublisher;
|
||||
}
|
||||
|
||||
public ReactiveEntityCallbacks getEntityCallbacks() {
|
||||
return this.entityCallbacks;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(@Nullable Object o) {
|
||||
if (this == o)
|
||||
return true;
|
||||
if (o == null || getClass() != o.getClass())
|
||||
return false;
|
||||
|
||||
ReactiveBulkOperationContext that = (ReactiveBulkOperationContext) o;
|
||||
|
||||
if (bulkMode != that.bulkMode)
|
||||
return false;
|
||||
if (!ObjectUtils.nullSafeEquals(this.entity, that.entity)) {
|
||||
return false;
|
||||
if (skipEntityCallbacks()) {
|
||||
return Mono.just(entity);
|
||||
}
|
||||
if (!ObjectUtils.nullSafeEquals(this.queryMapper, that.queryMapper)) {
|
||||
return false;
|
||||
}
|
||||
if (!ObjectUtils.nullSafeEquals(this.updateMapper, that.updateMapper)) {
|
||||
return false;
|
||||
}
|
||||
if (!ObjectUtils.nullSafeEquals(this.eventPublisher, that.eventPublisher)) {
|
||||
return false;
|
||||
}
|
||||
return ObjectUtils.nullSafeEquals(this.entityCallbacks, that.entityCallbacks);
|
||||
|
||||
return entityCallbacks.callback(callbackType, entity, collectionName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = bulkMode != null ? bulkMode.hashCode() : 0;
|
||||
result = 31 * result + ObjectUtils.nullSafeHashCode(entity);
|
||||
result = 31 * result + ObjectUtils.nullSafeHashCode(queryMapper);
|
||||
result = 31 * result + ObjectUtils.nullSafeHashCode(updateMapper);
|
||||
result = 31 * result + ObjectUtils.nullSafeHashCode(eventPublisher);
|
||||
result = 31 * result + ObjectUtils.nullSafeHashCode(entityCallbacks);
|
||||
return result;
|
||||
@SuppressWarnings("rawtypes")
|
||||
public <T> Mono<T> callback(Class<? extends EntityCallback> callbackType, T entity, Document document,
|
||||
String collectionName) {
|
||||
|
||||
if (skipEntityCallbacks()) {
|
||||
return Mono.just(entity);
|
||||
}
|
||||
|
||||
return entityCallbacks.callback(callbackType, entity, document, collectionName);
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "DefaultBulkOperations.BulkOperationContext(bulkMode=" + this.getBulkMode() + ", entity="
|
||||
+ this.getEntity() + ", queryMapper=" + this.getQueryMapper() + ", updateMapper=" + this.getUpdateMapper()
|
||||
+ ", eventPublisher=" + this.getEventPublisher() + ", entityCallbacks=" + this.getEntityCallbacks() + ")";
|
||||
public void publishEvent(ApplicationEvent event) {
|
||||
|
||||
if (skipEventPublishing()) {
|
||||
return;
|
||||
}
|
||||
|
||||
eventPublisher.publishEvent(event);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Value object chaining together an actual source with its {@link WriteModel} representation.
|
||||
*
|
||||
* @since 4.1
|
||||
* @author Christoph Strobl
|
||||
*/
|
||||
private static final class SourceAwareWriteModelHolder {
|
||||
|
||||
private final Object source;
|
||||
private final WriteModel<Document> model;
|
||||
|
||||
SourceAwareWriteModelHolder(Object source, WriteModel<Document> model) {
|
||||
|
||||
this.source = source;
|
||||
this.model = model;
|
||||
}
|
||||
|
||||
public Object getSource() {
|
||||
return this.source;
|
||||
}
|
||||
|
||||
public WriteModel<Document> getModel() {
|
||||
return this.model;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,21 +15,29 @@
|
||||
*/
|
||||
package org.springframework.data.mongodb.core;
|
||||
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.springframework.data.mongodb.core.query.Query;
|
||||
import org.springframework.data.mongodb.core.query.Update;
|
||||
import org.springframework.data.mongodb.core.query.UpdateDefinition;
|
||||
import org.springframework.data.util.Pair;
|
||||
|
||||
import com.mongodb.bulk.BulkWriteResult;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
/**
|
||||
* Bulk operations for insert/update/remove actions on a collection. Bulk operations are available since MongoDB 2.6 and
|
||||
* make use of low level bulk commands on the protocol level. This interface defines a fluent API to add multiple single
|
||||
* operations or list of similar operations in sequence which can then eventually be executed by calling
|
||||
* {@link #execute()}.
|
||||
*
|
||||
* <pre class="code">
|
||||
* ReactiveMongoOperations ops = …;
|
||||
*
|
||||
* ops.bulkOps(BulkMode.UNORDERED, Person.class)
|
||||
* .insert(newPerson)
|
||||
* .updateOne(where("firstname").is("Joe"), Update.update("lastname", "Doe"))
|
||||
* .execute();
|
||||
* </pre>
|
||||
* <p>
|
||||
* Bulk operations are issued as one batch that pulls together all insert, update, and delete operations. Operations
|
||||
* that require individual operation results such as optimistic locking (using {@code @Version}) are not supported and
|
||||
|
||||
@@ -350,6 +350,40 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations {
|
||||
*/
|
||||
Mono<Void> dropCollection(String collectionName);
|
||||
|
||||
/**
|
||||
* Returns a new {@link ReactiveBulkOperations} for the given collection. <br />
|
||||
* <strong>NOTE:</strong> Any additional support for field mapping, etc. is not available for {@literal update} or
|
||||
* {@literal remove} operations in bulk mode due to the lack of domain type information. Use
|
||||
* {@link #bulkOps(BulkMode, Class, String)} to get full type specific support.
|
||||
*
|
||||
* @param mode the {@link BulkMode} to use for bulk operations, must not be {@literal null}.
|
||||
* @param collectionName the name of the collection to work on, must not be {@literal null} or empty.
|
||||
* @return {@link ReactiveBulkOperations} on the named collection
|
||||
* @since 4.1
|
||||
*/
|
||||
ReactiveBulkOperations bulkOps(BulkMode mode, String collectionName);
|
||||
|
||||
/**
|
||||
* Returns a new {@link ReactiveBulkOperations} for the given entity type.
|
||||
*
|
||||
* @param mode the {@link BulkMode} to use for bulk operations, must not be {@literal null}.
|
||||
* @param entityClass the name of the entity class, must not be {@literal null}.
|
||||
* @return {@link ReactiveBulkOperations} on the named collection associated of the given entity class.
|
||||
* @since 4.1
|
||||
*/
|
||||
ReactiveBulkOperations bulkOps(BulkMode mode, Class<?> entityClass);
|
||||
|
||||
/**
|
||||
* Returns a new {@link ReactiveBulkOperations} for the given entity type and collection name.
|
||||
*
|
||||
* @param mode the {@link BulkMode} to use for bulk operations, must not be {@literal null}.
|
||||
* @param entityType the name of the entity class. Can be {@literal null}.
|
||||
* @param collectionName the name of the collection to work on, must not be {@literal null} or empty.
|
||||
* @return {@link ReactiveBulkOperations} on the named collection associated with the given entity class.
|
||||
* @since 4.1
|
||||
*/
|
||||
ReactiveBulkOperations bulkOps(BulkMode mode, @Nullable Class<?> entityType, String collectionName);
|
||||
|
||||
/**
|
||||
* Query for a {@link Flux} of objects of type T from the collection used by the entity class. <br />
|
||||
* The object is converted from the MongoDB native representation using an instance of {@see MongoConverter}. Unless
|
||||
@@ -1748,39 +1782,6 @@ public interface ReactiveMongoOperations extends ReactiveFluentMongoOperations {
|
||||
<T> Flux<T> mapReduce(Query filterQuery, Class<?> domainType, String inputCollectionName, Class<T> resultType,
|
||||
String mapFunction, String reduceFunction, MapReduceOptions options);
|
||||
|
||||
/**
|
||||
* Returns a new {@link ReactiveBulkOperations} for the given collection. <br />
|
||||
* <strong>NOTE:</strong> Any additional support for field mapping, etc. is not available for {@literal update} or
|
||||
* {@literal remove} operations in bulk mode due to the lack of domain type information. Use
|
||||
* {@link #bulkOps(BulkMode, Class, String)} to get full type specific support.
|
||||
*
|
||||
* @param mode the {@link BulkMode} to use for bulk operations, must not be {@literal null}.
|
||||
* @param collectionName the name of the collection to work on, must not be {@literal null} or empty.
|
||||
* @return {@link ReactiveBulkOperations} on the named collection
|
||||
* @since 4.1
|
||||
*/
|
||||
ReactiveBulkOperations bulkOps(BulkMode mode, String collectionName);
|
||||
|
||||
/**
|
||||
* Returns a new {@link ReactiveBulkOperations} for the given entity type.
|
||||
*
|
||||
* @param mode the {@link BulkMode} to use for bulk operations, must not be {@literal null}.
|
||||
* @param entityClass the name of the entity class, must not be {@literal null}.
|
||||
* @return {@link ReactiveBulkOperations} on the named collection associated of the given entity class.
|
||||
* @since 4.1
|
||||
*/
|
||||
ReactiveBulkOperations bulkOps(BulkMode mode, Class<?> entityClass);
|
||||
|
||||
/**
|
||||
* Returns a new {@link ReactiveBulkOperations} for the given entity type and collection name.
|
||||
*
|
||||
* @param mode the {@link BulkMode} to use for bulk operations, must not be {@literal null}.
|
||||
* @param entityType the name of the entity class. Can be {@literal null}.
|
||||
* @param collectionName the name of the collection to work on, must not be {@literal null} or empty.
|
||||
* @return {@link ReactiveBulkOperations} on the named collection associated with the given entity class.
|
||||
* @since 4.1
|
||||
*/
|
||||
ReactiveBulkOperations bulkOps(BulkMode mode, @Nullable Class<?> entityType, String collectionName);
|
||||
|
||||
/**
|
||||
* Returns the underlying {@link MongoConverter}.
|
||||
|
||||
@@ -17,8 +17,6 @@ package org.springframework.data.mongodb.core;
|
||||
|
||||
import static org.springframework.data.mongodb.core.query.SerializationUtils.*;
|
||||
|
||||
import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
|
||||
import org.springframework.data.mongodb.core.DefaultReactiveBulkOperations.ReactiveBulkOperationContext;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.function.Tuple2;
|
||||
@@ -74,7 +72,9 @@ import org.springframework.data.mongodb.MongoDatabaseFactory;
|
||||
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
|
||||
import org.springframework.data.mongodb.ReactiveMongoDatabaseUtils;
|
||||
import org.springframework.data.mongodb.SessionSynchronization;
|
||||
import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
|
||||
import org.springframework.data.mongodb.core.CollectionPreparerSupport.ReactiveCollectionPreparerDelegate;
|
||||
import org.springframework.data.mongodb.core.DefaultReactiveBulkOperations.ReactiveBulkOperationContext;
|
||||
import org.springframework.data.mongodb.core.EntityOperations.AdaptibleEntity;
|
||||
import org.springframework.data.mongodb.core.QueryOperations.AggregationDefinition;
|
||||
import org.springframework.data.mongodb.core.QueryOperations.CountContext;
|
||||
@@ -481,31 +481,6 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
return new DefaultReactiveIndexOperations(this, getCollectionName(entityClass), this.queryMapper, entityClass);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReactiveBulkOperations bulkOps(BulkMode bulkMode, String collectionName) {
|
||||
return bulkOps(bulkMode, null, collectionName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReactiveBulkOperations bulkOps(BulkMode bulkMode, Class<?> entityClass) {
|
||||
return bulkOps(bulkMode, entityClass, getCollectionName(entityClass));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReactiveBulkOperations bulkOps(BulkMode mode, @Nullable Class<?> entityType, String collectionName) {
|
||||
|
||||
Assert.notNull(mode, "BulkMode must not be null");
|
||||
Assert.hasText(collectionName, "Collection name must not be null or empty");
|
||||
|
||||
DefaultReactiveBulkOperations operations = new DefaultReactiveBulkOperations(this, collectionName,
|
||||
new ReactiveBulkOperationContext(mode, Optional.ofNullable(getPersistentEntity(entityType)), queryMapper, updateMapper,
|
||||
eventPublisher, entityCallbacks));
|
||||
|
||||
operations.setDefaultWriteConcern(writeConcern);
|
||||
|
||||
return operations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getCollectionName(Class<?> entityClass) {
|
||||
return operations.determineCollectionName(entityClass);
|
||||
@@ -763,7 +738,6 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
public <T> Mono<Void> dropCollection(Class<T> entityClass) {
|
||||
return dropCollection(getCollectionName(entityClass));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> dropCollection(String collectionName) {
|
||||
|
||||
@@ -774,6 +748,31 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
|
||||
}).then();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReactiveBulkOperations bulkOps(BulkMode mode, String collectionName) {
|
||||
return bulkOps(mode, null, collectionName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReactiveBulkOperations bulkOps(BulkMode mode, Class<?> entityClass) {
|
||||
return bulkOps(mode, entityClass, getCollectionName(entityClass));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReactiveBulkOperations bulkOps(BulkMode mode, @Nullable Class<?> entityType, String collectionName) {
|
||||
|
||||
Assert.notNull(mode, "BulkMode must not be null");
|
||||
Assert.hasText(collectionName, "Collection name must not be null or empty");
|
||||
|
||||
DefaultReactiveBulkOperations operations = new DefaultReactiveBulkOperations(this, collectionName,
|
||||
new ReactiveBulkOperationContext(mode, Optional.ofNullable(getPersistentEntity(entityType)), queryMapper,
|
||||
updateMapper, eventPublisher, entityCallbacks));
|
||||
|
||||
operations.setDefaultWriteConcern(writeConcern);
|
||||
|
||||
return operations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<String> getCollectionNames() {
|
||||
return createFlux(MongoDatabase::listCollectionNames);
|
||||
|
||||
@@ -51,6 +51,8 @@ import com.mongodb.WriteConcern;
|
||||
import com.mongodb.bulk.BulkWriteResult;
|
||||
|
||||
/**
|
||||
* Tests for {@link DefaultReactiveBulkOperations}.
|
||||
*
|
||||
* @author Christoph Strobl
|
||||
*/
|
||||
@ExtendWith(MongoTemplateExtension.class)
|
||||
@@ -74,7 +76,7 @@ class DefaultReactiveBulkOperationsTests {
|
||||
.execute().as(StepVerifier::create) //
|
||||
.consumeNextWith(result -> {
|
||||
assertThat(result.getInsertedCount()).isEqualTo(2);
|
||||
});
|
||||
}).verifyComplete();
|
||||
}
|
||||
|
||||
@Test // GH-2821
|
||||
@@ -98,7 +100,7 @@ class DefaultReactiveBulkOperationsTests {
|
||||
.execute().as(StepVerifier::create) //
|
||||
.consumeNextWith(result -> {
|
||||
assertThat(result.getInsertedCount()).isEqualTo(2);
|
||||
});
|
||||
}).verifyComplete();
|
||||
}
|
||||
|
||||
@Test // GH-2821
|
||||
|
||||
Reference in New Issue
Block a user