Compare commits
14 Commits
v1.3.0.RC1
...
v1.3.2.REL
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
079bd6f7c4 | ||
|
|
971b78936b | ||
|
|
eec0eb7892 | ||
|
|
d639731c66 | ||
|
|
49b780717e | ||
|
|
f4fc9442c7 | ||
|
|
e3460d6fce | ||
|
|
29bb8513c0 | ||
|
|
69227166c7 | ||
|
|
4ff4507741 | ||
|
|
f2e1b63460 | ||
|
|
73f1ed9523 | ||
|
|
dc7662e17d | ||
|
|
b76fff31b8 |
8
pom.xml
8
pom.xml
@@ -2,12 +2,12 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.3.0.RC1</version>
|
||||
<version>1.3.2.RELEASE</version>
|
||||
<packaging>pom</packaging>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-build</artifactId>
|
||||
<version>1.3.5.RELEASE</version>
|
||||
<version>1.3.7.RELEASE</version>
|
||||
<relativePath />
|
||||
</parent>
|
||||
<properties>
|
||||
@@ -15,8 +15,8 @@
|
||||
<kafka.version>0.10.1.1</kafka.version>
|
||||
<spring-kafka.version>1.1.6.RELEASE</spring-kafka.version>
|
||||
<spring-integration-kafka.version>2.1.2.RELEASE</spring-integration-kafka.version>
|
||||
<spring-cloud-stream.version>1.3.0.RC1</spring-cloud-stream.version>
|
||||
<spring-cloud-build.version>1.3.5.RELEASE</spring-cloud-build.version>
|
||||
<spring-cloud-stream.version>1.3.2.RELEASE</spring-cloud-stream.version>
|
||||
<spring-cloud-build.version>1.3.7.RELEASE</spring-cloud-build.version>
|
||||
</properties>
|
||||
<modules>
|
||||
<module>spring-cloud-stream-binder-kafka</module>
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.3.0.RC1</version>
|
||||
<version>1.3.2.RELEASE</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
<description>Spring Cloud Starter Stream Kafka</description>
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.3.0.RC1</version>
|
||||
<version>1.3.2.RELEASE</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-0.10.1-test</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder 0.10.1 Tests</description>
|
||||
|
||||
@@ -16,13 +16,6 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
|
||||
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
|
||||
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
|
||||
import kafka.utils.ZKStringSerializer$;
|
||||
@@ -36,7 +29,6 @@ import org.eclipse.jetty.server.Server;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.binder.Binding;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
@@ -58,11 +50,18 @@ import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Integration tests for the {@link KafkaMessageChannelBinder}.
|
||||
*
|
||||
*
|
||||
* This test specifically tests for the 0.10.1.x version of Kafka.
|
||||
*
|
||||
* @author Eric Bottard
|
||||
@@ -79,7 +78,7 @@ public class Kafka_0_10_1_BinderTests extends Kafka_0_10_2_BinderTests {
|
||||
|
||||
private Kafka10TestBinder binder;
|
||||
|
||||
private Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
|
||||
private final Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
|
||||
|
||||
@Override
|
||||
protected void binderBindUnbindLatency() throws InterruptedException {
|
||||
@@ -90,11 +89,13 @@ public class Kafka_0_10_1_BinderTests extends Kafka_0_10_2_BinderTests {
|
||||
protected Kafka10TestBinder getBinder() {
|
||||
if (binder == null) {
|
||||
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
|
||||
binderConfiguration.setHeaders("dlqTestHeader");
|
||||
binder = new Kafka10TestBinder(binderConfiguration);
|
||||
}
|
||||
return binder;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected KafkaBinderConfigurationProperties createConfigurationProperties() {
|
||||
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
|
||||
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.3.0.RC1</version>
|
||||
<version>1.3.2.RELEASE</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-0.10.2-test</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder 0.10.2 Tests</description>
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2014-2016 the original author or authors.
|
||||
* Copyright 2014-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -25,8 +25,10 @@ import java.util.UUID;
|
||||
|
||||
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
|
||||
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
|
||||
|
||||
import kafka.utils.ZKStringSerializer$;
|
||||
import kafka.utils.ZkUtils;
|
||||
|
||||
import org.I0Itec.zkclient.ZkClient;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
@@ -69,6 +71,7 @@ import static org.junit.Assert.assertTrue;
|
||||
* @author Marius Bogoevici
|
||||
* @author Mark Fisher
|
||||
* @author Ilayaperumal Gopinathan
|
||||
* @author Gary Russell
|
||||
*/
|
||||
public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
|
||||
|
||||
@@ -79,7 +82,7 @@ public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
|
||||
|
||||
private Kafka10TestBinder binder;
|
||||
|
||||
private Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
|
||||
private final Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
|
||||
|
||||
@Override
|
||||
protected void binderBindUnbindLatency() throws InterruptedException {
|
||||
@@ -90,11 +93,13 @@ public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
|
||||
protected Kafka10TestBinder getBinder() {
|
||||
if (binder == null) {
|
||||
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
|
||||
binderConfiguration.setHeaders("dlqTestHeader");
|
||||
binder = new Kafka10TestBinder(binderConfiguration);
|
||||
}
|
||||
return binder;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected KafkaBinderConfigurationProperties createConfigurationProperties() {
|
||||
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
|
||||
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.3.0.RC1</version>
|
||||
<version>1.3.2.RELEASE</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder Core</description>
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2015-2016 the original author or authors.
|
||||
* Copyright 2015-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -35,6 +35,7 @@ import org.springframework.util.StringUtils;
|
||||
* @author Ilayaperumal Gopinathan
|
||||
* @author Marius Bogoevici
|
||||
* @author Soby Chacko
|
||||
* @author Gary Russell
|
||||
*/
|
||||
@ConfigurationProperties(prefix = "spring.cloud.stream.kafka.binder")
|
||||
public class KafkaBinderConfigurationProperties {
|
||||
@@ -88,6 +89,11 @@ public class KafkaBinderConfigurationProperties {
|
||||
|
||||
private int queueSize = 8192;
|
||||
|
||||
/**
|
||||
* Time to wait to get partition information in seconds; default 60.
|
||||
*/
|
||||
private int healthTimeout = 60;
|
||||
|
||||
private JaasLoginModuleConfiguration jaas;
|
||||
|
||||
public String getZkConnectionString() {
|
||||
@@ -228,6 +234,14 @@ public class KafkaBinderConfigurationProperties {
|
||||
this.minPartitionCount = minPartitionCount;
|
||||
}
|
||||
|
||||
public int getHealthTimeout() {
|
||||
return this.healthTimeout;
|
||||
}
|
||||
|
||||
public void setHealthTimeout(int healthTimeout) {
|
||||
this.healthTimeout = healthTimeout;
|
||||
}
|
||||
|
||||
public int getQueueSize() {
|
||||
return this.queueSize;
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.3.0.RC1</version>
|
||||
<version>1.3.2.RELEASE</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>
|
||||
|
||||
@@ -26,8 +26,6 @@ include::overview.adoc[]
|
||||
|
||||
include::dlq.adoc[]
|
||||
|
||||
include::metrics.adoc[]
|
||||
|
||||
= Appendices
|
||||
[appendix]
|
||||
include::building.adoc[]
|
||||
|
||||
@@ -1,10 +0,0 @@
|
||||
[[kafka-metrics]]
|
||||
== Kafka metrics
|
||||
|
||||
Kafka binder module exposes the following metrics:
|
||||
|
||||
`spring.cloud.stream.binder.kafka.someGroup.someTopic.lag` - this metric indicates how many messages
|
||||
have not been yet consumed from given binder's topic (`someTopic`) by given consumer group (`someGroup`).
|
||||
For example if the value of the metric `spring.cloud.stream.binder.kafka.myGroup.myTopic.lag` is `1000`, then
|
||||
consumer group `myGroup` has `1000` messages to waiting to be consumed from topic `myTopic`. This metric is
|
||||
particularly useful to provide auto-scaling feedback to PaaS platform of your choice.
|
||||
@@ -73,6 +73,11 @@ spring.cloud.stream.kafka.binder.headers::
|
||||
The list of custom headers that will be transported by the binder.
|
||||
+
|
||||
Default: empty.
|
||||
spring.cloud.stream.kafka.binder.healthTimeout::
|
||||
The time to wait to get partition information in seconds; default 60.
|
||||
Health will report as down if this timer expires.
|
||||
+
|
||||
Default: 10.
|
||||
spring.cloud.stream.kafka.binder.offsetUpdateTimeWindow::
|
||||
The frequency, in milliseconds, with which offsets are saved.
|
||||
Ignored if `0`.
|
||||
@@ -523,6 +528,34 @@ spring.cloud.stream.kstream.bindings.output.producer.keySerde=org.apache.kafka.c
|
||||
spring.cloud.stream.kstream.bindings.output.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$LongSerde
|
||||
----
|
||||
|
||||
timewindow.length::
|
||||
Many streaming applications written using Kafka Streams involve windowning operations.
|
||||
If you specify this property, there is a `org.apache.kafka.streams.kstream.TimeWindows` bean automatically provided that can be autowired in applications.
|
||||
This property must be prefixed with `spring.cloud.stream.kstream.`.
|
||||
A bean of type `org.apache.kafka.streams.kstream.TimeWindows` is created only if this property is provided.
|
||||
|
||||
Following is an example of using this property.
|
||||
Values are provided in milliseconds.
|
||||
|
||||
[source]
|
||||
----
|
||||
spring.cloud.stream.kstream.timeWindow.length=5000
|
||||
----
|
||||
|
||||
timewindow.advanceBy::
|
||||
This property goes hand in hand with `timewindow.length` and has no effect on its own.
|
||||
If you provide this property, the generated `org.apache.kafka.streams.kstream.TimeWindows` bean will automatically conatin this information.
|
||||
This property must be prefixed with `spring.cloud.stream.kstream.`.
|
||||
|
||||
Following is an example of using this property.
|
||||
Values are provided in milliseconds.
|
||||
|
||||
[source]
|
||||
----
|
||||
spring.cloud.stream.kstream.timeWindow.advanceBy=1000
|
||||
----
|
||||
|
||||
|
||||
[[kafka-error-channels]]
|
||||
== Error Channels
|
||||
|
||||
@@ -535,3 +568,13 @@ The payload of the `ErrorMessage` for a send failure is a `KafkaSendFailureExcep
|
||||
* `record` - the raw `ProducerRecord` that was created from the `failedMessage`
|
||||
|
||||
There is no automatic handling of these exceptions (such as sending to a <<kafka-dlq-processing, Dead-Letter queue>>); you can consume these exceptions with your own Spring Integration flow.
|
||||
|
||||
|
||||
[[kafka-metrics]]
|
||||
== Kafka Metrics
|
||||
|
||||
Kafka binder module exposes the following metrics:
|
||||
|
||||
`spring.cloud.stream.binder.kafka.someGroup.someTopic.lag` - this metric indicates how many messages have not been yet consumed from given binder's topic by given consumer group.
|
||||
For example if the value of the metric `spring.cloud.stream.binder.kafka.myGroup.myTopic.lag` is `1000`, then consumer group `myGroup` has `1000` messages to waiting to be consumed from topic `myTopic`.
|
||||
This metric is particularly useful to provide auto-scaling feedback to PaaS platform of your choice.
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.3.0.RC1</version>
|
||||
<version>1.3.2.RELEASE</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
||||
@@ -19,6 +19,13 @@ package org.springframework.cloud.stream.binder.kafka;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.common.PartitionInfo;
|
||||
@@ -33,13 +40,18 @@ import org.springframework.kafka.core.ConsumerFactory;
|
||||
* @author Ilayaperumal Gopinathan
|
||||
* @author Marius Bogoevici
|
||||
* @author Henryk Konsek
|
||||
* @author Gary Russell
|
||||
*/
|
||||
public class KafkaBinderHealthIndicator implements HealthIndicator {
|
||||
|
||||
private static final int DEFAULT_TIMEOUT = 60;
|
||||
|
||||
private final KafkaMessageChannelBinder binder;
|
||||
|
||||
private final ConsumerFactory<?, ?> consumerFactory;
|
||||
|
||||
private int timeout = DEFAULT_TIMEOUT;
|
||||
|
||||
public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder,
|
||||
ConsumerFactory<?, ?> consumerFactory) {
|
||||
this.binder = binder;
|
||||
@@ -47,28 +59,67 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the timeout in seconds to retrieve health information.
|
||||
* @param timeout the timeout - default 60.
|
||||
*/
|
||||
public void setTimeout(int timeout) {
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Health health() {
|
||||
try (Consumer<?, ?> metadataConsumer = consumerFactory.createConsumer()) {
|
||||
Set<String> downMessages = new HashSet<>();
|
||||
for (String topic : this.binder.getTopicsInUse().keySet()) {
|
||||
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
|
||||
for (PartitionInfo partitionInfo : partitionInfos) {
|
||||
if (this.binder.getTopicsInUse().get(topic).getPartitionInfos().contains(partitionInfo)
|
||||
&& partitionInfo.leader()
|
||||
.id() == -1) {
|
||||
downMessages.add(partitionInfo.toString());
|
||||
ExecutorService exec = Executors.newSingleThreadExecutor();
|
||||
Future<Health> future = exec.submit(new Callable<Health>() {
|
||||
|
||||
@Override
|
||||
public Health call() {
|
||||
try (Consumer<?, ?> metadataConsumer = consumerFactory.createConsumer()) {
|
||||
Set<String> downMessages = new HashSet<>();
|
||||
for (String topic : KafkaBinderHealthIndicator.this.binder.getTopicsInUse().keySet()) {
|
||||
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
|
||||
for (PartitionInfo partitionInfo : partitionInfos) {
|
||||
if (KafkaBinderHealthIndicator.this.binder.getTopicsInUse().get(topic).getPartitionInfos()
|
||||
.contains(partitionInfo) && partitionInfo.leader().id() == -1) {
|
||||
downMessages.add(partitionInfo.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
if (downMessages.isEmpty()) {
|
||||
return Health.up().build();
|
||||
}
|
||||
else {
|
||||
return Health.down()
|
||||
.withDetail("Following partitions in use have no leaders: ", downMessages.toString())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
return Health.down(e).build();
|
||||
}
|
||||
}
|
||||
if (downMessages.isEmpty()) {
|
||||
return Health.up().build();
|
||||
}
|
||||
return Health.down().withDetail("Following partitions in use have no leaders: ", downMessages.toString())
|
||||
|
||||
});
|
||||
try {
|
||||
return future.get(this.timeout, TimeUnit.SECONDS);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return Health.down()
|
||||
.withDetail("Interrupted while waiting for partition information in", this.timeout + " seconds")
|
||||
.build();
|
||||
}
|
||||
catch (Exception e) {
|
||||
catch (ExecutionException e) {
|
||||
return Health.down(e).build();
|
||||
}
|
||||
catch (TimeoutException e) {
|
||||
return Health.down()
|
||||
.withDetail("Failed to retrieve partition information in", this.timeout + " seconds")
|
||||
.build();
|
||||
}
|
||||
finally {
|
||||
exec.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -16,17 +16,22 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.producer.Producer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.PartitionInfo;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
@@ -36,9 +41,11 @@ import org.apache.kafka.common.utils.Utils;
|
||||
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.binder.BinderHeaders;
|
||||
import org.springframework.cloud.stream.binder.EmbeddedHeaderUtils;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
|
||||
import org.springframework.cloud.stream.binder.MessageValues;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
|
||||
@@ -68,6 +75,7 @@ import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
@@ -87,12 +95,19 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
|
||||
* @author Soby Chacko
|
||||
* @author Henryk Konsek
|
||||
* @author Doug Saus
|
||||
* @author Aldo Sinanaj
|
||||
*/
|
||||
public class KafkaMessageChannelBinder extends
|
||||
AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>,
|
||||
ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
|
||||
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {
|
||||
|
||||
public static final String X_ORIGINAL_TOPIC = "x-original-topic";
|
||||
|
||||
public static final String X_EXCEPTION_MESSAGE = "x-exception-message";
|
||||
|
||||
public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";
|
||||
|
||||
private final KafkaBinderConfigurationProperties configurationProperties;
|
||||
|
||||
private final Map<String, TopicInformation> topicsInUse = new HashMap<>();
|
||||
@@ -154,10 +169,16 @@ public class KafkaMessageChannelBinder extends
|
||||
producerProperties.getPartitionCount(),
|
||||
false,
|
||||
new Callable<Collection<PartitionInfo>>() {
|
||||
|
||||
@Override
|
||||
public Collection<PartitionInfo> call() throws Exception {
|
||||
return producerFB.createProducer().partitionsFor(destination.getName());
|
||||
Producer<byte[], byte[]> producer = producerFB.createProducer();
|
||||
List<PartitionInfo> partitionsFor = producer.partitionsFor(destination.getName());
|
||||
producer.close();
|
||||
producerFB.destroy();
|
||||
return partitionsFor;
|
||||
}
|
||||
|
||||
});
|
||||
this.topicsInUse.put(destination.getName(), new TopicInformation(null, partitions));
|
||||
if (producerProperties.getPartitionCount() < partitions.size()) {
|
||||
@@ -238,10 +259,15 @@ public class KafkaMessageChannelBinder extends
|
||||
Collection<PartitionInfo> allPartitions = provisioningProvider.getPartitionsForTopic(partitionCount,
|
||||
extendedConsumerProperties.getExtension().isAutoRebalanceEnabled(),
|
||||
new Callable<Collection<PartitionInfo>>() {
|
||||
|
||||
@Override
|
||||
public Collection<PartitionInfo> call() throws Exception {
|
||||
return consumerFactory.createConsumer().partitionsFor(destination.getName());
|
||||
Consumer<?, ?> consumer = consumerFactory.createConsumer();
|
||||
List<PartitionInfo> partitionsFor = consumer.partitionsFor(destination.getName());
|
||||
consumer.close();
|
||||
return partitionsFor;
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
Collection<PartitionInfo> listenedPartitions;
|
||||
@@ -329,10 +355,35 @@ public class KafkaMessageChannelBinder extends
|
||||
public void handleMessage(Message<?> message) throws MessagingException {
|
||||
final ConsumerRecord<?, ?> record = message.getHeaders()
|
||||
.get(KafkaMessageDrivenChannelAdapter.KAFKA_RAW_DATA, ConsumerRecord.class);
|
||||
final byte[] key = record.key() != null ? Utils.toArray(ByteBuffer.wrap((byte[]) record.key()))
|
||||
final byte[] key = record.key() != null
|
||||
? Utils.toArray(ByteBuffer.wrap((byte[]) record.key()))
|
||||
: null;
|
||||
final byte[] payload = record.value() != null
|
||||
? Utils.toArray(ByteBuffer.wrap((byte[]) record.value())) : null;
|
||||
final byte[] payload;
|
||||
if (message.getPayload() instanceof Throwable) {
|
||||
final Throwable throwable = (Throwable) message.getPayload();
|
||||
final String failureMessage = throwable.getMessage();
|
||||
|
||||
try {
|
||||
MessageValues messageValues = EmbeddedHeaderUtils
|
||||
.extractHeaders(MessageBuilder.withPayload((byte[]) record.value()).build(),
|
||||
false);
|
||||
messageValues.put(X_ORIGINAL_TOPIC, record.topic());
|
||||
messageValues.put(X_EXCEPTION_MESSAGE, failureMessage);
|
||||
messageValues.put(X_EXCEPTION_STACKTRACE, getStackTraceAsString(throwable));
|
||||
|
||||
final String[] headersToEmbed = new ArrayList<>(messageValues.keySet()).toArray(
|
||||
new String[messageValues.keySet().size()]);
|
||||
payload = EmbeddedHeaderUtils.embedHeaders(messageValues,
|
||||
EmbeddedHeaderUtils.headersToEmbed(headersToEmbed));
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
else {
|
||||
payload = record.value() != null
|
||||
? Utils.toArray(ByteBuffer.wrap((byte[]) record.value())) : null;
|
||||
}
|
||||
String dlqName = StringUtils.hasText(extendedConsumerProperties.getExtension().getDlqName())
|
||||
? extendedConsumerProperties.getExtension().getDlqName()
|
||||
: "error." + destination.getName() + "." + group;
|
||||
@@ -420,6 +471,13 @@ public class KafkaMessageChannelBinder extends
|
||||
return original.substring(0, maxCharacters) + "...";
|
||||
}
|
||||
|
||||
private String getStackTraceAsString(Throwable cause) {
|
||||
StringWriter stringWriter = new StringWriter();
|
||||
PrintWriter printWriter = new PrintWriter(stringWriter, true);
|
||||
cause.printStackTrace(printWriter);
|
||||
return stringWriter.getBuffer().toString();
|
||||
}
|
||||
|
||||
private final class ProducerConfigurationMessageHandler extends KafkaProducerMessageHandler<byte[], byte[]>
|
||||
implements Lifecycle {
|
||||
|
||||
|
||||
@@ -68,6 +68,7 @@ import org.springframework.util.ObjectUtils;
|
||||
* @author Mark Fisher
|
||||
* @author Ilayaperumal Gopinathan
|
||||
* @author Henryk Konsek
|
||||
* @author Gary Russell
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnMissingBean(Binder.class)
|
||||
@@ -128,7 +129,10 @@ public class KafkaBinderConfiguration {
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
|
||||
}
|
||||
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
|
||||
return new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, consumerFactory);
|
||||
KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(kafkaMessageChannelBinder,
|
||||
consumerFactory);
|
||||
indicator.setTimeout(this.configurationProperties.getHealthTimeout());
|
||||
return indicator;
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
||||
@@ -15,6 +15,10 @@
|
||||
*/
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.BDDMockito.given;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
@@ -27,16 +31,16 @@ import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import org.springframework.boot.actuate.health.Health;
|
||||
import org.springframework.boot.actuate.health.Status;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.BDDMockito.given;
|
||||
|
||||
/**
|
||||
* @author Barry Commins
|
||||
* @author Gary Russell
|
||||
*/
|
||||
public class KafkaBinderHealthIndicatorTest {
|
||||
|
||||
@@ -53,14 +57,15 @@ public class KafkaBinderHealthIndicatorTest {
|
||||
@Mock
|
||||
private KafkaMessageChannelBinder binder;
|
||||
|
||||
private Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse = new HashMap<>();
|
||||
private final Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse = new HashMap<>();
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
given(consumerFactory.createConsumer()).willReturn(consumer);
|
||||
given(binder.getTopicsInUse()).willReturn(topicsInUse);
|
||||
indicator = new KafkaBinderHealthIndicator(binder, consumerFactory);
|
||||
this.indicator = new KafkaBinderHealthIndicator(binder, consumerFactory);
|
||||
this.indicator.setTimeout(10);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -70,6 +75,7 @@ public class KafkaBinderHealthIndicatorTest {
|
||||
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
|
||||
Health health = indicator.health();
|
||||
assertThat(health.getStatus()).isEqualTo(Status.UP);
|
||||
verify(this.consumer).close();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -81,6 +87,25 @@ public class KafkaBinderHealthIndicatorTest {
|
||||
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
public void kafkaBinderDoesNotAnswer() {
|
||||
final List<PartitionInfo> partitions = partitions(new Node(-1, null, 0));
|
||||
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group", partitions));
|
||||
given(consumer.partitionsFor(TEST_TOPIC)).willAnswer(new Answer<Object>() {
|
||||
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||
final int fiveMinutes = 1000 * 60 * 5;
|
||||
Thread.sleep(fiveMinutes);
|
||||
return partitions;
|
||||
}
|
||||
|
||||
});
|
||||
this.indicator.setTimeout(1);
|
||||
Health health = indicator.health();
|
||||
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
|
||||
}
|
||||
|
||||
private List<PartitionInfo> partitions(Node leader) {
|
||||
List<PartitionInfo> partitions = new ArrayList<>();
|
||||
partitions.add(new PartitionInfo(TEST_TOPIC, 0, leader, null, null));
|
||||
|
||||
@@ -27,6 +27,9 @@ import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import kafka.utils.ZKStringSerializer$;
|
||||
import kafka.utils.ZkUtils;
|
||||
|
||||
import org.I0Itec.zkclient.ZkClient;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
@@ -90,14 +93,12 @@ import static org.assertj.core.api.Assertions.fail;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import kafka.utils.ZKStringSerializer$;
|
||||
import kafka.utils.ZkUtils;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
* @author Ilayaperumal Gopinathan
|
||||
* @author Henryk Konsek
|
||||
* @author Gary Russell
|
||||
* @author Aldo Sinanaj
|
||||
*/
|
||||
public abstract class KafkaBinderTests extends
|
||||
PartitionCapableBinderTests<AbstractKafkaTestBinder, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>> {
|
||||
@@ -206,12 +207,21 @@ public abstract class KafkaBinderTests extends
|
||||
"error.dlqTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel, dlqConsumerProperties);
|
||||
binderBindUnbindLatency();
|
||||
String testMessagePayload = "test." + UUID.randomUUID().toString();
|
||||
Message<String> testMessage = MessageBuilder.withPayload(testMessagePayload).build();
|
||||
Message<String> testMessage = MessageBuilder.withPayload(testMessagePayload)
|
||||
.setHeader("dlqTestHeader", "propagatedToDlq")
|
||||
.build();
|
||||
moduleOutputChannel.send(testMessage);
|
||||
|
||||
Message<?> receivedMessage = receive(dlqChannel, 3);
|
||||
assertThat(receivedMessage).isNotNull();
|
||||
assertThat(receivedMessage.getPayload()).isEqualTo(testMessagePayload);
|
||||
final MessageHeaders headers = receivedMessage.getHeaders();
|
||||
assertThat(headers.get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC)).isEqualTo(producerName);
|
||||
assertThat(headers.get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE))
|
||||
.isEqualTo("failed to send Message to channel 'null'; nested exception is java.lang.RuntimeException: fail");
|
||||
assertThat(headers.get(KafkaMessageChannelBinder.X_EXCEPTION_STACKTRACE)).isNotNull();
|
||||
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isNotNull();
|
||||
assertThat(headers.get("dlqTestHeader")).isEqualTo("propagatedToDlq");
|
||||
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
|
||||
binderBindUnbindLatency();
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>1.3.0.RC1</version>
|
||||
<version>1.3.2.RELEASE</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
||||
@@ -39,11 +39,9 @@ import org.junit.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
import org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor;
|
||||
import org.springframework.cloud.stream.binder.kstream.config.KStreamApplicationSupportProperties;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
@@ -115,7 +113,6 @@ public class KStreamBinderWordCountIntegrationTests {
|
||||
|
||||
@EnableBinding(KStreamProcessor.class)
|
||||
@EnableAutoConfiguration
|
||||
@EnableConfigurationProperties(KStreamApplicationSupportProperties.class)
|
||||
public static class WordCountProcessorApplication {
|
||||
|
||||
@Autowired
|
||||
|
||||
Reference in New Issue
Block a user