DATAMONGO-2322 - Handle exceptions thrown by MessageListeners.

ErrorHandlers associated with a CursorReadingTask (Change Streams, imperative Tailable Cursors) now handle exceptions raised by the listener callback.

Exceptions are now catched and the callback continues with the next message.
This commit is contained in:
Mark Paluch
2019-07-18 09:56:48 +02:00
parent 0ad8857368
commit aff8b89006
2 changed files with 63 additions and 17 deletions

View File

@@ -75,24 +75,34 @@ abstract class CursorReadingTask<T, R> implements Task {
start();
while (isRunning()) {
try {
while (isRunning()) {
try {
try {
T next = execute(this::getNext);
T next = execute(this::getNext);
if (next != null) {
emitMessage(createMessage(next, targetType, request.getRequestOptions()));
} else {
Thread.sleep(10);
if (next != null) {
emitMessage(createMessage(next, targetType, request.getRequestOptions()));
} else {
Thread.sleep(10);
}
} catch (InterruptedException e) {
synchronized (lifecycleMonitor) {
state = State.CANCELLED;
}
Thread.currentThread().interrupt();
break;
}
} catch (InterruptedException e) {
synchronized (lifecycleMonitor) {
state = State.CANCELLED;
}
Thread.currentThread().interrupt();
}
} catch (RuntimeException e) {
synchronized (lifecycleMonitor) {
state = State.CANCELLED;
}
errorHandler.handleError(e);
}
}
@@ -126,7 +136,7 @@ abstract class CursorReadingTask<T, R> implements Task {
if (valid) {
this.cursor = cursor;
state = State.RUNNING;
} else if(cursor != null){
} else if (cursor != null) {
cursor.close();
}
}
@@ -219,7 +229,11 @@ abstract class CursorReadingTask<T, R> implements Task {
@SuppressWarnings("unchecked")
private void emitMessage(Message<T, R> message) {
request.getMessageListener().onMessage((Message) message);
try {
request.getMessageListener().onMessage((Message) message);
} catch (Exception e) {
errorHandler.handleError(e);
}
}
@Nullable

View File

@@ -16,13 +16,14 @@
package org.springframework.data.mongodb.core.messaging;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import static org.springframework.data.mongodb.core.messaging.SubscriptionUtils.*;
import lombok.Data;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.bson.Document;
@@ -30,6 +31,7 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.dao.DataAccessException;
import org.springframework.data.annotation.Id;
@@ -99,12 +101,42 @@ public class DefaultMessageListenerContainerTests {
assertThat(messageListener.getMessages().stream().map(Message::getBody).collect(Collectors.toList()))
.containsExactly(new Person("id-1", "foo"), new Person("id-2", "bar"));
}
@Test // DATAMONGO-2322
@IfProfileValue(name = "replSet", value = "true")
public void shouldNotifyErrorHandlerOnErrorInListener() throws InterruptedException {
ErrorHandler errorHandler = mock(ErrorHandler.class);
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
AtomicBoolean thrownException = new AtomicBoolean();
Subscription subscription = container.register(new ChangeStreamRequest(message -> {
try {
if (thrownException.compareAndSet(false, true)) {
throw new IllegalStateException("Boom!");
}
} finally {
messageListener.onMessage(message);
}
}, () -> COLLECTION_NAME), Person.class, errorHandler);
container.start();
awaitSubscription(subscription, Duration.ofMillis(500));
collection.insertOne(new Document("_id", "id-1").append("firstname", "foo"));
collection.insertOne(new Document("_id", "id-2").append("firstname", "bar"));
awaitMessages(messageListener, 2, Duration.ofMillis(500));
verify(errorHandler, atLeast(1)).handleError(any(IllegalStateException.class));
assertThat(messageListener.getTotalNumberMessagesReceived()).isEqualTo(2);
}
@Test // DATAMONGO-1803
@IfProfileValue(name = "replSet", value = "true")
public void shouldNoLongerReceiveMessagesWhenConainerStopped() throws InterruptedException {
public void shouldNoLongerReceiveMessagesWhenContainerStopped() throws InterruptedException {
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
Subscription subscription = container.register(new ChangeStreamRequest(messageListener, () -> COLLECTION_NAME),