Compare commits

...

13 Commits

Author SHA1 Message Date
Soby Chacko
6d5c847c47 1.3.3.RELEASE 2018-06-12 13:22:16 -04:00
Gary Russell
6944ca7212 Fix default partition header expression 2018-06-11 17:13:46 -04:00
Soby Chacko
52d879b2cc KafkaBinderConfigurationProperties duplicate bean
ConfigurationProperties bean provided by Kafka Streams binder
extends from `KafkaBinderConfigurationProperties` used by Kafka binder.
It creates a conflict when autowiring this bean from Kafka binder configuration.
This prevents an application to have both binders in the classpath.
Change the creation of this ConfigurationProperties bean so that it
avoids creating bean using EnbaleConfigurationProperties and then autowiring,
but directly create the Bean using `@Bean`. This prevents the conflict.

Resolves #244
Resolves #315
2018-06-11 17:12:07 -04:00
Gary Russell
e982d60cd3 Add test case for previous commit 2018-04-02 12:20:48 -04:00
Aldo Sinanaj
fe571133fb Fix DLQ and Embedded Headers for Raw Mode
Enhance the error message headers before sending to DLQ only for consumer with embedded header mode
2018-04-02 11:56:20 -04:00
Soby Chacko
4cc183396d Update ScSt version to 1.3.3.BUILD-SNAPSHOT 2018-01-10 15:39:32 -05:00
Soby Chacko
ab87c79f7f 1.3.3.BUILD-SNAPSHOT 2018-01-10 14:47:36 -05:00
Soby Chacko
079bd6f7c4 1.3.2.RELEASE 2018-01-10 14:23:25 -05:00
Soby Chacko
971b78936b Next update version: 1.3.2.BUILD-SNAPSHOT 2017-12-27 12:32:36 -05:00
Soby Chacko
eec0eb7892 Upgrade to 1.3.1.RELEASE 2017-12-27 11:36:47 -05:00
Soby Chacko
d639731c66 Remove wildcard imports 2017-12-26 17:22:27 -05:00
Soby Chacko
49b780717e Backport DLQ releated test changes
backport missing dlq related test configuration changes
from 0.10.2 test binder to 0.10.1 test binder
2017-12-26 17:22:27 -05:00
Aldo Sinanaj
f4fc9442c7 Enhance DLQ messages with failure info
Polishing
2017-11-20 11:16:05 -05:00
13 changed files with 138 additions and 47 deletions

View File

@@ -2,12 +2,12 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.1.BUILD-SNAPSHOT</version>
<version>1.3.3.RELEASE</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>1.3.5.RELEASE</version>
<version>1.3.9.RELEASE</version>
<relativePath />
</parent>
<properties>
@@ -15,8 +15,8 @@
<kafka.version>0.10.1.1</kafka.version>
<spring-kafka.version>1.1.6.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>2.1.2.RELEASE</spring-integration-kafka.version>
<spring-cloud-stream.version>1.3.1.BUILD-SNAPSHOT</spring-cloud-stream.version>
<spring-cloud-build.version>1.3.5.RELEASE</spring-cloud-build.version>
<spring-cloud-stream.version>1.3.3.RELEASE</spring-cloud-stream.version>
<spring-cloud-build.version>1.3.9.RELEASE</spring-cloud-build.version>
</properties>
<modules>
<module>spring-cloud-stream-binder-kafka</module>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.1.BUILD-SNAPSHOT</version>
<version>1.3.3.RELEASE</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.1.BUILD-SNAPSHOT</version>
<version>1.3.3.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-0.10.1-test</artifactId>
<description>Spring Cloud Stream Kafka Binder 0.10.1 Tests</description>

View File

