Update after review
This commit is contained in:
@@ -24,7 +24,6 @@ import java.io.Serializable;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
@@ -138,7 +137,8 @@ public final class LazyLoadingProxyFactory {
|
||||
}
|
||||
|
||||
return prepareProxyFactory(propertyType,
|
||||
() -> new LazyLoadingInterceptor(property, callback, source, exceptionTranslator)).getProxy(LazyLoadingProxy.class.getClassLoader());
|
||||
() -> new LazyLoadingInterceptor(property, callback, source, exceptionTranslator))
|
||||
.getProxy(LazyLoadingProxy.class.getClassLoader());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -348,25 +348,26 @@ public final class LazyLoadingProxyFactory {
|
||||
private Object resolve() {
|
||||
|
||||
lock.readLock().lock();
|
||||
if (resolved) {
|
||||
try {
|
||||
if (resolved) {
|
||||
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.trace(String.format("Accessing already resolved lazy loading property %s.%s",
|
||||
property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName()));
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.trace(String.format("Accessing already resolved lazy loading property %s.%s",
|
||||
property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName()));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
return result;
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.trace(String.format("Resolving lazy loading property %s.%s",
|
||||
property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName()));
|
||||
}
|
||||
lock.readLock().unlock();
|
||||
|
||||
try {
|
||||
if (LOGGER.isTraceEnabled()) {
|
||||
LOGGER.trace(String.format("Resolving lazy loading property %s.%s",
|
||||
property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName()));
|
||||
}
|
||||
|
||||
lock.writeLock().lock();
|
||||
return callback.resolve(property);
|
||||
|
||||
return executeWhileLocked(lock.writeLock(), () -> callback.resolve(property));
|
||||
} catch (RuntimeException ex) {
|
||||
|
||||
DataAccessException translatedException = exceptionTranslator.translateExceptionIfPossible(ex);
|
||||
@@ -377,8 +378,16 @@ public final class LazyLoadingProxyFactory {
|
||||
|
||||
throw new LazyLoadingException("Unable to lazily resolve DBRef",
|
||||
translatedException != null ? translatedException : ex);
|
||||
}
|
||||
}
|
||||
|
||||
private static <T> T executeWhileLocked(Lock lock, Supplier<T> stuff) {
|
||||
|
||||
lock.lock();
|
||||
try {
|
||||
return stuff.get();
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,19 +88,14 @@ abstract class CursorReadingTask<T, R> implements Task {
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
|
||||
lock.lock();
|
||||
state = State.CANCELLED;
|
||||
lock.unlock();
|
||||
doWhileLocked(lock, () -> state = State.CANCELLED);
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
|
||||
lock.lock();
|
||||
state = State.CANCELLED;
|
||||
lock.unlock();
|
||||
|
||||
doWhileLocked(lock, () -> state = State.CANCELLED);
|
||||
errorHandler.handleError(e);
|
||||
}
|
||||
}
|
||||
@@ -116,33 +111,32 @@ abstract class CursorReadingTask<T, R> implements Task {
|
||||
*/
|
||||
private void start() {
|
||||
|
||||
lock.lock();
|
||||
if (!State.RUNNING.equals(state)) {
|
||||
state = State.STARTING;
|
||||
}
|
||||
lock.unlock();
|
||||
doWhileLocked(lock, () -> {
|
||||
if (!State.RUNNING.equals(state)) {
|
||||
state = State.STARTING;
|
||||
}
|
||||
});
|
||||
|
||||
do {
|
||||
|
||||
boolean valid = false;
|
||||
// boolean valid = false;
|
||||
|
||||
lock.lock();
|
||||
boolean valid = executeWhileLocked(lock, () -> {
|
||||
|
||||
try {
|
||||
if (State.STARTING.equals(state)) {
|
||||
|
||||
MongoCursor<T> cursor = execute(() -> initCursor(template, request.getRequestOptions(), targetType));
|
||||
valid = isValidCursor(cursor);
|
||||
if (valid) {
|
||||
this.cursor = cursor;
|
||||
state = State.RUNNING;
|
||||
} else if (cursor != null) {
|
||||
cursor.close();
|
||||
}
|
||||
if (!State.STARTING.equals(state)) {
|
||||
return false;
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
MongoCursor<T> cursor = execute(() -> initCursor(template, request.getRequestOptions(), targetType));
|
||||
boolean isValid = isValidCursor(cursor);
|
||||
if (isValid) {
|
||||
this.cursor = cursor;
|
||||
state = State.RUNNING;
|
||||
} else if (cursor != null) {
|
||||
cursor.close();
|
||||
}
|
||||
return isValid;
|
||||
});
|
||||
|
||||
if (!valid) {
|
||||
|
||||
@@ -150,9 +144,7 @@ abstract class CursorReadingTask<T, R> implements Task {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
|
||||
lock.lock();
|
||||
state = State.CANCELLED;
|
||||
lock.unlock();
|
||||
doWhileLocked(lock, () -> state = State.CANCELLED);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
@@ -168,9 +160,7 @@ abstract class CursorReadingTask<T, R> implements Task {
|
||||
@Override
|
||||
public void cancel() throws DataAccessResourceFailureException {
|
||||
|
||||
lock.lock();
|
||||
|
||||
try {
|
||||
doWhileLocked(lock, () -> {
|
||||
|
||||
if (State.RUNNING.equals(state) || State.STARTING.equals(state)) {
|
||||
this.state = State.CANCELLED;
|
||||
@@ -178,9 +168,7 @@ abstract class CursorReadingTask<T, R> implements Task {
|
||||
cursor.close();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -190,13 +178,7 @@ abstract class CursorReadingTask<T, R> implements Task {
|
||||
|
||||
@Override
|
||||
public State getState() {
|
||||
|
||||
lock.lock();
|
||||
try {
|
||||
return state;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
return executeWhileLocked(lock, () -> state);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -232,16 +214,12 @@ abstract class CursorReadingTask<T, R> implements Task {
|
||||
@Nullable
|
||||
private T getNext() {
|
||||
|
||||
lock.lock();
|
||||
try{
|
||||
return executeWhileLocked(lock, () -> {
|
||||
if (State.RUNNING.equals(state)) {
|
||||
return cursor.tryNext();
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
throw new IllegalStateException(String.format("Cursor %s is not longer open", cursor));
|
||||
throw new IllegalStateException(String.format("Cursor %s is not longer open", cursor));
|
||||
});
|
||||
}
|
||||
|
||||
private static boolean isValidCursor(@Nullable MongoCursor<?> cursor) {
|
||||
@@ -278,4 +256,23 @@ abstract class CursorReadingTask<T, R> implements Task {
|
||||
throw translated != null ? translated : e;
|
||||
}
|
||||
}
|
||||
|
||||
private static void doWhileLocked(Lock lock, Runnable action) {
|
||||
|
||||
executeWhileLocked(lock, () -> {
|
||||
action.run();
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private static <T> T executeWhileLocked(Lock lock, Supplier<T> stuff) {
|
||||
|
||||
lock.lock();
|
||||
try {
|
||||
return stuff.get();
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,8 +20,10 @@ import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@@ -37,8 +39,7 @@ import org.springframework.util.ObjectUtils;
|
||||
/**
|
||||
* Simple {@link Executor} based {@link MessageListenerContainer} implementation for running {@link Task tasks} like
|
||||
* listening to MongoDB <a href="https://docs.mongodb.com/manual/changeStreams/">Change Streams</a> and tailable
|
||||
* cursors.
|
||||
* <br />
|
||||
* cursors. <br />
|
||||
* This message container creates long-running tasks that are executed on {@link Executor}.
|
||||
*
|
||||
* @author Christoph Strobl
|
||||
@@ -113,8 +114,7 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
|
||||
@Override
|
||||
public void start() {
|
||||
|
||||
lifecycleMonitor.writeLock().lock();
|
||||
try {
|
||||
doWhileLocked(lifecycleMonitor.writeLock(), () -> {
|
||||
if (!this.running) {
|
||||
subscriptions.values().stream() //
|
||||
.filter(it -> !it.isActive()) //
|
||||
@@ -125,36 +125,23 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
|
||||
|
||||
running = true;
|
||||
}
|
||||
} finally {
|
||||
lifecycleMonitor.writeLock().unlock();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
|
||||
lifecycleMonitor.writeLock().lock();
|
||||
|
||||
try {
|
||||
doWhileLocked(lifecycleMonitor.writeLock(), () -> {
|
||||
if (this.running) {
|
||||
subscriptions.values().forEach(Cancelable::cancel);
|
||||
running = false;
|
||||
}
|
||||
} finally {
|
||||
lifecycleMonitor.writeLock().unlock();
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
|
||||
lifecycleMonitor.writeLock().lock();
|
||||
try {
|
||||
return running;
|
||||
} finally {
|
||||
lifecycleMonitor.writeLock().unlock();
|
||||
}
|
||||
return executeWhileLocked(lifecycleMonitor.readLock(), () -> running);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -179,43 +166,32 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
|
||||
|
||||
@Override
|
||||
public Optional<Subscription> lookup(SubscriptionRequest<?, ?, ?> request) {
|
||||
|
||||
subscriptionMonitor.readLock();
|
||||
try {
|
||||
return Optional.ofNullable(subscriptions.get(request));
|
||||
} finally {
|
||||
subscriptionMonitor.readLock().unlock();
|
||||
}
|
||||
|
||||
return executeWhileLocked(subscriptionMonitor.readLock(), () -> Optional.ofNullable(subscriptions.get(request)));
|
||||
}
|
||||
|
||||
public Subscription register(SubscriptionRequest request, Task task) {
|
||||
|
||||
Subscription subscription = new TaskSubscription(task);
|
||||
|
||||
this.subscriptionMonitor.writeLock().lock();
|
||||
try {
|
||||
return executeWhileLocked(this.subscriptionMonitor.writeLock(), () ->
|
||||
{
|
||||
if (subscriptions.containsKey(request)) {
|
||||
return subscriptions.get(request);
|
||||
}
|
||||
|
||||
Subscription subscription = new TaskSubscription(task);
|
||||
this.subscriptions.put(request, subscription);
|
||||
|
||||
if (this.isRunning()) {
|
||||
taskExecutor.execute(task);
|
||||
}
|
||||
return subscription;
|
||||
} finally {
|
||||
this.subscriptionMonitor.writeLock().unlock();
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(Subscription subscription) {
|
||||
|
||||
this.subscriptionMonitor.writeLock().lock();
|
||||
try {
|
||||
doWhileLocked(this.subscriptionMonitor.writeLock(), () -> {
|
||||
|
||||
if (subscriptions.containsValue(subscription)) {
|
||||
|
||||
@@ -225,8 +201,25 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
|
||||
|
||||
subscriptions.values().remove(subscription);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static void doWhileLocked(Lock lock, Runnable action) {
|
||||
|
||||
executeWhileLocked(lock, () -> {
|
||||
action.run();
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private static <T> T executeWhileLocked(Lock lock, Supplier<T> stuff) {
|
||||
|
||||
lock.lock();
|
||||
try {
|
||||
return stuff.get();
|
||||
} finally {
|
||||
this.subscriptionMonitor.writeLock().unlock();
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user