diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java index fd5d72761..c87f0ffa9 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java @@ -73,9 +73,10 @@ abstract class CursorReadingTask implements Task { @Override public void run() { - start(); - try { + + start(); + while (isRunning()) { try { @@ -263,8 +264,8 @@ abstract class CursorReadingTask implements Task { /** * Execute an operation and take care of translating exceptions using the {@link MongoTemplate templates} - * {@link org.springframework.data.mongodb.core.MongoExceptionTranslator} and passing those on to the - * {@link #errorHandler}. + * {@link org.springframework.data.mongodb.core.MongoExceptionTranslator} rethrowing the potentially translated + * exception. * * @param callback must not be {@literal null}. * @param @@ -279,10 +280,7 @@ abstract class CursorReadingTask implements Task { } catch (RuntimeException e) { RuntimeException translated = template.getExceptionTranslator().translateExceptionIfPossible(e); - RuntimeException toHandle = translated != null ? translated : e; - - errorHandler.handleError(toHandle); - throw toHandle; + throw translated != null ? translated : e; } } } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/CursorReadingTaskUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/CursorReadingTaskUnitTests.java index ce2956593..91fcb4c87 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/CursorReadingTaskUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/CursorReadingTaskUnitTests.java @@ -96,17 +96,29 @@ public class CursorReadingTaskUnitTests { verify(listener, times(task.getValues().size())).onMessage(any()); } - @Test // DATAMONGO-2173 + @Test // DATAMONGO-2173, DATAMONGO-2366 public void writesErrorOnStartToErrorHandler() { ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(Throwable.class); Task task = new ErrorOnInitCursorTaskStub(template, request, Object.class, errorHandler); - assertThatExceptionOfType(RuntimeException.class).isThrownBy(task::run); + task.run(); verify(errorHandler).handleError(errorCaptor.capture()); assertThat(errorCaptor.getValue()).hasMessageStartingWith("let's get it started (ha)"); } + @Test // DATAMONGO-2366 + public void errorOnNextNotifiesErrorHandlerOnlyOnce() { + + ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(Throwable.class); + when(cursor.getServerCursor()).thenReturn(new ServerCursor(10, new ServerAddress("mock"))); + when(cursor.tryNext()).thenThrow(new IllegalStateException()); + + task.run(); + verify(errorHandler).handleError(errorCaptor.capture()); + assertThat(errorCaptor.getValue()).isInstanceOf(IllegalStateException.class); + } + private static class MultithreadedStopRunningWhileEmittingMessages extends MultithreadedTestCase { CursorReadingTask task;