Compare commits

..

21 Commits

Author SHA1 Message Date
Soby Chacko
6d5c847c47 1.3.3.RELEASE 2018-06-12 13:22:16 -04:00
Gary Russell
6944ca7212 Fix default partition header expression 2018-06-11 17:13:46 -04:00
Soby Chacko
52d879b2cc KafkaBinderConfigurationProperties duplicate bean
ConfigurationProperties bean provided by Kafka Streams binder
extends from `KafkaBinderConfigurationProperties` used by Kafka binder.
It creates a conflict when autowiring this bean from Kafka binder configuration.
This prevents an application to have both binders in the classpath.
Change the creation of this ConfigurationProperties bean so that it
avoids creating bean using EnbaleConfigurationProperties and then autowiring,
but directly create the Bean using `@Bean`. This prevents the conflict.

Resolves #244
Resolves #315
2018-06-11 17:12:07 -04:00
Gary Russell
e982d60cd3 Add test case for previous commit 2018-04-02 12:20:48 -04:00
Aldo Sinanaj
fe571133fb Fix DLQ and Embedded Headers for Raw Mode
Enhance the error message headers before sending to DLQ only for consumer with embedded header mode
2018-04-02 11:56:20 -04:00
Soby Chacko
4cc183396d Update ScSt version to 1.3.3.BUILD-SNAPSHOT 2018-01-10 15:39:32 -05:00
Soby Chacko
ab87c79f7f 1.3.3.BUILD-SNAPSHOT 2018-01-10 14:47:36 -05:00
Soby Chacko
079bd6f7c4 1.3.2.RELEASE 2018-01-10 14:23:25 -05:00
Soby Chacko
971b78936b Next update version: 1.3.2.BUILD-SNAPSHOT 2017-12-27 12:32:36 -05:00
Soby Chacko
eec0eb7892 Upgrade to 1.3.1.RELEASE 2017-12-27 11:36:47 -05:00
Soby Chacko
d639731c66 Remove wildcard imports 2017-12-26 17:22:27 -05:00
Soby Chacko
49b780717e Backport DLQ releated test changes
backport missing dlq related test configuration changes
from 0.10.2 test binder to 0.10.1 test binder
2017-12-26 17:22:27 -05:00
Aldo Sinanaj
f4fc9442c7 Enhance DLQ messages with failure info
Polishing
2017-11-20 11:16:05 -05:00
Gary Russell
e3460d6fce Update POMs to 1.3.1.BUILD-SNAPSHOT 2017-09-29 16:25:08 -04:00
Gary Russell
29bb8513c0 Update POMs to 1.3.0.RELEASE 2017-09-29 15:57:53 -04:00
Gary Russell
69227166c7 GH-215: Add timeout to health indicator
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/215

* Shutdown the executor.

* Polishing - PR Comments

* Re-interrupt thread.

* More Polishing
2017-09-29 14:47:45 -04:00
Gary Russell
4ff4507741 GH-206: Close Consumer/Producer in provisioning
Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/206

Close the consumer and producer after retrieving the current partition count.

**Cherry pick/back port to 0.11 and 1.2.x, 2.0.x**

* Destroy the Producer Factory
2017-09-28 10:51:15 -04:00
Soby Chacko
f2e1b63460 GH-201: Move metrics doc to the main overview doc
Fixes #201
2017-09-27 12:57:19 -04:00
Soby Chacko
73f1ed9523 Separate each sentence in metrics docs 2017-09-26 20:02:05 -04:00
Soby Chacko
dc7662e17d Add missing documentation for windowing properties
Cleanup test

Fix #196
2017-09-26 16:31:42 -04:00
Gary Russell
b76fff31b8 Back to 1.3.0.BUILD-SNAPSHOT 2017-09-13 11:10:07 -04:00
20 changed files with 311 additions and 84 deletions

View File

