Fix DLQ and raw/embedded headers

Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/351

- DLQ should support embedded headers for backwards compatibility with 1.x apps
- DLQ should support `HeaderMode.none` for when using older brokers with raw data

Forward port of https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/pull/350

Resolves #352
This commit is contained in:
Gary Russell
2018-04-02 13:19:29 -04:00
committed by Oleg Zhurakousky
parent 0689e87489
commit 39dd048ee5
2 changed files with 74 additions and 19 deletions

View File

@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -52,10 +53,12 @@ import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.DefaultPollableMessageSource;
import org.springframework.cloud.stream.binder.EmbeddedHeaderUtils;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.MessageValues;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties.StandardHeaders;
@@ -75,6 +78,7 @@ import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessage
import org.springframework.integration.support.AcknowledgmentCallback;
import org.springframework.integration.support.AcknowledgmentCallback.Status;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.StaticMessageHeaderAccessor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@@ -601,7 +605,8 @@ public class KafkaMessageChannelBinder extends
DlqSender<?,?> dlqSender = new DlqSender(kafkaTemplate, dlqName);
return message -> {
final ConsumerRecord<?, ?> record = message.getHeaders()
@SuppressWarnings("unchecked")
final ConsumerRecord<Object, Object> record = message.getHeaders()
.get(KafkaHeaders.RAW_DATA, ConsumerRecord.class);
if (properties.isUseNativeDecoding()) {
@@ -625,16 +630,40 @@ public class KafkaMessageChannelBinder extends
return;
}
Headers kafkaHeaders = new RecordHeaders(record.headers().toArray());
kafkaHeaders.add(new RecordHeader(X_ORIGINAL_TOPIC,
record.topic().getBytes(StandardCharsets.UTF_8)));
AtomicReference<ConsumerRecord<?, ?>> recordToSend = new AtomicReference<>(record);
if (message.getPayload() instanceof Throwable) {
Throwable throwable = (Throwable) message.getPayload();
kafkaHeaders.add(new RecordHeader(X_EXCEPTION_MESSAGE,
throwable.getMessage().getBytes(StandardCharsets.UTF_8)));
kafkaHeaders.add(new RecordHeader(X_EXCEPTION_STACKTRACE,
getStackTraceAsString(throwable).getBytes(StandardCharsets.UTF_8)));
HeaderMode headerMode = properties.getHeaderMode();
if (headerMode == null || HeaderMode.headers.equals(headerMode)) {
kafkaHeaders.add(new RecordHeader(X_ORIGINAL_TOPIC,
record.topic().getBytes(StandardCharsets.UTF_8)));
kafkaHeaders.add(new RecordHeader(X_EXCEPTION_MESSAGE,
throwable.getMessage().getBytes(StandardCharsets.UTF_8)));
kafkaHeaders.add(new RecordHeader(X_EXCEPTION_STACKTRACE,
getStackTraceAsString(throwable).getBytes(StandardCharsets.UTF_8)));
}
else if (HeaderMode.embeddedHeaders.equals(headerMode)) {
try {
MessageValues messageValues = EmbeddedHeaderUtils
.extractHeaders(MessageBuilder.withPayload((byte[]) record.value()).build(),
false);
messageValues.put(X_ORIGINAL_TOPIC, record.topic());
messageValues.put(X_EXCEPTION_MESSAGE, throwable.getMessage());
messageValues.put(X_EXCEPTION_STACKTRACE, getStackTraceAsString(throwable));
final String[] headersToEmbed = new ArrayList<>(messageValues.keySet()).toArray(
new String[messageValues.keySet().size()]);
byte[] payload = EmbeddedHeaderUtils.embedHeaders(messageValues,
EmbeddedHeaderUtils.headersToEmbed(headersToEmbed));
recordToSend.set(new ConsumerRecord<Object, Object>(record.topic(), record.partition(),
record.offset(), record.key(), payload));
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}
dlqSender.sendToDlq(record, kafkaHeaders);
dlqSender.sendToDlq(recordToSend.get(), kafkaHeaders);
};
}
return null;

View File

@@ -573,20 +573,30 @@ public class KafkaBinderTests extends
@Test
public void testDlqAndRetry() throws Exception {
testDlqGuts(true);
testDlqGuts(true, null);
}
@Test
public void testDlq() throws Exception {
testDlqGuts(false);
testDlqGuts(false, null);
}
@SuppressWarnings("unchecked")
private void testDlqGuts(boolean withRetry) throws Exception {
@Test
public void testDlqNone() throws Exception {
testDlqGuts(false, HeaderMode.none);
}
@Test
public void testDlqEmbedded() throws Exception {
testDlqGuts(false, HeaderMode.embeddedHeaders);
}
private void testDlqGuts(boolean withRetry, HeaderMode headerMode) throws Exception {
AbstractKafkaTestBinder binder = getBinder();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.getExtension().setHeaderPatterns(new String[]{MessageHeaders.CONTENT_TYPE});
producerProperties.setHeaderMode(headerMode);
DirectChannel moduleOutputChannel = createBindableChannel("output",
createProducerBindingProperties(producerProperties));
@@ -597,6 +607,7 @@ public class KafkaBinderTests extends
consumerProperties.setBackOffMaxInterval(150);
consumerProperties.getExtension().setEnableDlq(true);
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
consumerProperties.setHeaderMode(headerMode);
DirectChannel moduleInputChannel = createBindableChannel("input", createConsumerBindingProperties(consumerProperties));
@@ -614,6 +625,7 @@ public class KafkaBinderTests extends
ExtendedConsumerProperties<KafkaConsumerProperties> dlqConsumerProperties = createConsumerProperties();
dlqConsumerProperties.setMaxAttempts(1);
dlqConsumerProperties.setHeaderMode(headerMode);
ApplicationContext context = TestUtils.getPropertyValue(binder.getBinder(), "applicationContext",
ApplicationContext.class);
@@ -640,13 +652,27 @@ public class KafkaBinderTests extends
Message<?> receivedMessage = receive(dlqChannel, 3);
assertThat(receivedMessage).isNotNull();
assertThat(receivedMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC))
.isEqualTo(producerName.getBytes(StandardCharsets.UTF_8));
assertThat(new String((byte[]) receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE)))
.startsWith("failed to send Message to channel 'input'");
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_EXCEPTION_STACKTRACE))
.isNotNull();
if (HeaderMode.embeddedHeaders.equals(headerMode)) {
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC))
.isEqualTo(producerName);
assertThat(((String) receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE)))
.startsWith("failed to send Message to channel 'input'");
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_EXCEPTION_STACKTRACE))
.isNotNull();
}
else if (!HeaderMode.none.equals(headerMode)) {
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC))
.isEqualTo(producerName.getBytes(StandardCharsets.UTF_8));
assertThat(new String((byte[]) receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE)))
.startsWith("failed to send Message to channel 'input'");
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_EXCEPTION_STACKTRACE))
.isNotNull();
}
else {
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC)).isNull();
}
binderBindUnbindLatency();
// verify we got a message on the dedicated error channel and the global (via bridge)