Compare commits

...

4 Commits

Author SHA1 Message Date
Soby Chacko
eec0eb7892 Upgrade to 1.3.1.RELEASE 2017-12-27 11:36:47 -05:00
Soby Chacko
d639731c66 Remove wildcard imports 2017-12-26 17:22:27 -05:00
Soby Chacko
49b780717e Backport DLQ releated test changes
backport missing dlq related test configuration changes
from 0.10.2 test binder to 0.10.1 test binder
2017-12-26 17:22:27 -05:00
Aldo Sinanaj
f4fc9442c7 Enhance DLQ messages with failure info
Polishing
2017-11-20 11:16:05 -05:00
12 changed files with 90 additions and 30 deletions

View File

@@ -2,12 +2,12 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.1.BUILD-SNAPSHOT</version>
<version>1.3.1.RELEASE</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>1.3.5.RELEASE</version>
<version>1.3.7.RELEASE</version>
<relativePath />
</parent>
<properties>
@@ -15,8 +15,8 @@
<kafka.version>0.10.1.1</kafka.version>
<spring-kafka.version>1.1.6.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>2.1.2.RELEASE</spring-integration-kafka.version>
<spring-cloud-stream.version>1.3.1.BUILD-SNAPSHOT</spring-cloud-stream.version>
<spring-cloud-build.version>1.3.5.RELEASE</spring-cloud-build.version>
<spring-cloud-stream.version>1.3.1.RELEASE</spring-cloud-stream.version>
<spring-cloud-build.version>1.3.7.RELEASE</spring-cloud-build.version>
</properties>
<modules>
<module>spring-cloud-stream-binder-kafka</module>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.1.BUILD-SNAPSHOT</version>
<version>1.3.1.RELEASE</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.1.BUILD-SNAPSHOT</version>
<version>1.3.1.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-0.10.1-test</artifactId>
<description>Spring Cloud Stream Kafka Binder 0.10.1 Tests</description>

View File

