DATAMONGO-2366 - Consistently handle exceptions in CursorReadingTask.
Exceptions during CursorReadingTask startup and during polling are handled now by the same exception handling to handle Exceptions only once and notify ErrorHandler exactly once per exception. Previously, startup exceptions relied on exception handling in the execute closure and notified ErrorHandler potentially multiple times. Original Pull Request: #790
This commit is contained in:
committed by
Christoph Strobl
parent
2166a6e953
commit
b7b339577b
@@ -73,9 +73,10 @@ abstract class CursorReadingTask<T, R> implements Task {
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
start();
|
||||
|
||||
try {
|
||||
|
||||
start();
|
||||
|
||||
while (isRunning()) {
|
||||
|
||||
try {
|
||||
@@ -263,8 +264,8 @@ abstract class CursorReadingTask<T, R> implements Task {
|
||||
|
||||
/**
|
||||
* Execute an operation and take care of translating exceptions using the {@link MongoTemplate templates}
|
||||
* {@link org.springframework.data.mongodb.core.MongoExceptionTranslator} and passing those on to the
|
||||
* {@link #errorHandler}.
|
||||
* {@link org.springframework.data.mongodb.core.MongoExceptionTranslator} rethrowing the potentially translated
|
||||
* exception.
|
||||
*
|
||||
* @param callback must not be {@literal null}.
|
||||
* @param <T>
|
||||
@@ -279,10 +280,7 @@ abstract class CursorReadingTask<T, R> implements Task {
|
||||
} catch (RuntimeException e) {
|
||||
|
||||
RuntimeException translated = template.getExceptionTranslator().translateExceptionIfPossible(e);
|
||||
RuntimeException toHandle = translated != null ? translated : e;
|
||||
|
||||
errorHandler.handleError(toHandle);
|
||||
throw toHandle;
|
||||
throw translated != null ? translated : e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -96,17 +96,29 @@ public class CursorReadingTaskUnitTests {
|
||||
verify(listener, times(task.getValues().size())).onMessage(any());
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-2173
|
||||
@Test // DATAMONGO-2173, DATAMONGO-2366
|
||||
public void writesErrorOnStartToErrorHandler() {
|
||||
|
||||
ArgumentCaptor<Throwable> errorCaptor = ArgumentCaptor.forClass(Throwable.class);
|
||||
Task task = new ErrorOnInitCursorTaskStub(template, request, Object.class, errorHandler);
|
||||
|
||||
assertThatExceptionOfType(RuntimeException.class).isThrownBy(task::run);
|
||||
task.run();
|
||||
verify(errorHandler).handleError(errorCaptor.capture());
|
||||
assertThat(errorCaptor.getValue()).hasMessageStartingWith("let's get it started (ha)");
|
||||
}
|
||||
|
||||
@Test // DATAMONGO-2366
|
||||
public void errorOnNextNotifiesErrorHandlerOnlyOnce() {
|
||||
|
||||
ArgumentCaptor<Throwable> errorCaptor = ArgumentCaptor.forClass(Throwable.class);
|
||||
when(cursor.getServerCursor()).thenReturn(new ServerCursor(10, new ServerAddress("mock")));
|
||||
when(cursor.tryNext()).thenThrow(new IllegalStateException());
|
||||
|
||||
task.run();
|
||||
verify(errorHandler).handleError(errorCaptor.capture());
|
||||
assertThat(errorCaptor.getValue()).isInstanceOf(IllegalStateException.class);
|
||||
}
|
||||
|
||||
private static class MultithreadedStopRunningWhileEmittingMessages extends MultithreadedTestCase {
|
||||
|
||||
CursorReadingTask task;
|
||||
|
||||
Reference in New Issue
Block a user