@@ -2,12 +2,12 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.RC1</version>
<version>1.3.3.RELEASE</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>1.3.5.RELEASE</version>
<version>1.3.9.RELEASE</version>
<relativePath />
</parent>
<properties>
@@ -15,8 +15,8 @@
<kafka.version>0.10.1.1</kafka.version>
<spring-kafka.version>1.1.6.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>2.1.2.RELEASE</spring-integration-kafka.version>
<spring-cloud-stream.version>1.3.0.RC1</spring-cloud-stream.version>
<spring-cloud-build.version>1.3.5.RELEASE</spring-cloud-build.version>
<spring-cloud-stream.version>1.3.3.RELEASE</spring-cloud-stream.version>
<spring-cloud-build.version>1.3.9.RELEASE</spring-cloud-build.version>
</properties>
<modules>
<module>spring-cloud-stream-binder-kafka</module>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.RC1</version>
<version>1.3.3.RELEASE</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.RC1</version>
<version>1.3.3.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-0.10.1-test</artifactId>
<description>Spring Cloud Stream Kafka Binder 0.10.1 Tests</description>

View File

@@ -16,13 +16,6 @@
package org.springframework.cloud.stream.binder.kafka;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
import kafka.utils.ZKStringSerializer$;
@@ -36,7 +29,6 @@ import org.eclipse.jetty.server.Server;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
@@ -58,11 +50,18 @@ import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import static org.junit.Assert.assertTrue;
/**
* Integration tests for the {@link KafkaMessageChannelBinder}.
*
*
* This test specifically tests for the 0.10.1.x version of Kafka.
*
* @author Eric Bottard
@@ -79,7 +78,7 @@ public class Kafka_0_10_1_BinderTests extends Kafka_0_10_2_BinderTests {
private Kafka10TestBinder binder;
private Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
private final Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
@Override
protected void binderBindUnbindLatency() throws InterruptedException {
@@ -90,11 +89,13 @@ public class Kafka_0_10_1_BinderTests extends Kafka_0_10_2_BinderTests {
protected Kafka10TestBinder getBinder() {
if (binder == null) {
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binderConfiguration.setHeaders("dlqTestHeader");
binder = new Kafka10TestBinder(binderConfiguration);
}
return binder;
}
@Override
protected KafkaBinderConfigurationProperties createConfigurationProperties() {
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.RC1</version>
<version>1.3.3.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-0.10.2-test</artifactId>
<description>Spring Cloud Stream Kafka Binder 0.10.2 Tests</description>

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2014-2016 the original author or authors.
* Copyright 2014-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,8 +25,10 @@ import java.util.UUID;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -69,6 +71,7 @@ import static org.junit.Assert.assertTrue;
* @author Marius Bogoevici
* @author Mark Fisher
* @author Ilayaperumal Gopinathan
* @author Gary Russell
*/
public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
@@ -79,7 +82,7 @@ public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
private Kafka10TestBinder binder;
private Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
private final Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
@Override
protected void binderBindUnbindLatency() throws InterruptedException {
@@ -90,11 +93,13 @@ public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
protected Kafka10TestBinder getBinder() {
if (binder == null) {
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binderConfiguration.setHeaders("dlqTestHeader");
binder = new Kafka10TestBinder(binderConfiguration);
}
return binder;
}
@Override
protected KafkaBinderConfigurationProperties createConfigurationProperties() {
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.RC1</version>
<version>1.3.3.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015-2016 the original author or authors.
* Copyright 2015-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -35,6 +35,7 @@ import org.springframework.util.StringUtils;
* @author Ilayaperumal Gopinathan
* @author Marius Bogoevici
* @author Soby Chacko
* @author Gary Russell
*/
@ConfigurationProperties(prefix = "spring.cloud.stream.kafka.binder")
public class KafkaBinderConfigurationProperties {
@@ -88,6 +89,11 @@ public class KafkaBinderConfigurationProperties {
private int queueSize = 8192;
/**
* Time to wait to get partition information in seconds; default 60.
*/
private int healthTimeout = 60;
private JaasLoginModuleConfiguration jaas;
public String getZkConnectionString() {
@@ -228,6 +234,14 @@ public class KafkaBinderConfigurationProperties {
this.minPartitionCount = minPartitionCount;
}
public int getHealthTimeout() {
return this.healthTimeout;
}
public void setHealthTimeout(int healthTimeout) {
this.healthTimeout = healthTimeout;
}
public int getQueueSize() {
return this.queueSize;
}

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.RC1</version>
<version>1.3.3.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>

View File

@@ -26,8 +26,6 @@ include::overview.adoc[]
include::dlq.adoc[]
include::metrics.adoc[]
= Appendices
[appendix]
include::building.adoc[]

View File

@@ -1,10 +0,0 @@
[[kafka-metrics]]
== Kafka metrics
Kafka binder module exposes the following metrics:
`spring.cloud.stream.binder.kafka.someGroup.someTopic.lag` - this metric indicates how many messages
have not been yet consumed from given binder's topic (`someTopic`) by given consumer group (`someGroup`).
For example if the value of the metric `spring.cloud.stream.binder.kafka.myGroup.myTopic.lag` is `1000`, then
consumer group `myGroup` has `1000` messages to waiting to be consumed from topic `myTopic`. This metric is
particularly useful to provide auto-scaling feedback to PaaS platform of your choice.

View File

@@ -73,6 +73,11 @@ spring.cloud.stream.kafka.binder.headers::
The list of custom headers that will be transported by the binder.
+
Default: empty.
spring.cloud.stream.kafka.binder.healthTimeout::
The time to wait to get partition information in seconds; default 60.
Health will report as down if this timer expires.
+
Default: 10.
spring.cloud.stream.kafka.binder.offsetUpdateTimeWindow::
The frequency, in milliseconds, with which offsets are saved.
Ignored if `0`.
@@ -523,6 +528,34 @@ spring.cloud.stream.kstream.bindings.output.producer.keySerde=org.apache.kafka.c
spring.cloud.stream.kstream.bindings.output.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$LongSerde
----
timewindow.length::
Many streaming applications written using Kafka Streams involve windowning operations.
If you specify this property, there is a `org.apache.kafka.streams.kstream.TimeWindows` bean automatically provided that can be autowired in applications.
This property must be prefixed with `spring.cloud.stream.kstream.`.
A bean of type `org.apache.kafka.streams.kstream.TimeWindows` is created only if this property is provided.
Following is an example of using this property.
Values are provided in milliseconds.
[source]
----
spring.cloud.stream.kstream.timeWindow.length=5000
----
timewindow.advanceBy::
This property goes hand in hand with `timewindow.length` and has no effect on its own.
If you provide this property, the generated `org.apache.kafka.streams.kstream.TimeWindows` bean will automatically conatin this information.
This property must be prefixed with `spring.cloud.stream.kstream.`.
Following is an example of using this property.
Values are provided in milliseconds.
[source]
----
spring.cloud.stream.kstream.timeWindow.advanceBy=1000
----
[[kafka-error-channels]]
== Error Channels
@@ -535,3 +568,13 @@ The payload of the `ErrorMessage` for a send failure is a `KafkaSendFailureExcep
* `record` - the raw `ProducerRecord` that was created from the `failedMessage`
There is no automatic handling of these exceptions (such as sending to a <<kafka-dlq-processing, Dead-Letter queue>>); you can consume these exceptions with your own Spring Integration flow.
[[kafka-metrics]]
== Kafka Metrics
Kafka binder module exposes the following metrics:
`spring.cloud.stream.binder.kafka.someGroup.someTopic.lag` - this metric indicates how many messages have not been yet consumed from given binder's topic by given consumer group.
For example if the value of the metric `spring.cloud.stream.binder.kafka.myGroup.myTopic.lag` is `1000`, then consumer group `myGroup` has `1000` messages to waiting to be consumed from topic `myTopic`.
This metric is particularly useful to provide auto-scaling feedback to PaaS platform of your choice.

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.RC1</version>
<version>1.3.3.RELEASE</version>
</parent>
<dependencies>

View File

@@ -19,6 +19,13 @@ package org.springframework.cloud.stream.binder.kafka;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
@@ -33,13 +40,18 @@ import org.springframework.kafka.core.ConsumerFactory;
* @author Ilayaperumal Gopinathan
* @author Marius Bogoevici
* @author Henryk Konsek
* @author Gary Russell
*/
public class KafkaBinderHealthIndicator implements HealthIndicator {
private static final int DEFAULT_TIMEOUT = 60;
private final KafkaMessageChannelBinder binder;
private final ConsumerFactory<?, ?> consumerFactory;
private int timeout = DEFAULT_TIMEOUT;
public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder,
ConsumerFactory<?, ?> consumerFactory) {
this.binder = binder;
@@ -47,28 +59,67 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
}
/**
* Set the timeout in seconds to retrieve health information.
* @param timeout the timeout - default 60.
*/
public void setTimeout(int timeout) {
this.timeout = timeout;
}
@Override
public Health health() {
try (Consumer<?, ?> metadataConsumer = consumerFactory.createConsumer()) {
Set<String> downMessages = new HashSet<>();
for (String topic : this.binder.getTopicsInUse().keySet()) {
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
if (this.binder.getTopicsInUse().get(topic).getPartitionInfos().contains(partitionInfo)
&& partitionInfo.leader()
.id() == -1) {
downMessages.add(partitionInfo.toString());
ExecutorService exec = Executors.newSingleThreadExecutor();
Future<Health> future = exec.submit(new Callable<Health>() {
@Override
public Health call() {
try (Consumer<?, ?> metadataConsumer = consumerFactory.createConsumer()) {
Set<String> downMessages = new HashSet<>();
for (String topic : KafkaBinderHealthIndicator.this.binder.getTopicsInUse().keySet()) {
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
if (KafkaBinderHealthIndicator.this.binder.getTopicsInUse().get(topic).getPartitionInfos()
.contains(partitionInfo) && partitionInfo.leader().id() == -1) {
downMessages.add(partitionInfo.toString());
}
}
}
if (downMessages.isEmpty()) {
return Health.up().build();
}
else {
return Health.down()
.withDetail("Following partitions in use have no leaders: ", downMessages.toString())
.build();
}
}
catch (Exception e) {
return Health.down(e).build();
}
}
if (downMessages.isEmpty()) {
return Health.up().build();
}
return Health.down().withDetail("Following partitions in use have no leaders: ", downMessages.toString())
});
try {
return future.get(this.timeout, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return Health.down()
.withDetail("Interrupted while waiting for partition information in", this.timeout + " seconds")
.build();
}
catch (Exception e) {
catch (ExecutionException e) {
return Health.down(e).build();
}
catch (TimeoutException e) {
return Health.down()
.withDetail("Failed to retrieve partition information in", this.timeout + " seconds")
.build();
}
finally {
exec.shutdownNow();
}
}
}

View File

@@ -16,17 +16,22 @@
package org.springframework.cloud.stream.binder.kafka;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -36,9 +41,12 @@ import org.apache.kafka.common.utils.Utils;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.EmbeddedHeaderUtils;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.MessageValues;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
@@ -68,6 +76,7 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
@@ -87,12 +96,19 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
* @author Soby Chacko
* @author Henryk Konsek
* @author Doug Saus
* @author Aldo Sinanaj
*/
public class KafkaMessageChannelBinder extends
AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>,
ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {
public static final String X_ORIGINAL_TOPIC = "x-original-topic";
public static final String X_EXCEPTION_MESSAGE = "x-exception-message";
public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";
private final KafkaBinderConfigurationProperties configurationProperties;
private final Map<String, TopicInformation> topicsInUse = new HashMap<>();
@@ -154,10 +170,16 @@ public class KafkaMessageChannelBinder extends
producerProperties.getPartitionCount(),
false,
new Callable<Collection<PartitionInfo>>() {
@Override
public Collection<PartitionInfo> call() throws Exception {
return producerFB.createProducer().partitionsFor(destination.getName());
Producer<byte[], byte[]> producer = producerFB.createProducer();
List<PartitionInfo> partitionsFor = producer.partitionsFor(destination.getName());
producer.close();
producerFB.destroy();
return partitionsFor;
}
});
this.topicsInUse.put(destination.getName(), new TopicInformation(null, partitions));
if (producerProperties.getPartitionCount() < partitions.size()) {
@@ -238,10 +260,15 @@ public class KafkaMessageChannelBinder extends
Collection<PartitionInfo> allPartitions = provisioningProvider.getPartitionsForTopic(partitionCount,
extendedConsumerProperties.getExtension().isAutoRebalanceEnabled(),
new Callable<Collection<PartitionInfo>>() {
@Override
public Collection<PartitionInfo> call() throws Exception {
return consumerFactory.createConsumer().partitionsFor(destination.getName());
Consumer<?, ?> consumer = consumerFactory.createConsumer();
List<PartitionInfo> partitionsFor = consumer.partitionsFor(destination.getName());
consumer.close();
return partitionsFor;
}
});
Collection<PartitionInfo> listenedPartitions;
@@ -329,10 +356,36 @@ public class KafkaMessageChannelBinder extends
public void handleMessage(Message<?> message) throws MessagingException {
final ConsumerRecord<?, ?> record = message.getHeaders()
.get(KafkaMessageDrivenChannelAdapter.KAFKA_RAW_DATA, ConsumerRecord.class);
final byte[] key = record.key() != null ? Utils.toArray(ByteBuffer.wrap((byte[]) record.key()))
final byte[] key = record.key() != null
? Utils.toArray(ByteBuffer.wrap((byte[]) record.key()))
: null;
final byte[] payload = record.value() != null
? Utils.toArray(ByteBuffer.wrap((byte[]) record.value())) : null;
final byte[] payload;
if (HeaderMode.embeddedHeaders == extendedConsumerProperties.getHeaderMode()
&& message.getPayload() instanceof Throwable) {
final Throwable throwable = (Throwable) message.getPayload();
final String failureMessage = throwable.getMessage();
try {
MessageValues messageValues = EmbeddedHeaderUtils
.extractHeaders(MessageBuilder.withPayload((byte[]) record.value()).build(),
false);
messageValues.put(X_ORIGINAL_TOPIC, record.topic());
messageValues.put(X_EXCEPTION_MESSAGE, failureMessage);
messageValues.put(X_EXCEPTION_STACKTRACE, getStackTraceAsString(throwable));
final String[] headersToEmbed = new ArrayList<>(messageValues.keySet()).toArray(
new String[messageValues.keySet().size()]);
payload = EmbeddedHeaderUtils.embedHeaders(messageValues,
EmbeddedHeaderUtils.headersToEmbed(headersToEmbed));
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
else {
payload = record.value() != null
? Utils.toArray(ByteBuffer.wrap((byte[]) record.value())) : null;
}
String dlqName = StringUtils.hasText(extendedConsumerProperties.getExtension().getDlqName())
? extendedConsumerProperties.getExtension().getDlqName()
: "error." + destination.getName() + "." + group;
@@ -420,6 +473,13 @@ public class KafkaMessageChannelBinder extends
return original.substring(0, maxCharacters) + "...";
}
private String getStackTraceAsString(Throwable cause) {
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter, true);
cause.printStackTrace(printWriter);
return stringWriter.getBuffer().toString();
}
private final class ProducerConfigurationMessageHandler extends KafkaProducerMessageHandler<byte[], byte[]>
implements Lifecycle {
@@ -436,7 +496,7 @@ public class KafkaMessageChannelBinder extends
setBeanFactory(KafkaMessageChannelBinder.this.getBeanFactory());
if (producerProperties.isPartitioned()) {
SpelExpressionParser parser = new SpelExpressionParser();
setPartitionIdExpression(parser.parseExpression("headers." + BinderHeaders.PARTITION_HEADER));
setPartitionIdExpression(parser.parseExpression("headers['" + BinderHeaders.PARTITION_HEADER + "']"));
}
if (producerProperties.getExtension().isSync()) {
setSync(true);

View File

@@ -68,11 +68,12 @@ import org.springframework.util.ObjectUtils;
* @author Mark Fisher
* @author Ilayaperumal Gopinathan
* @author Henryk Konsek
* @author Gary Russell
*/
@Configuration
@ConditionalOnMissingBean(Binder.class)
@Import({ KryoCodecAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class})
@EnableConfigurationProperties({ KafkaBinderConfigurationProperties.class, KafkaExtendedBindingProperties.class })
@EnableConfigurationProperties({ KafkaExtendedBindingProperties.class })
public class KafkaBinderConfiguration {
protected static final Log logger = LogFactory.getLog(KafkaBinderConfiguration.class);
@@ -96,14 +97,20 @@ public class KafkaBinderConfiguration {
private AdminUtilsOperation adminUtilsOperation;
@Bean
KafkaTopicProvisioner provisioningProvider() {
return new KafkaTopicProvisioner(this.configurationProperties, this.adminUtilsOperation);
KafkaBinderConfigurationProperties configurationProperties() {
return new KafkaBinderConfigurationProperties();
}
@Bean
KafkaMessageChannelBinder kafkaMessageChannelBinder() {
KafkaTopicProvisioner provisioningProvider(KafkaBinderConfigurationProperties configurationProperties) {
return new KafkaTopicProvisioner(configurationProperties, this.adminUtilsOperation);
}
@Bean
KafkaMessageChannelBinder kafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider) {
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
this.configurationProperties, provisioningProvider());
configurationProperties, provisioningProvider);
kafkaMessageChannelBinder.setCodec(this.codec);
kafkaMessageChannelBinder.setProducerListener(producerListener);
kafkaMessageChannelBinder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
@@ -117,7 +124,8 @@ public class KafkaBinderConfiguration {
}
@Bean
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder) {
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder,
KafkaBinderConfigurationProperties configurationProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
@@ -125,14 +133,18 @@ public class KafkaBinderConfiguration {
props.putAll(configurationProperties.getConsumerConfiguration());
}
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
}
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
return new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, consumerFactory);
KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(kafkaMessageChannelBinder,
consumerFactory);
indicator.setTimeout(configurationProperties.getHealthTimeout());
return indicator;
}
@Bean
public PublicMetrics kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder) {
public PublicMetrics kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder,
KafkaBinderConfigurationProperties configurationProperties) {
return new KafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties);
}

View File

@@ -15,6 +15,10 @@
*/
package org.springframework.cloud.stream.binder.kafka;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.verify;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -27,16 +31,16 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
/**
* @author Barry Commins
* @author Gary Russell
*/
public class KafkaBinderHealthIndicatorTest {
@@ -53,14 +57,15 @@ public class KafkaBinderHealthIndicatorTest {
@Mock
private KafkaMessageChannelBinder binder;
private Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse = new HashMap<>();
private final Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse = new HashMap<>();
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
given(consumerFactory.createConsumer()).willReturn(consumer);
given(binder.getTopicsInUse()).willReturn(topicsInUse);
indicator = new KafkaBinderHealthIndicator(binder, consumerFactory);
this.indicator = new KafkaBinderHealthIndicator(binder, consumerFactory);
this.indicator.setTimeout(10);
}
@Test
@@ -70,6 +75,7 @@ public class KafkaBinderHealthIndicatorTest {
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
Health health = indicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);
verify(this.consumer).close();
}
@Test
@@ -81,6 +87,25 @@ public class KafkaBinderHealthIndicatorTest {
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
}
@Test(timeout = 5000)
public void kafkaBinderDoesNotAnswer() {
final List<PartitionInfo> partitions = partitions(new Node(-1, null, 0));
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group", partitions));
given(consumer.partitionsFor(TEST_TOPIC)).willAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
final int fiveMinutes = 1000 * 60 * 5;
Thread.sleep(fiveMinutes);
return partitions;
}
});
this.indicator.setTimeout(1);
Health health = indicator.health();
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
}
private List<PartitionInfo> partitions(Node leader) {
List<PartitionInfo> partitions = new ArrayList<>();
partitions.add(new PartitionInfo(TEST_TOPIC, 0, leader, null, null));

View File

@@ -16,6 +16,12 @@
package org.springframework.cloud.stream.binder.kafka;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
@@ -85,11 +91,6 @@ import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
@@ -98,6 +99,7 @@ import kafka.utils.ZkUtils;
* @author Ilayaperumal Gopinathan
* @author Henryk Konsek
* @author Gary Russell
* @author Aldo Sinanaj
*/
public abstract class KafkaBinderTests extends
PartitionCapableBinderTests<AbstractKafkaTestBinder, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>> {
@@ -141,15 +143,20 @@ public abstract class KafkaBinderTests extends
@Test
public void testDlqAndRetry() throws Exception {
testDlqGuts(true);
testDlqGuts(true, false);
}
@Test
public void testDlq() throws Exception {
testDlqGuts(false);
testDlqGuts(false, false);
}
private void testDlqGuts(boolean withRetry) throws Exception {
@Test
public void testDlqRawHeaders() throws Exception {
testDlqGuts(false, true);
}
private void testDlqGuts(boolean withRetry, boolean raw) throws Exception {
AbstractKafkaTestBinder binder = getBinder();
DirectChannel moduleOutputChannel = new DirectChannel();
DirectChannel moduleInputChannel = new DirectChannel();
@@ -158,12 +165,18 @@ public abstract class KafkaBinderTests extends
moduleInputChannel.subscribe(handler);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionCount(2);
if (raw) {
producerProperties.setHeaderMode(HeaderMode.raw);
}
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setMaxAttempts(withRetry ? 2 : 1);
consumerProperties.setBackOffInitialInterval(100);
consumerProperties.setBackOffMaxInterval(150);
consumerProperties.getExtension().setEnableDlq(true);
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
if (raw) {
consumerProperties.setHeaderMode(HeaderMode.raw);
}
long uniqueBindingId = System.currentTimeMillis();
String producerName = "dlqTest." + uniqueBindingId + ".0";
@@ -174,6 +187,9 @@ public abstract class KafkaBinderTests extends
ExtendedConsumerProperties<KafkaConsumerProperties> dlqConsumerProperties = createConsumerProperties();
dlqConsumerProperties.setMaxAttempts(1);
if (raw) {
dlqConsumerProperties.setHeaderMode(HeaderMode.raw);
}
ApplicationContext context = TestUtils.getPropertyValue(binder.getBinder(), "applicationContext",
ApplicationContext.class);
@@ -206,13 +222,28 @@ public abstract class KafkaBinderTests extends
"error.dlqTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel, dlqConsumerProperties);
binderBindUnbindLatency();
String testMessagePayload = "test." + UUID.randomUUID().toString();
Message<String> testMessage = MessageBuilder.withPayload(testMessagePayload).build();
Message<String> testMessage = MessageBuilder.withPayload(testMessagePayload)
.setHeader("dlqTestHeader", "propagatedToDlq")
.build();
moduleOutputChannel.send(testMessage);
Message<?> receivedMessage = receive(dlqChannel, 3);
assertThat(receivedMessage).isNotNull();
assertThat(receivedMessage.getPayload()).isEqualTo(testMessagePayload);
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
if (raw) {
assertThat(new String((byte[]) receivedMessage.getPayload(), StandardCharsets.UTF_8))
.isEqualTo(testMessagePayload);
}
else {
assertThat(receivedMessage.getPayload()).isEqualTo(testMessagePayload);
final MessageHeaders headers = receivedMessage.getHeaders();
assertThat(headers.get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC)).isEqualTo(producerName);
assertThat(headers.get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE))
.isEqualTo("failed to send Message to channel 'null'; nested exception is java.lang.RuntimeException: fail");
assertThat(headers.get(KafkaMessageChannelBinder.X_EXCEPTION_STACKTRACE)).isNotNull();
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isNotNull();
assertThat(headers.get("dlqTestHeader")).isEqualTo("propagatedToDlq");
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
}
binderBindUnbindLatency();
// verify we got a message on the dedicated error channel and the global (via bridge)

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.RC1</version>
<version>1.3.3.RELEASE</version>
</parent>
<dependencies>

View File

@@ -39,11 +39,9 @@ import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor;
import org.springframework.cloud.stream.binder.kstream.config.KStreamApplicationSupportProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -115,7 +113,6 @@ public class KStreamBinderWordCountIntegrationTests {
@EnableBinding(KStreamProcessor.class)
@EnableAutoConfiguration
@EnableConfigurationProperties(KStreamApplicationSupportProperties.class)
public static class WordCountProcessorApplication {
@Autowired