Refine locking.
This commit is contained in:
@@ -18,8 +18,12 @@ package org.springframework.data.mongodb.core;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.function.UnaryOperator;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.bson.Document;
|
||||
@@ -187,16 +191,19 @@ public interface MongoOperations extends FluentMongoOperations {
|
||||
|
||||
return new SessionScoped() {
|
||||
|
||||
private final Object lock = new Object();
|
||||
private @Nullable ClientSession session = null;
|
||||
private final Lock lock = new ReentrantLock();
|
||||
private @Nullable ClientSession session;
|
||||
|
||||
@Override
|
||||
public <T> T execute(SessionCallback<T> action, Consumer<ClientSession> onComplete) {
|
||||
|
||||
synchronized (lock) {
|
||||
lock.lock();
|
||||
try {
|
||||
if (session == null) {
|
||||
session = sessionProvider.get();
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
try {
|
||||
|
||||
@@ -22,6 +22,10 @@ import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.io.Serializable;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.aopalliance.intercept.MethodInterceptor;
|
||||
@@ -171,6 +175,8 @@ public final class LazyLoadingProxyFactory {
|
||||
}
|
||||
}
|
||||
|
||||
private final ReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
|
||||
private final MongoPersistentProperty property;
|
||||
private final DbRefResolverCallback callback;
|
||||
private final Object source;
|
||||
@@ -339,8 +345,9 @@ public final class LazyLoadingProxyFactory {
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private synchronized Object resolve() {
|
||||
private Object resolve() {
|
||||
|
||||
lock.readLock().lock();
|
||||
if (resolved) {
|
||||
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
@@ -349,6 +356,7 @@ public final class LazyLoadingProxyFactory {
|
||||
}
|
||||
return result;
|
||||
}
|
||||
lock.readLock().unlock();
|
||||
|
||||
try {
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
@@ -356,6 +364,7 @@ public final class LazyLoadingProxyFactory {
|
||||
property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName()));
|
||||
}
|
||||
|
||||
lock.writeLock().lock();
|
||||
return callback.resolve(property);
|
||||
|
||||
} catch (RuntimeException ex) {
|
||||
@@ -368,6 +377,8 @@ public final class LazyLoadingProxyFactory {
|
||||
|
||||
throw new LazyLoadingException("Unable to lazily resolve DBRef",
|
||||
translatedException != null ? translatedException : ex);
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,8 @@ package org.springframework.data.mongodb.core.messaging;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.springframework.dao.DataAccessResourceFailureException;
|
||||
@@ -39,7 +41,7 @@ import com.mongodb.client.MongoCursor;
|
||||
*/
|
||||
abstract class CursorReadingTask<T, R> implements Task {
|
||||
|
||||
private final Object lifecycleMonitor = new Object();
|
||||
private final Lock lock = new ReentrantLock();
|
||||
|
||||
private final MongoTemplate template;
|
||||
private final SubscriptionRequest<T, R, RequestOptions> request;
|
||||
@@ -86,18 +88,18 @@ abstract class CursorReadingTask<T, R> implements Task {
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
|
||||
synchronized (lifecycleMonitor) {
|
||||
state = State.CANCELLED;
|
||||
}
|
||||
lock.lock();
|
||||
state = State.CANCELLED;
|
||||
lock.unlock();
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
|
||||
synchronized (lifecycleMonitor) {
|
||||
state = State.CANCELLED;
|
||||
}
|
||||
lock.lock();
|
||||
state = State.CANCELLED;
|
||||
lock.unlock();
|
||||
|
||||
errorHandler.handleError(e);
|
||||
}
|
||||
@@ -114,18 +116,19 @@ abstract class CursorReadingTask<T, R> implements Task {
|
||||
*/
|
||||
private void start() {
|
||||
|
||||
synchronized (lifecycleMonitor) {
|
||||
if (!State.RUNNING.equals(state)) {
|
||||
state = State.STARTING;
|
||||
}
|
||||
lock.lock();
|
||||
if (!State.RUNNING.equals(state)) {
|
||||
state = State.STARTING;
|
||||
}
|
||||
lock.unlock();
|
||||
|
||||
do {
|
||||
|
||||
boolean valid = false;
|
||||
|
||||
synchronized (lifecycleMonitor) {
|
||||
lock.lock();
|
||||
|
||||
try {
|
||||
if (State.STARTING.equals(state)) {
|
||||
|
||||
MongoCursor<T> cursor = execute(() -> initCursor(template, request.getRequestOptions(), targetType));
|
||||
@@ -137,6 +140,8 @@ abstract class CursorReadingTask<T, R> implements Task {
|
||||
cursor.close();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
if (!valid) {
|
||||
@@ -145,9 +150,9 @@ abstract class CursorReadingTask<T, R> implements Task {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
|
||||
synchronized (lifecycleMonitor) {
|
||||
state = State.CANCELLED;
|
||||
}
|
||||
lock.lock();
|
||||
state = State.CANCELLED;
|
||||
lock.unlock();
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
@@ -163,7 +168,9 @@ abstract class CursorReadingTask<T, R> implements Task {
|
||||
@Override
|
||||
public void cancel() throws DataAccessResourceFailureException {
|
||||
|
||||
synchronized (lifecycleMonitor) {
|
||||
lock.lock();
|
||||
|
||||
try {
|
||||
|
||||
if (State.RUNNING.equals(state) || State.STARTING.equals(state)) {
|
||||
this.state = State.CANCELLED;
|
||||
@@ -171,6 +178,8 @@ abstract class CursorReadingTask<T, R> implements Task {
|
||||
cursor.close();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -182,8 +191,11 @@ abstract class CursorReadingTask<T, R> implements Task {
|
||||
@Override
|
||||
public State getState() {
|
||||
|
||||
synchronized (lifecycleMonitor) {
|
||||
lock.lock();
|
||||
try {
|
||||
return state;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -220,10 +232,13 @@ abstract class CursorReadingTask<T, R> implements Task {
|
||||
@Nullable
|
||||
private T getNext() {
|
||||
|
||||
synchronized (lifecycleMonitor) {
|
||||
lock.lock();
|
||||
try{
|
||||
if (State.RUNNING.equals(state)) {
|
||||
return cursor.tryNext();
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
throw new IllegalStateException(String.format("Cursor %s is not longer open", cursor));
|
||||
|
||||
@@ -20,6 +20,8 @@ import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@@ -49,9 +51,11 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
|
||||
private final TaskFactory taskFactory;
|
||||
private final Optional<ErrorHandler> errorHandler;
|
||||
|
||||
private final Object lifecycleMonitor = new Object();
|
||||
private final Map<SubscriptionRequest, Subscription> subscriptions = new LinkedHashMap<>();
|
||||
|
||||
ReadWriteLock lifecycleMonitor = new ReentrantReadWriteLock();
|
||||
ReadWriteLock subscriptionMonitor = new ReentrantReadWriteLock();
|
||||
|
||||
private boolean running = false;
|
||||
|
||||
/**
|
||||
@@ -109,42 +113,47 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
|
||||
@Override
|
||||
public void start() {
|
||||
|
||||
synchronized (lifecycleMonitor) {
|
||||
lifecycleMonitor.writeLock().lock();
|
||||
try {
|
||||
if (!this.running) {
|
||||
subscriptions.values().stream() //
|
||||
.filter(it -> !it.isActive()) //
|
||||
.filter(TaskSubscription.class::isInstance) //
|
||||
.map(TaskSubscription.class::cast) //
|
||||
.map(TaskSubscription::getTask) //
|
||||
.forEach(taskExecutor::execute);
|
||||
|
||||
if (this.running) {
|
||||
return;
|
||||
running = true;
|
||||
}
|
||||
|
||||
subscriptions.values().stream() //
|
||||
.filter(it -> !it.isActive()) //
|
||||
.filter(TaskSubscription.class::isInstance) //
|
||||
.map(TaskSubscription.class::cast) //
|
||||
.map(TaskSubscription::getTask) //
|
||||
.forEach(taskExecutor::execute);
|
||||
|
||||
running = true;
|
||||
} finally {
|
||||
lifecycleMonitor.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
|
||||
synchronized (lifecycleMonitor) {
|
||||
lifecycleMonitor.writeLock().lock();
|
||||
|
||||
try {
|
||||
if (this.running) {
|
||||
|
||||
subscriptions.values().forEach(Cancelable::cancel);
|
||||
|
||||
running = false;
|
||||
}
|
||||
} finally {
|
||||
lifecycleMonitor.writeLock().unlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
lifecycleMonitor.writeLock().lock();
|
||||
try {
|
||||
return running;
|
||||
} finally {
|
||||
lifecycleMonitor.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -171,35 +180,42 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
|
||||
@Override
|
||||
public Optional<Subscription> lookup(SubscriptionRequest<?, ?, ?> request) {
|
||||
|
||||
synchronized (lifecycleMonitor) {
|
||||
subscriptionMonitor.readLock();
|
||||
try {
|
||||
return Optional.ofNullable(subscriptions.get(request));
|
||||
} finally {
|
||||
subscriptionMonitor.readLock().unlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public Subscription register(SubscriptionRequest request, Task task) {
|
||||
|
||||
Subscription subscription = new TaskSubscription(task);
|
||||
|
||||
synchronized (lifecycleMonitor) {
|
||||
|
||||
this.subscriptionMonitor.writeLock().lock();
|
||||
try {
|
||||
if (subscriptions.containsKey(request)) {
|
||||
return subscriptions.get(request);
|
||||
}
|
||||
|
||||
this.subscriptions.put(request, subscription);
|
||||
|
||||
if (this.running) {
|
||||
if (this.isRunning()) {
|
||||
taskExecutor.execute(task);
|
||||
}
|
||||
return subscription;
|
||||
} finally {
|
||||
this.subscriptionMonitor.writeLock().unlock();
|
||||
}
|
||||
|
||||
return subscription;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(Subscription subscription) {
|
||||
|
||||
synchronized (lifecycleMonitor) {
|
||||
this.subscriptionMonitor.writeLock().lock();
|
||||
try {
|
||||
|
||||
if (subscriptions.containsValue(subscription)) {
|
||||
|
||||
@@ -209,6 +225,8 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
|
||||
|
||||
subscriptions.values().remove(subscription);
|
||||
}
|
||||
} finally {
|
||||
this.subscriptionMonitor.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user