@@ -16,13 +16,6 @@
package org.springframework.cloud.stream.binder.kafka;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
import kafka.utils.ZKStringSerializer$;
@@ -36,7 +29,6 @@ import org.eclipse.jetty.server.Server;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
@@ -58,11 +50,18 @@ import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import static org.junit.Assert.assertTrue;
/**
* Integration tests for the {@link KafkaMessageChannelBinder}.
*
*
* This test specifically tests for the 0.10.1.x version of Kafka.
*
* @author Eric Bottard
@@ -79,7 +78,7 @@ public class Kafka_0_10_1_BinderTests extends Kafka_0_10_2_BinderTests {
private Kafka10TestBinder binder;
private Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
private final Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
@Override
protected void binderBindUnbindLatency() throws InterruptedException {
@@ -90,11 +89,13 @@ public class Kafka_0_10_1_BinderTests extends Kafka_0_10_2_BinderTests {
protected Kafka10TestBinder getBinder() {
if (binder == null) {
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binderConfiguration.setHeaders("dlqTestHeader");
binder = new Kafka10TestBinder(binderConfiguration);
}
return binder;
}
@Override
protected KafkaBinderConfigurationProperties createConfigurationProperties() {
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.1.BUILD-SNAPSHOT</version>
<version>1.3.3.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-0.10.2-test</artifactId>
<description>Spring Cloud Stream Kafka Binder 0.10.2 Tests</description>

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2014-2016 the original author or authors.
* Copyright 2014-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,8 +25,10 @@ import java.util.UUID;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -69,6 +71,7 @@ import static org.junit.Assert.assertTrue;
* @author Marius Bogoevici
* @author Mark Fisher
* @author Ilayaperumal Gopinathan
* @author Gary Russell
*/
public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
@@ -79,7 +82,7 @@ public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
private Kafka10TestBinder binder;
private Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
private final Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
@Override
protected void binderBindUnbindLatency() throws InterruptedException {
@@ -90,11 +93,13 @@ public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
protected Kafka10TestBinder getBinder() {
if (binder == null) {
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binderConfiguration.setHeaders("dlqTestHeader");
binder = new Kafka10TestBinder(binderConfiguration);
}
return binder;
}
@Override
protected KafkaBinderConfigurationProperties createConfigurationProperties() {
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.1.BUILD-SNAPSHOT</version>
<version>1.3.3.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.1.BUILD-SNAPSHOT</version>
<version>1.3.3.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.1.BUILD-SNAPSHOT</version>
<version>1.3.3.RELEASE</version>
</parent>
<dependencies>

View File

@@ -16,6 +16,8 @@
package org.springframework.cloud.stream.binder.kafka;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -39,9 +41,12 @@ import org.apache.kafka.common.utils.Utils;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.EmbeddedHeaderUtils;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.MessageValues;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
@@ -71,6 +76,7 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
@@ -90,12 +96,19 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
* @author Soby Chacko
* @author Henryk Konsek
* @author Doug Saus
* @author Aldo Sinanaj
*/
public class KafkaMessageChannelBinder extends
AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>,
ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {
public static final String X_ORIGINAL_TOPIC = "x-original-topic";
public static final String X_EXCEPTION_MESSAGE = "x-exception-message";
public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";
private final KafkaBinderConfigurationProperties configurationProperties;
private final Map<String, TopicInformation> topicsInUse = new HashMap<>();
@@ -343,10 +356,36 @@ public class KafkaMessageChannelBinder extends
public void handleMessage(Message<?> message) throws MessagingException {
final ConsumerRecord<?, ?> record = message.getHeaders()
.get(KafkaMessageDrivenChannelAdapter.KAFKA_RAW_DATA, ConsumerRecord.class);
final byte[] key = record.key() != null ? Utils.toArray(ByteBuffer.wrap((byte[]) record.key()))
final byte[] key = record.key() != null
? Utils.toArray(ByteBuffer.wrap((byte[]) record.key()))
: null;
final byte[] payload = record.value() != null
? Utils.toArray(ByteBuffer.wrap((byte[]) record.value())) : null;
final byte[] payload;
if (HeaderMode.embeddedHeaders == extendedConsumerProperties.getHeaderMode()
&& message.getPayload() instanceof Throwable) {
final Throwable throwable = (Throwable) message.getPayload();
final String failureMessage = throwable.getMessage();
try {
MessageValues messageValues = EmbeddedHeaderUtils
.extractHeaders(MessageBuilder.withPayload((byte[]) record.value()).build(),
false);
messageValues.put(X_ORIGINAL_TOPIC, record.topic());
messageValues.put(X_EXCEPTION_MESSAGE, failureMessage);
messageValues.put(X_EXCEPTION_STACKTRACE, getStackTraceAsString(throwable));
final String[] headersToEmbed = new ArrayList<>(messageValues.keySet()).toArray(
new String[messageValues.keySet().size()]);
payload = EmbeddedHeaderUtils.embedHeaders(messageValues,
EmbeddedHeaderUtils.headersToEmbed(headersToEmbed));
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
else {
payload = record.value() != null
? Utils.toArray(ByteBuffer.wrap((byte[]) record.value())) : null;
}
String dlqName = StringUtils.hasText(extendedConsumerProperties.getExtension().getDlqName())
? extendedConsumerProperties.getExtension().getDlqName()
: "error." + destination.getName() + "." + group;
@@ -434,6 +473,13 @@ public class KafkaMessageChannelBinder extends
return original.substring(0, maxCharacters) + "...";
}
private String getStackTraceAsString(Throwable cause) {
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter, true);
cause.printStackTrace(printWriter);
return stringWriter.getBuffer().toString();
}
private final class ProducerConfigurationMessageHandler extends KafkaProducerMessageHandler<byte[], byte[]>
implements Lifecycle {
@@ -450,7 +496,7 @@ public class KafkaMessageChannelBinder extends
setBeanFactory(KafkaMessageChannelBinder.this.getBeanFactory());
if (producerProperties.isPartitioned()) {
SpelExpressionParser parser = new SpelExpressionParser();
setPartitionIdExpression(parser.parseExpression("headers." + BinderHeaders.PARTITION_HEADER));
setPartitionIdExpression(parser.parseExpression("headers['" + BinderHeaders.PARTITION_HEADER + "']"));
}
if (producerProperties.getExtension().isSync()) {
setSync(true);

View File

@@ -73,7 +73,7 @@ import org.springframework.util.ObjectUtils;
@Configuration
@ConditionalOnMissingBean(Binder.class)
@Import({ KryoCodecAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class})
@EnableConfigurationProperties({ KafkaBinderConfigurationProperties.class, KafkaExtendedBindingProperties.class })
@EnableConfigurationProperties({ KafkaExtendedBindingProperties.class })
public class KafkaBinderConfiguration {
protected static final Log logger = LogFactory.getLog(KafkaBinderConfiguration.class);
@@ -97,14 +97,20 @@ public class KafkaBinderConfiguration {
private AdminUtilsOperation adminUtilsOperation;
@Bean
KafkaTopicProvisioner provisioningProvider() {
return new KafkaTopicProvisioner(this.configurationProperties, this.adminUtilsOperation);
KafkaBinderConfigurationProperties configurationProperties() {
return new KafkaBinderConfigurationProperties();
}
@Bean
KafkaMessageChannelBinder kafkaMessageChannelBinder() {
KafkaTopicProvisioner provisioningProvider(KafkaBinderConfigurationProperties configurationProperties) {
return new KafkaTopicProvisioner(configurationProperties, this.adminUtilsOperation);
}
@Bean
KafkaMessageChannelBinder kafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider) {
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
this.configurationProperties, provisioningProvider());
configurationProperties, provisioningProvider);
kafkaMessageChannelBinder.setCodec(this.codec);
kafkaMessageChannelBinder.setProducerListener(producerListener);
kafkaMessageChannelBinder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
@@ -118,7 +124,8 @@ public class KafkaBinderConfiguration {
}
@Bean
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder) {
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder,
KafkaBinderConfigurationProperties configurationProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
@@ -126,17 +133,18 @@ public class KafkaBinderConfiguration {
props.putAll(configurationProperties.getConsumerConfiguration());
}
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
}
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(kafkaMessageChannelBinder,
consumerFactory);
indicator.setTimeout(this.configurationProperties.getHealthTimeout());
indicator.setTimeout(configurationProperties.getHealthTimeout());
return indicator;
}
@Bean
public PublicMetrics kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder) {
public PublicMetrics kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder,
KafkaBinderConfigurationProperties configurationProperties) {
return new KafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties);
}

View File

@@ -16,6 +16,12 @@
package org.springframework.cloud.stream.binder.kafka;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
@@ -85,11 +91,6 @@ import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
@@ -98,6 +99,7 @@ import kafka.utils.ZkUtils;
* @author Ilayaperumal Gopinathan
* @author Henryk Konsek
* @author Gary Russell
* @author Aldo Sinanaj
*/
public abstract class KafkaBinderTests extends
PartitionCapableBinderTests<AbstractKafkaTestBinder, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>> {
@@ -141,15 +143,20 @@ public abstract class KafkaBinderTests extends
@Test
public void testDlqAndRetry() throws Exception {
testDlqGuts(true);
testDlqGuts(true, false);
}
@Test
public void testDlq() throws Exception {
testDlqGuts(false);
testDlqGuts(false, false);
}
private void testDlqGuts(boolean withRetry) throws Exception {
@Test
public void testDlqRawHeaders() throws Exception {
testDlqGuts(false, true);
}
private void testDlqGuts(boolean withRetry, boolean raw) throws Exception {
AbstractKafkaTestBinder binder = getBinder();
DirectChannel moduleOutputChannel = new DirectChannel();
DirectChannel moduleInputChannel = new DirectChannel();
@@ -158,12 +165,18 @@ public abstract class KafkaBinderTests extends
moduleInputChannel.subscribe(handler);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionCount(2);
if (raw) {
producerProperties.setHeaderMode(HeaderMode.raw);
}
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setMaxAttempts(withRetry ? 2 : 1);
consumerProperties.setBackOffInitialInterval(100);
consumerProperties.setBackOffMaxInterval(150);
consumerProperties.getExtension().setEnableDlq(true);
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
if (raw) {
consumerProperties.setHeaderMode(HeaderMode.raw);
}
long uniqueBindingId = System.currentTimeMillis();
String producerName = "dlqTest." + uniqueBindingId + ".0";
@@ -174,6 +187,9 @@ public abstract class KafkaBinderTests extends
ExtendedConsumerProperties<KafkaConsumerProperties> dlqConsumerProperties = createConsumerProperties();
dlqConsumerProperties.setMaxAttempts(1);
if (raw) {
dlqConsumerProperties.setHeaderMode(HeaderMode.raw);
}
ApplicationContext context = TestUtils.getPropertyValue(binder.getBinder(), "applicationContext",
ApplicationContext.class);
@@ -206,13 +222,28 @@ public abstract class KafkaBinderTests extends
"error.dlqTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel, dlqConsumerProperties);
binderBindUnbindLatency();
String testMessagePayload = "test." + UUID.randomUUID().toString();
Message<String> testMessage = MessageBuilder.withPayload(testMessagePayload).build();
Message<String> testMessage = MessageBuilder.withPayload(testMessagePayload)
.setHeader("dlqTestHeader", "propagatedToDlq")
.build();
moduleOutputChannel.send(testMessage);
Message<?> receivedMessage = receive(dlqChannel, 3);
assertThat(receivedMessage).isNotNull();
assertThat(receivedMessage.getPayload()).isEqualTo(testMessagePayload);
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
if (raw) {
assertThat(new String((byte[]) receivedMessage.getPayload(), StandardCharsets.UTF_8))
.isEqualTo(testMessagePayload);
}
else {
assertThat(receivedMessage.getPayload()).isEqualTo(testMessagePayload);
final MessageHeaders headers = receivedMessage.getHeaders();
assertThat(headers.get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC)).isEqualTo(producerName);
assertThat(headers.get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE))
.isEqualTo("failed to send Message to channel 'null'; nested exception is java.lang.RuntimeException: fail");
assertThat(headers.get(KafkaMessageChannelBinder.X_EXCEPTION_STACKTRACE)).isNotNull();
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isNotNull();
assertThat(headers.get("dlqTestHeader")).isEqualTo("propagatedToDlq");
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
}
binderBindUnbindLatency();
// verify we got a message on the dedicated error channel and the global (via bridge)

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.1.BUILD-SNAPSHOT</version>
<version>1.3.3.RELEASE</version>
</parent>
<dependencies>