@@ -16,13 +16,6 @@
package org.springframework.cloud.stream.binder.kafka;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
import kafka.utils.ZKStringSerializer$;
@@ -36,7 +29,6 @@ import org.eclipse.jetty.server.Server;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
@@ -58,11 +50,18 @@ import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import static org.junit.Assert.assertTrue;
/**
* Integration tests for the {@link KafkaMessageChannelBinder}.
*
*
* This test specifically tests for the 0.10.1.x version of Kafka.
*
* @author Eric Bottard
@@ -79,7 +78,7 @@ public class Kafka_0_10_1_BinderTests extends Kafka_0_10_2_BinderTests {
private Kafka10TestBinder binder;
private Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
private final Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
@Override
protected void binderBindUnbindLatency() throws InterruptedException {
@@ -90,11 +89,13 @@ public class Kafka_0_10_1_BinderTests extends Kafka_0_10_2_BinderTests {
protected Kafka10TestBinder getBinder() {
if (binder == null) {
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binderConfiguration.setHeaders("dlqTestHeader");
binder = new Kafka10TestBinder(binderConfiguration);
}
return binder;
}
@Override
protected KafkaBinderConfigurationProperties createConfigurationProperties() {
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.1.BUILD-SNAPSHOT</version>
<version>1.3.1.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-0.10.2-test</artifactId>
<description>Spring Cloud Stream Kafka Binder 0.10.2 Tests</description>

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2014-2016 the original author or authors.
* Copyright 2014-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,8 +25,10 @@ import java.util.UUID;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -69,6 +71,7 @@ import static org.junit.Assert.assertTrue;
* @author Marius Bogoevici
* @author Mark Fisher
* @author Ilayaperumal Gopinathan
* @author Gary Russell
*/
public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
@@ -79,7 +82,7 @@ public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
private Kafka10TestBinder binder;
private Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
private final Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
@Override
protected void binderBindUnbindLatency() throws InterruptedException {
@@ -90,11 +93,13 @@ public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
protected Kafka10TestBinder getBinder() {
if (binder == null) {
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binderConfiguration.setHeaders("dlqTestHeader");
binder = new Kafka10TestBinder(binderConfiguration);
}
return binder;
}
@Override
protected KafkaBinderConfigurationProperties createConfigurationProperties() {
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.1.BUILD-SNAPSHOT</version>
<version>1.3.1.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.1.BUILD-SNAPSHOT</version>
<version>1.3.1.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.1.BUILD-SNAPSHOT</version>
<version>1.3.1.RELEASE</version>
</parent>
<dependencies>

View File

@@ -16,6 +16,8 @@
package org.springframework.cloud.stream.binder.kafka;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -39,9 +41,11 @@ import org.apache.kafka.common.utils.Utils;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.EmbeddedHeaderUtils;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.MessageValues;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
@@ -71,6 +75,7 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
@@ -90,12 +95,19 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
* @author Soby Chacko
* @author Henryk Konsek
* @author Doug Saus
* @author Aldo Sinanaj
*/
public class KafkaMessageChannelBinder extends
AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>,
ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {
public static final String X_ORIGINAL_TOPIC = "x-original-topic";
public static final String X_EXCEPTION_MESSAGE = "x-exception-message";
public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";
private final KafkaBinderConfigurationProperties configurationProperties;
private final Map<String, TopicInformation> topicsInUse = new HashMap<>();
@@ -343,10 +355,35 @@ public class KafkaMessageChannelBinder extends
public void handleMessage(Message<?> message) throws MessagingException {
final ConsumerRecord<?, ?> record = message.getHeaders()
.get(KafkaMessageDrivenChannelAdapter.KAFKA_RAW_DATA, ConsumerRecord.class);
final byte[] key = record.key() != null ? Utils.toArray(ByteBuffer.wrap((byte[]) record.key()))
final byte[] key = record.key() != null
? Utils.toArray(ByteBuffer.wrap((byte[]) record.key()))
: null;
final byte[] payload = record.value() != null
? Utils.toArray(ByteBuffer.wrap((byte[]) record.value())) : null;
final byte[] payload;
if (message.getPayload() instanceof Throwable) {
final Throwable throwable = (Throwable) message.getPayload();
final String failureMessage = throwable.getMessage();
try {
MessageValues messageValues = EmbeddedHeaderUtils
.extractHeaders(MessageBuilder.withPayload((byte[]) record.value()).build(),
false);
messageValues.put(X_ORIGINAL_TOPIC, record.topic());
messageValues.put(X_EXCEPTION_MESSAGE, failureMessage);
messageValues.put(X_EXCEPTION_STACKTRACE, getStackTraceAsString(throwable));
final String[] headersToEmbed = new ArrayList<>(messageValues.keySet()).toArray(
new String[messageValues.keySet().size()]);
payload = EmbeddedHeaderUtils.embedHeaders(messageValues,
EmbeddedHeaderUtils.headersToEmbed(headersToEmbed));
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
else {
payload = record.value() != null
? Utils.toArray(ByteBuffer.wrap((byte[]) record.value())) : null;
}
String dlqName = StringUtils.hasText(extendedConsumerProperties.getExtension().getDlqName())
? extendedConsumerProperties.getExtension().getDlqName()
: "error." + destination.getName() + "." + group;
@@ -434,6 +471,13 @@ public class KafkaMessageChannelBinder extends
return original.substring(0, maxCharacters) + "...";
}
private String getStackTraceAsString(Throwable cause) {
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter, true);
cause.printStackTrace(printWriter);
return stringWriter.getBuffer().toString();
}
private final class ProducerConfigurationMessageHandler extends KafkaProducerMessageHandler<byte[], byte[]>
implements Lifecycle {

View File

@@ -27,6 +27,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -90,14 +93,12 @@ import static org.assertj.core.api.Assertions.fail;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
/**
* @author Soby Chacko
* @author Ilayaperumal Gopinathan
* @author Henryk Konsek
* @author Gary Russell
* @author Aldo Sinanaj
*/
public abstract class KafkaBinderTests extends
PartitionCapableBinderTests<AbstractKafkaTestBinder, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>> {
@@ -206,12 +207,21 @@ public abstract class KafkaBinderTests extends
"error.dlqTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel, dlqConsumerProperties);
binderBindUnbindLatency();
String testMessagePayload = "test." + UUID.randomUUID().toString();
Message<String> testMessage = MessageBuilder.withPayload(testMessagePayload).build();
Message<String> testMessage = MessageBuilder.withPayload(testMessagePayload)
.setHeader("dlqTestHeader", "propagatedToDlq")
.build();
moduleOutputChannel.send(testMessage);
Message<?> receivedMessage = receive(dlqChannel, 3);
assertThat(receivedMessage).isNotNull();
assertThat(receivedMessage.getPayload()).isEqualTo(testMessagePayload);
final MessageHeaders headers = receivedMessage.getHeaders();
assertThat(headers.get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC)).isEqualTo(producerName);
assertThat(headers.get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE))
.isEqualTo("failed to send Message to channel 'null'; nested exception is java.lang.RuntimeException: fail");
assertThat(headers.get(KafkaMessageChannelBinder.X_EXCEPTION_STACKTRACE)).isNotNull();
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isNotNull();
assertThat(headers.get("dlqTestHeader")).isEqualTo("propagatedToDlq");
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
binderBindUnbindLatency();

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.1.BUILD-SNAPSHOT</version>
<version>1.3.1.RELEASE</version>
</parent>
<dependencies>