From db4a1be5cb90ac225510ec77c48a0c26a4801ab4 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Tue, 25 Jul 2023 10:30:00 +0200 Subject: [PATCH] Update after review --- .../core/convert/LazyLoadingProxyFactory.java | 43 ++++---- .../core/messaging/CursorReadingTask.java | 99 +++++++++---------- .../DefaultMessageListenerContainer.java | 71 ++++++------- 3 files changed, 106 insertions(+), 107 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java index bec7ab3b7..53ff2a0be 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java @@ -24,7 +24,6 @@ import java.io.Serializable; import java.lang.reflect.Method; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; @@ -138,7 +137,8 @@ public final class LazyLoadingProxyFactory { } return prepareProxyFactory(propertyType, - () -> new LazyLoadingInterceptor(property, callback, source, exceptionTranslator)).getProxy(LazyLoadingProxy.class.getClassLoader()); + () -> new LazyLoadingInterceptor(property, callback, source, exceptionTranslator)) + .getProxy(LazyLoadingProxy.class.getClassLoader()); } /** @@ -348,25 +348,26 @@ public final class LazyLoadingProxyFactory { private Object resolve() { lock.readLock().lock(); - if (resolved) { + try { + if (resolved) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace(String.format("Accessing already resolved lazy loading property %s.%s", - property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName())); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace(String.format("Accessing already resolved lazy loading property %s.%s", + property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName())); + } + return result; } - return result; + } finally { + lock.readLock().unlock(); + } + + if (LOGGER.isTraceEnabled()) { + LOGGER.trace(String.format("Resolving lazy loading property %s.%s", + property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName())); } - lock.readLock().unlock(); try { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace(String.format("Resolving lazy loading property %s.%s", - property.getOwner() != null ? property.getOwner().getName() : "unknown", property.getName())); - } - - lock.writeLock().lock(); - return callback.resolve(property); - + return executeWhileLocked(lock.writeLock(), () -> callback.resolve(property)); } catch (RuntimeException ex) { DataAccessException translatedException = exceptionTranslator.translateExceptionIfPossible(ex); @@ -377,8 +378,16 @@ public final class LazyLoadingProxyFactory { throw new LazyLoadingException("Unable to lazily resolve DBRef", translatedException != null ? translatedException : ex); + } + } + + private static T executeWhileLocked(Lock lock, Supplier stuff) { + + lock.lock(); + try { + return stuff.get(); } finally { - lock.writeLock().unlock(); + lock.unlock(); } } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java index 1d6d81be6..dcebebbf7 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java @@ -88,19 +88,14 @@ abstract class CursorReadingTask implements Task { } } catch (InterruptedException e) { - lock.lock(); - state = State.CANCELLED; - lock.unlock(); + doWhileLocked(lock, () -> state = State.CANCELLED); Thread.currentThread().interrupt(); break; } } } catch (RuntimeException e) { - lock.lock(); - state = State.CANCELLED; - lock.unlock(); - + doWhileLocked(lock, () -> state = State.CANCELLED); errorHandler.handleError(e); } } @@ -116,33 +111,32 @@ abstract class CursorReadingTask implements Task { */ private void start() { - lock.lock(); - if (!State.RUNNING.equals(state)) { - state = State.STARTING; - } - lock.unlock(); + doWhileLocked(lock, () -> { + if (!State.RUNNING.equals(state)) { + state = State.STARTING; + } + }); do { - boolean valid = false; + // boolean valid = false; - lock.lock(); + boolean valid = executeWhileLocked(lock, () -> { - try { - if (State.STARTING.equals(state)) { - - MongoCursor cursor = execute(() -> initCursor(template, request.getRequestOptions(), targetType)); - valid = isValidCursor(cursor); - if (valid) { - this.cursor = cursor; - state = State.RUNNING; - } else if (cursor != null) { - cursor.close(); - } + if (!State.STARTING.equals(state)) { + return false; } - } finally { - lock.unlock(); - } + + MongoCursor cursor = execute(() -> initCursor(template, request.getRequestOptions(), targetType)); + boolean isValid = isValidCursor(cursor); + if (isValid) { + this.cursor = cursor; + state = State.RUNNING; + } else if (cursor != null) { + cursor.close(); + } + return isValid; + }); if (!valid) { @@ -150,9 +144,7 @@ abstract class CursorReadingTask implements Task { Thread.sleep(100); } catch (InterruptedException e) { - lock.lock(); - state = State.CANCELLED; - lock.unlock(); + doWhileLocked(lock, () -> state = State.CANCELLED); Thread.currentThread().interrupt(); } } @@ -168,9 +160,7 @@ abstract class CursorReadingTask implements Task { @Override public void cancel() throws DataAccessResourceFailureException { - lock.lock(); - - try { + doWhileLocked(lock, () -> { if (State.RUNNING.equals(state) || State.STARTING.equals(state)) { this.state = State.CANCELLED; @@ -178,9 +168,7 @@ abstract class CursorReadingTask implements Task { cursor.close(); } } - } finally { - lock.unlock(); - } + }); } @Override @@ -190,13 +178,7 @@ abstract class CursorReadingTask implements Task { @Override public State getState() { - - lock.lock(); - try { - return state; - } finally { - lock.unlock(); - } + return executeWhileLocked(lock, () -> state); } @Override @@ -232,16 +214,12 @@ abstract class CursorReadingTask implements Task { @Nullable private T getNext() { - lock.lock(); - try{ + return executeWhileLocked(lock, () -> { if (State.RUNNING.equals(state)) { return cursor.tryNext(); } - } finally { - lock.unlock(); - } - - throw new IllegalStateException(String.format("Cursor %s is not longer open", cursor)); + throw new IllegalStateException(String.format("Cursor %s is not longer open", cursor)); + }); } private static boolean isValidCursor(@Nullable MongoCursor cursor) { @@ -278,4 +256,23 @@ abstract class CursorReadingTask implements Task { throw translated != null ? translated : e; } } + + private static void doWhileLocked(Lock lock, Runnable action) { + + executeWhileLocked(lock, () -> { + action.run(); + return null; + }); + } + + @Nullable + private static T executeWhileLocked(Lock lock, Supplier stuff) { + + lock.lock(); + try { + return stuff.get(); + } finally { + lock.unlock(); + } + } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java index a38a02b18..0e8f72cfe 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java @@ -20,8 +20,10 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; import java.util.concurrent.Executor; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,8 +39,7 @@ import org.springframework.util.ObjectUtils; /** * Simple {@link Executor} based {@link MessageListenerContainer} implementation for running {@link Task tasks} like * listening to MongoDB Change Streams and tailable - * cursors. - *
+ * cursors.
* This message container creates long-running tasks that are executed on {@link Executor}. * * @author Christoph Strobl @@ -113,8 +114,7 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer @Override public void start() { - lifecycleMonitor.writeLock().lock(); - try { + doWhileLocked(lifecycleMonitor.writeLock(), () -> { if (!this.running) { subscriptions.values().stream() // .filter(it -> !it.isActive()) // @@ -125,36 +125,23 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer running = true; } - } finally { - lifecycleMonitor.writeLock().unlock(); - } + }); } @Override public void stop() { - lifecycleMonitor.writeLock().lock(); - - try { + doWhileLocked(lifecycleMonitor.writeLock(), () -> { if (this.running) { subscriptions.values().forEach(Cancelable::cancel); running = false; } - } finally { - lifecycleMonitor.writeLock().unlock(); - } - + }); } @Override public boolean isRunning() { - - lifecycleMonitor.writeLock().lock(); - try { - return running; - } finally { - lifecycleMonitor.writeLock().unlock(); - } + return executeWhileLocked(lifecycleMonitor.readLock(), () -> running); } @Override @@ -179,43 +166,32 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer @Override public Optional lookup(SubscriptionRequest request) { - - subscriptionMonitor.readLock(); - try { - return Optional.ofNullable(subscriptions.get(request)); - } finally { - subscriptionMonitor.readLock().unlock(); - } - + return executeWhileLocked(subscriptionMonitor.readLock(), () -> Optional.ofNullable(subscriptions.get(request))); } public Subscription register(SubscriptionRequest request, Task task) { - Subscription subscription = new TaskSubscription(task); - - this.subscriptionMonitor.writeLock().lock(); - try { + return executeWhileLocked(this.subscriptionMonitor.writeLock(), () -> + { if (subscriptions.containsKey(request)) { return subscriptions.get(request); } + Subscription subscription = new TaskSubscription(task); this.subscriptions.put(request, subscription); if (this.isRunning()) { taskExecutor.execute(task); } return subscription; - } finally { - this.subscriptionMonitor.writeLock().unlock(); - } + }); } @Override public void remove(Subscription subscription) { - this.subscriptionMonitor.writeLock().lock(); - try { + doWhileLocked(this.subscriptionMonitor.writeLock(), () -> { if (subscriptions.containsValue(subscription)) { @@ -225,8 +201,25 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer subscriptions.values().remove(subscription); } + }); + } + + private static void doWhileLocked(Lock lock, Runnable action) { + + executeWhileLocked(lock, () -> { + action.run(); + return null; + }); + } + + @Nullable + private static T executeWhileLocked(Lock lock, Supplier stuff) { + + lock.lock(); + try { + return stuff.get(); } finally { - this.subscriptionMonitor.writeLock().unlock(); + lock.unlock(); } }