Compare commits
3 Commits
main
...
issue/4429
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
db4a1be5cb | ||
|
|
2e336a12e9 | ||
|
|
3e517e9c84 |
2
pom.xml
2
pom.xml
@@ -5,7 +5,7 @@
|
|||||||
|
|
||||||
<groupId>org.springframework.data</groupId>
|
<groupId>org.springframework.data</groupId>
|
||||||
<artifactId>spring-data-mongodb-parent</artifactId>
|
<artifactId>spring-data-mongodb-parent</artifactId>
|
||||||
<version>4.2.0-SNAPSHOT</version>
|
<version>4.2.x-4429-SNAPSHOT</version>
|
||||||
<packaging>pom</packaging>
|
<packaging>pom</packaging>
|
||||||
|
|
||||||
<name>Spring Data MongoDB</name>
|
<name>Spring Data MongoDB</name>
|
||||||
|
|||||||
@@ -7,7 +7,7 @@
|
|||||||
<parent>
|
<parent>
|
||||||
<groupId>org.springframework.data</groupId>
|
<groupId>org.springframework.data</groupId>
|
||||||
<artifactId>spring-data-mongodb-parent</artifactId>
|
<artifactId>spring-data-mongodb-parent</artifactId>
|
||||||
<version>4.2.0-SNAPSHOT</version>
|
<version>4.2.x-4429-SNAPSHOT</version>
|
||||||
<relativePath>../pom.xml</relativePath>
|
<relativePath>../pom.xml</relativePath>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
|
|||||||
@@ -15,7 +15,7 @@
|
|||||||
<parent>
|
<parent>
|
||||||
<groupId>org.springframework.data</groupId>
|
<groupId>org.springframework.data</groupId>
|
||||||
<artifactId>spring-data-mongodb-parent</artifactId>
|
<artifactId>spring-data-mongodb-parent</artifactId>
|
||||||
<version>4.2.0-SNAPSHOT</version>
|
<version>4.2.x-4429-SNAPSHOT</version>
|
||||||
<relativePath>../pom.xml</relativePath>
|
<relativePath>../pom.xml</relativePath>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
|
|||||||
@@ -13,7 +13,7 @@
|
|||||||
<parent>
|
<parent>
|
||||||
<groupId>org.springframework.data</groupId>
|
<groupId>org.springframework.data</groupId>
|
||||||
<artifactId>spring-data-mongodb-parent</artifactId>
|
<artifactId>spring-data-mongodb-parent</artifactId>
|
||||||
<version>4.2.0-SNAPSHOT</version>
|
<version>4.2.x-4429-SNAPSHOT</version>
|
||||||
<relativePath>../pom.xml</relativePath>
|
<relativePath>../pom.xml</relativePath>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
|
|||||||
@@ -18,8 +18,12 @@ package org.springframework.data.mongodb.core;
|
|||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
import java.util.function.UnaryOperator;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.bson.Document;
|
import org.bson.Document;
|
||||||
@@ -187,16 +191,19 @@ public interface MongoOperations extends FluentMongoOperations {
|
|||||||
|
|
||||||
return new SessionScoped() {
|
return new SessionScoped() {
|
||||||
|
|
||||||
private final Object lock = new Object();
|
private final Lock lock = new ReentrantLock();
|
||||||
private @Nullable ClientSession session = null;
|
private @Nullable ClientSession session;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> T execute(SessionCallback<T> action, Consumer<ClientSession> onComplete) {
|
public <T> T execute(SessionCallback<T> action, Consumer<ClientSession> onComplete) {
|
||||||
|
|
||||||
synchronized (lock) {
|
lock.lock();
|
||||||
|
try {
|
||||||
if (session == null) {
|
if (session == null) {
|
||||||
session = sessionProvider.get();
|
session = sessionProvider.get();
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -22,6 +22,9 @@ import java.io.ObjectInputStream;
|
|||||||
import java.io.ObjectOutputStream;
|
import java.io.ObjectOutputStream;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import org.aopalliance.intercept.MethodInterceptor;
|
import org.aopalliance.intercept.MethodInterceptor;
|
||||||
@@ -134,7 +137,8 @@ public final class LazyLoadingProxyFactory {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return prepareProxyFactory(propertyType,
|
return prepareProxyFactory(propertyType,
|
||||||
() -> new LazyLoadingInterceptor(property, callback, source, exceptionTranslator)).getProxy(LazyLoadingProxy.class.getClassLoader());
|
() -> new LazyLoadingInterceptor(property, callback, source, exceptionTranslator))
|
||||||
|
.getProxy(LazyLoadingProxy.class.getClassLoader());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -171,6 +175,8 @@ public final class LazyLoadingProxyFactory {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final ReadWriteLock lock = new ReentrantReadWriteLock();
|
||||||
|
|
||||||
private final MongoPersistentProperty property;
|
private final MongoPersistentProperty property;
|
||||||
private final DbRefResolverCallback callback;
|
private final DbRefResolverCallback callback;
|
||||||
private final Object source;
|
private final Object source;
|
||||||
@@ -339,25 +345,29 @@ public final class LazyLoadingProxyFactory {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
private synchronized Object resolve() {
|
private Object resolve() {
|
||||||
|
|
||||||
if (resolved) {
|
lock.readLock().lock();
|
||||||
|
try {
|
||||||
|
if (resolved) {
|
||||||
|
|
||||||
if (LOGGER.isTraceEnabled()) {
|
if (LOGGER.isTraceEnabled()) {
|
||||||
LOGGER.trace(String.format("Accessing already resolved lazy loading property %s.%s",
|
LOGGER.trace(String.format("Accessing already resolved lazy loading property %s.%s",
|
||||||
property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName()));
|
property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName()));
|
||||||
|
}
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
return result;
|
} finally {
|
||||||
|
lock.readLock().unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LOGGER.isTraceEnabled()) {
|
||||||
|
LOGGER.trace(String.format("Resolving lazy loading property %s.%s",
|
||||||
|
property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName()));
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (LOGGER.isTraceEnabled()) {
|
return executeWhileLocked(lock.writeLock(), () -> callback.resolve(property));
|
||||||
LOGGER.trace(String.format("Resolving lazy loading property %s.%s",
|
|
||||||
property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName()));
|
|
||||||
}
|
|
||||||
|
|
||||||
return callback.resolve(property);
|
|
||||||
|
|
||||||
} catch (RuntimeException ex) {
|
} catch (RuntimeException ex) {
|
||||||
|
|
||||||
DataAccessException translatedException = exceptionTranslator.translateExceptionIfPossible(ex);
|
DataAccessException translatedException = exceptionTranslator.translateExceptionIfPossible(ex);
|
||||||
@@ -370,6 +380,16 @@ public final class LazyLoadingProxyFactory {
|
|||||||
translatedException != null ? translatedException : ex);
|
translatedException != null ? translatedException : ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static <T> T executeWhileLocked(Lock lock, Supplier<T> stuff) {
|
||||||
|
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
return stuff.get();
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,6 +18,8 @@ package org.springframework.data.mongodb.core.messaging;
|
|||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import org.springframework.dao.DataAccessResourceFailureException;
|
import org.springframework.dao.DataAccessResourceFailureException;
|
||||||
@@ -39,7 +41,7 @@ import com.mongodb.client.MongoCursor;
|
|||||||
*/
|
*/
|
||||||
abstract class CursorReadingTask<T, R> implements Task {
|
abstract class CursorReadingTask<T, R> implements Task {
|
||||||
|
|
||||||
private final Object lifecycleMonitor = new Object();
|
private final Lock lock = new ReentrantLock();
|
||||||
|
|
||||||
private final MongoTemplate template;
|
private final MongoTemplate template;
|
||||||
private final SubscriptionRequest<T, R, RequestOptions> request;
|
private final SubscriptionRequest<T, R, RequestOptions> request;
|
||||||
@@ -86,19 +88,14 @@ abstract class CursorReadingTask<T, R> implements Task {
|
|||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
|
||||||
synchronized (lifecycleMonitor) {
|
doWhileLocked(lock, () -> state = State.CANCELLED);
|
||||||
state = State.CANCELLED;
|
|
||||||
}
|
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
|
|
||||||
synchronized (lifecycleMonitor) {
|
doWhileLocked(lock, () -> state = State.CANCELLED);
|
||||||
state = State.CANCELLED;
|
|
||||||
}
|
|
||||||
|
|
||||||
errorHandler.handleError(e);
|
errorHandler.handleError(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -114,30 +111,32 @@ abstract class CursorReadingTask<T, R> implements Task {
|
|||||||
*/
|
*/
|
||||||
private void start() {
|
private void start() {
|
||||||
|
|
||||||
synchronized (lifecycleMonitor) {
|
doWhileLocked(lock, () -> {
|
||||||
if (!State.RUNNING.equals(state)) {
|
if (!State.RUNNING.equals(state)) {
|
||||||
state = State.STARTING;
|
state = State.STARTING;
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
|
|
||||||
do {
|
do {
|
||||||
|
|
||||||
boolean valid = false;
|
// boolean valid = false;
|
||||||
|
|
||||||
synchronized (lifecycleMonitor) {
|
boolean valid = executeWhileLocked(lock, () -> {
|
||||||
|
|
||||||
if (State.STARTING.equals(state)) {
|
if (!State.STARTING.equals(state)) {
|
||||||
|
return false;
|
||||||
MongoCursor<T> cursor = execute(() -> initCursor(template, request.getRequestOptions(), targetType));
|
|
||||||
valid = isValidCursor(cursor);
|
|
||||||
if (valid) {
|
|
||||||
this.cursor = cursor;
|
|
||||||
state = State.RUNNING;
|
|
||||||
} else if (cursor != null) {
|
|
||||||
cursor.close();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
MongoCursor<T> cursor = execute(() -> initCursor(template, request.getRequestOptions(), targetType));
|
||||||
|
boolean isValid = isValidCursor(cursor);
|
||||||
|
if (isValid) {
|
||||||
|
this.cursor = cursor;
|
||||||
|
state = State.RUNNING;
|
||||||
|
} else if (cursor != null) {
|
||||||
|
cursor.close();
|
||||||
|
}
|
||||||
|
return isValid;
|
||||||
|
});
|
||||||
|
|
||||||
if (!valid) {
|
if (!valid) {
|
||||||
|
|
||||||
@@ -145,9 +144,7 @@ abstract class CursorReadingTask<T, R> implements Task {
|
|||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
|
||||||
synchronized (lifecycleMonitor) {
|
doWhileLocked(lock, () -> state = State.CANCELLED);
|
||||||
state = State.CANCELLED;
|
|
||||||
}
|
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -163,7 +160,7 @@ abstract class CursorReadingTask<T, R> implements Task {
|
|||||||
@Override
|
@Override
|
||||||
public void cancel() throws DataAccessResourceFailureException {
|
public void cancel() throws DataAccessResourceFailureException {
|
||||||
|
|
||||||
synchronized (lifecycleMonitor) {
|
doWhileLocked(lock, () -> {
|
||||||
|
|
||||||
if (State.RUNNING.equals(state) || State.STARTING.equals(state)) {
|
if (State.RUNNING.equals(state) || State.STARTING.equals(state)) {
|
||||||
this.state = State.CANCELLED;
|
this.state = State.CANCELLED;
|
||||||
@@ -171,7 +168,7 @@ abstract class CursorReadingTask<T, R> implements Task {
|
|||||||
cursor.close();
|
cursor.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -181,10 +178,7 @@ abstract class CursorReadingTask<T, R> implements Task {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public State getState() {
|
public State getState() {
|
||||||
|
return executeWhileLocked(lock, () -> state);
|
||||||
synchronized (lifecycleMonitor) {
|
|
||||||
return state;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -220,13 +214,12 @@ abstract class CursorReadingTask<T, R> implements Task {
|
|||||||
@Nullable
|
@Nullable
|
||||||
private T getNext() {
|
private T getNext() {
|
||||||
|
|
||||||
synchronized (lifecycleMonitor) {
|
return executeWhileLocked(lock, () -> {
|
||||||
if (State.RUNNING.equals(state)) {
|
if (State.RUNNING.equals(state)) {
|
||||||
return cursor.tryNext();
|
return cursor.tryNext();
|
||||||
}
|
}
|
||||||
}
|
throw new IllegalStateException(String.format("Cursor %s is not longer open", cursor));
|
||||||
|
});
|
||||||
throw new IllegalStateException(String.format("Cursor %s is not longer open", cursor));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean isValidCursor(@Nullable MongoCursor<?> cursor) {
|
private static boolean isValidCursor(@Nullable MongoCursor<?> cursor) {
|
||||||
@@ -263,4 +256,23 @@ abstract class CursorReadingTask<T, R> implements Task {
|
|||||||
throw translated != null ? translated : e;
|
throw translated != null ? translated : e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void doWhileLocked(Lock lock, Runnable action) {
|
||||||
|
|
||||||
|
executeWhileLocked(lock, () -> {
|
||||||
|
action.run();
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
private static <T> T executeWhileLocked(Lock lock, Supplier<T> stuff) {
|
||||||
|
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
return stuff.get();
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,6 +20,10 @@ import java.util.LinkedHashMap;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
@@ -35,8 +39,7 @@ import org.springframework.util.ObjectUtils;
|
|||||||
/**
|
/**
|
||||||
* Simple {@link Executor} based {@link MessageListenerContainer} implementation for running {@link Task tasks} like
|
* Simple {@link Executor} based {@link MessageListenerContainer} implementation for running {@link Task tasks} like
|
||||||
* listening to MongoDB <a href="https://docs.mongodb.com/manual/changeStreams/">Change Streams</a> and tailable
|
* listening to MongoDB <a href="https://docs.mongodb.com/manual/changeStreams/">Change Streams</a> and tailable
|
||||||
* cursors.
|
* cursors. <br />
|
||||||
* <br />
|
|
||||||
* This message container creates long-running tasks that are executed on {@link Executor}.
|
* This message container creates long-running tasks that are executed on {@link Executor}.
|
||||||
*
|
*
|
||||||
* @author Christoph Strobl
|
* @author Christoph Strobl
|
||||||
@@ -49,9 +52,11 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
|
|||||||
private final TaskFactory taskFactory;
|
private final TaskFactory taskFactory;
|
||||||
private final Optional<ErrorHandler> errorHandler;
|
private final Optional<ErrorHandler> errorHandler;
|
||||||
|
|
||||||
private final Object lifecycleMonitor = new Object();
|
|
||||||
private final Map<SubscriptionRequest, Subscription> subscriptions = new LinkedHashMap<>();
|
private final Map<SubscriptionRequest, Subscription> subscriptions = new LinkedHashMap<>();
|
||||||
|
|
||||||
|
ReadWriteLock lifecycleMonitor = new ReentrantReadWriteLock();
|
||||||
|
ReadWriteLock subscriptionMonitor = new ReentrantReadWriteLock();
|
||||||
|
|
||||||
private boolean running = false;
|
private boolean running = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -109,43 +114,34 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
|
|||||||
@Override
|
@Override
|
||||||
public void start() {
|
public void start() {
|
||||||
|
|
||||||
synchronized (lifecycleMonitor) {
|
doWhileLocked(lifecycleMonitor.writeLock(), () -> {
|
||||||
|
if (!this.running) {
|
||||||
|
subscriptions.values().stream() //
|
||||||
|
.filter(it -> !it.isActive()) //
|
||||||
|
.filter(TaskSubscription.class::isInstance) //
|
||||||
|
.map(TaskSubscription.class::cast) //
|
||||||
|
.map(TaskSubscription::getTask) //
|
||||||
|
.forEach(taskExecutor::execute);
|
||||||
|
|
||||||
if (this.running) {
|
running = true;
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
});
|
||||||
subscriptions.values().stream() //
|
|
||||||
.filter(it -> !it.isActive()) //
|
|
||||||
.filter(TaskSubscription.class::isInstance) //
|
|
||||||
.map(TaskSubscription.class::cast) //
|
|
||||||
.map(TaskSubscription::getTask) //
|
|
||||||
.forEach(taskExecutor::execute);
|
|
||||||
|
|
||||||
running = true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop() {
|
public void stop() {
|
||||||
|
|
||||||
synchronized (lifecycleMonitor) {
|
doWhileLocked(lifecycleMonitor.writeLock(), () -> {
|
||||||
|
|
||||||
if (this.running) {
|
if (this.running) {
|
||||||
|
|
||||||
subscriptions.values().forEach(Cancelable::cancel);
|
subscriptions.values().forEach(Cancelable::cancel);
|
||||||
|
|
||||||
running = false;
|
running = false;
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isRunning() {
|
public boolean isRunning() {
|
||||||
|
return executeWhileLocked(lifecycleMonitor.readLock(), () -> running);
|
||||||
synchronized (this.lifecycleMonitor) {
|
|
||||||
return running;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -170,36 +166,32 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<Subscription> lookup(SubscriptionRequest<?, ?, ?> request) {
|
public Optional<Subscription> lookup(SubscriptionRequest<?, ?, ?> request) {
|
||||||
|
return executeWhileLocked(subscriptionMonitor.readLock(), () -> Optional.ofNullable(subscriptions.get(request)));
|
||||||
synchronized (lifecycleMonitor) {
|
|
||||||
return Optional.ofNullable(subscriptions.get(request));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Subscription register(SubscriptionRequest request, Task task) {
|
public Subscription register(SubscriptionRequest request, Task task) {
|
||||||
|
|
||||||
Subscription subscription = new TaskSubscription(task);
|
return executeWhileLocked(this.subscriptionMonitor.writeLock(), () ->
|
||||||
|
{
|
||||||
synchronized (lifecycleMonitor) {
|
|
||||||
|
|
||||||
if (subscriptions.containsKey(request)) {
|
if (subscriptions.containsKey(request)) {
|
||||||
return subscriptions.get(request);
|
return subscriptions.get(request);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Subscription subscription = new TaskSubscription(task);
|
||||||
this.subscriptions.put(request, subscription);
|
this.subscriptions.put(request, subscription);
|
||||||
|
|
||||||
if (this.running) {
|
if (this.isRunning()) {
|
||||||
taskExecutor.execute(task);
|
taskExecutor.execute(task);
|
||||||
}
|
}
|
||||||
}
|
return subscription;
|
||||||
|
});
|
||||||
|
|
||||||
return subscription;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void remove(Subscription subscription) {
|
public void remove(Subscription subscription) {
|
||||||
|
|
||||||
synchronized (lifecycleMonitor) {
|
doWhileLocked(this.subscriptionMonitor.writeLock(), () -> {
|
||||||
|
|
||||||
if (subscriptions.containsValue(subscription)) {
|
if (subscriptions.containsValue(subscription)) {
|
||||||
|
|
||||||
@@ -209,6 +201,25 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
|
|||||||
|
|
||||||
subscriptions.values().remove(subscription);
|
subscriptions.values().remove(subscription);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void doWhileLocked(Lock lock, Runnable action) {
|
||||||
|
|
||||||
|
executeWhileLocked(lock, () -> {
|
||||||
|
action.run();
|
||||||
|
return null;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
private static <T> T executeWhileLocked(Lock lock, Supplier<T> stuff) {
|
||||||
|
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
return stuff.get();
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user