diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java index 2f3c0dd92..0025a1d7e 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java @@ -18,8 +18,12 @@ package org.springframework.data.mongodb.core; import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.function.UnaryOperator; import java.util.stream.Stream; import org.bson.Document; @@ -187,16 +191,19 @@ public interface MongoOperations extends FluentMongoOperations { return new SessionScoped() { - private final Object lock = new Object(); - private @Nullable ClientSession session = null; + private final Lock lock = new ReentrantLock(); + private @Nullable ClientSession session; @Override public T execute(SessionCallback action, Consumer onComplete) { - synchronized (lock) { + lock.lock(); + try { if (session == null) { session = sessionProvider.get(); } + } finally { + lock.unlock(); } try { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java index 0c14b2797..bec7ab3b7 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java @@ -22,6 +22,10 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; import java.lang.reflect.Method; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; import org.aopalliance.intercept.MethodInterceptor; @@ -171,6 +175,8 @@ public final class LazyLoadingProxyFactory { } } + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final MongoPersistentProperty property; private final DbRefResolverCallback callback; private final Object source; @@ -339,8 +345,9 @@ public final class LazyLoadingProxyFactory { } @Nullable - private synchronized Object resolve() { + private Object resolve() { + lock.readLock().lock(); if (resolved) { if (LOGGER.isTraceEnabled()) { @@ -349,6 +356,7 @@ public final class LazyLoadingProxyFactory { } return result; } + lock.readLock().unlock(); try { if (LOGGER.isTraceEnabled()) { @@ -356,6 +364,7 @@ public final class LazyLoadingProxyFactory { property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName())); } + lock.writeLock().lock(); return callback.resolve(property); } catch (RuntimeException ex) { @@ -368,6 +377,8 @@ public final class LazyLoadingProxyFactory { throw new LazyLoadingException("Unable to lazily resolve DBRef", translatedException != null ? translatedException : ex); + } finally { + lock.writeLock().unlock(); } } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java index cc0c64eea..1d6d81be6 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java @@ -18,6 +18,8 @@ package org.springframework.data.mongodb.core.messaging; import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import org.springframework.dao.DataAccessResourceFailureException; @@ -39,7 +41,7 @@ import com.mongodb.client.MongoCursor; */ abstract class CursorReadingTask implements Task { - private final Object lifecycleMonitor = new Object(); + private final Lock lock = new ReentrantLock(); private final MongoTemplate template; private final SubscriptionRequest request; @@ -86,18 +88,18 @@ abstract class CursorReadingTask implements Task { } } catch (InterruptedException e) { - synchronized (lifecycleMonitor) { - state = State.CANCELLED; - } + lock.lock(); + state = State.CANCELLED; + lock.unlock(); Thread.currentThread().interrupt(); break; } } } catch (RuntimeException e) { - synchronized (lifecycleMonitor) { - state = State.CANCELLED; - } + lock.lock(); + state = State.CANCELLED; + lock.unlock(); errorHandler.handleError(e); } @@ -114,18 +116,19 @@ abstract class CursorReadingTask implements Task { */ private void start() { - synchronized (lifecycleMonitor) { - if (!State.RUNNING.equals(state)) { - state = State.STARTING; - } + lock.lock(); + if (!State.RUNNING.equals(state)) { + state = State.STARTING; } + lock.unlock(); do { boolean valid = false; - synchronized (lifecycleMonitor) { + lock.lock(); + try { if (State.STARTING.equals(state)) { MongoCursor cursor = execute(() -> initCursor(template, request.getRequestOptions(), targetType)); @@ -137,6 +140,8 @@ abstract class CursorReadingTask implements Task { cursor.close(); } } + } finally { + lock.unlock(); } if (!valid) { @@ -145,9 +150,9 @@ abstract class CursorReadingTask implements Task { Thread.sleep(100); } catch (InterruptedException e) { - synchronized (lifecycleMonitor) { - state = State.CANCELLED; - } + lock.lock(); + state = State.CANCELLED; + lock.unlock(); Thread.currentThread().interrupt(); } } @@ -163,7 +168,9 @@ abstract class CursorReadingTask implements Task { @Override public void cancel() throws DataAccessResourceFailureException { - synchronized (lifecycleMonitor) { + lock.lock(); + + try { if (State.RUNNING.equals(state) || State.STARTING.equals(state)) { this.state = State.CANCELLED; @@ -171,6 +178,8 @@ abstract class CursorReadingTask implements Task { cursor.close(); } } + } finally { + lock.unlock(); } } @@ -182,8 +191,11 @@ abstract class CursorReadingTask implements Task { @Override public State getState() { - synchronized (lifecycleMonitor) { + lock.lock(); + try { return state; + } finally { + lock.unlock(); } } @@ -220,10 +232,13 @@ abstract class CursorReadingTask implements Task { @Nullable private T getNext() { - synchronized (lifecycleMonitor) { + lock.lock(); + try{ if (State.RUNNING.equals(state)) { return cursor.tryNext(); } + } finally { + lock.unlock(); } throw new IllegalStateException(String.format("Cursor %s is not longer open", cursor)); diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java index 7eb088c49..a38a02b18 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java @@ -20,6 +20,8 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; import java.util.concurrent.Executor; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,9 +51,11 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer private final TaskFactory taskFactory; private final Optional errorHandler; - private final Object lifecycleMonitor = new Object(); private final Map subscriptions = new LinkedHashMap<>(); + ReadWriteLock lifecycleMonitor = new ReentrantReadWriteLock(); + ReadWriteLock subscriptionMonitor = new ReentrantReadWriteLock(); + private boolean running = false; /** @@ -109,42 +113,47 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer @Override public void start() { - synchronized (lifecycleMonitor) { + lifecycleMonitor.writeLock().lock(); + try { + if (!this.running) { + subscriptions.values().stream() // + .filter(it -> !it.isActive()) // + .filter(TaskSubscription.class::isInstance) // + .map(TaskSubscription.class::cast) // + .map(TaskSubscription::getTask) // + .forEach(taskExecutor::execute); - if (this.running) { - return; + running = true; } - - subscriptions.values().stream() // - .filter(it -> !it.isActive()) // - .filter(TaskSubscription.class::isInstance) // - .map(TaskSubscription.class::cast) // - .map(TaskSubscription::getTask) // - .forEach(taskExecutor::execute); - - running = true; + } finally { + lifecycleMonitor.writeLock().unlock(); } } @Override public void stop() { - synchronized (lifecycleMonitor) { + lifecycleMonitor.writeLock().lock(); + try { if (this.running) { - subscriptions.values().forEach(Cancelable::cancel); - running = false; } + } finally { + lifecycleMonitor.writeLock().unlock(); } + } @Override public boolean isRunning() { - synchronized (this.lifecycleMonitor) { + lifecycleMonitor.writeLock().lock(); + try { return running; + } finally { + lifecycleMonitor.writeLock().unlock(); } } @@ -171,35 +180,42 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer @Override public Optional lookup(SubscriptionRequest request) { - synchronized (lifecycleMonitor) { + subscriptionMonitor.readLock(); + try { return Optional.ofNullable(subscriptions.get(request)); + } finally { + subscriptionMonitor.readLock().unlock(); } + } public Subscription register(SubscriptionRequest request, Task task) { Subscription subscription = new TaskSubscription(task); - synchronized (lifecycleMonitor) { - + this.subscriptionMonitor.writeLock().lock(); + try { if (subscriptions.containsKey(request)) { return subscriptions.get(request); } this.subscriptions.put(request, subscription); - if (this.running) { + if (this.isRunning()) { taskExecutor.execute(task); } + return subscription; + } finally { + this.subscriptionMonitor.writeLock().unlock(); } - return subscription; } @Override public void remove(Subscription subscription) { - synchronized (lifecycleMonitor) { + this.subscriptionMonitor.writeLock().lock(); + try { if (subscriptions.containsValue(subscription)) { @@ -209,6 +225,8 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer subscriptions.values().remove(subscription); } + } finally { + this.subscriptionMonitor.writeLock().unlock(); } }