Compare commits


16 Commits

Author SHA1 Message Date
Soby Chacko
8982f896fd Release 1.2.0.RELEASE 2017-04-04 15:28:56 -04:00
Marius Bogoevici
005ec51d8b Improve isolation of Kafka tests
Fix #120
2017-04-04 12:10:36 -04:00
Marius Bogoevici
47aaf29e3e Fix regression on TopicExistsException
Fix #116
2017-03-31 13:30:13 -04:00
Marius Bogoevici
88d4b8eef5 Replace core docs Git reference with relative link
Fix #114
2017-03-29 13:47:23 -04:00
Ilayaperumal Gopinathan
66eb15a8e2 Make Kafka DLQ topic name configurable
- Make it configurable as a Kafka consumer property
 - Add test to verify the configuration
 - Update doc

Resolves #108

Test fix to use the same partition count for producer/dlq

Fix KafkaTopicProvisioner in case of configurable dlq topic name

 - Update test
2017-03-24 16:59:13 -04:00
Soby Chacko
466400cdb7 Adding test for raw mode with String payload 2017-03-14 18:37:55 -04:00
Marius Bogoevici
bff0a072dc Set version to 1.2.0.BUILD-SNAPSHOT 2017-03-13 19:52:12 -04:00
Marius Bogoevici
7fad2951f7 Release 1.2.0.RC1 2017-03-13 19:51:32 -04:00
Marius Bogoevici
e7c5f750da Use Spring Cloud Build 1.3.1.RELEASE 2017-03-13 19:43:28 -04:00
Gary Russell
1f28adaf4c GH-21: Docs For Replaying Dead-Lettered Messages
Resolves #21

Use BinderHeaders.PARTITION_OVERRIDE
2017-03-13 17:04:48 -04:00
Barry Commins
91bdee65ec Configure the health indicator ConsumerFactory to use binder properties
Fixes #79

Replaced try...finally in KafkaBinderHealthIndicator with try-with-resources

Changed KafkaBinderHealthIndicatorTest for consistency
2017-03-06 12:57:05 -05:00
Ilayaperumal Gopinathan
6a31e9c94f Support KafkaProperties from Spring Boot autoconfiguration
- If `KafkaProperties` is available from `KafkaAutoConfiguration`, retrieve the Kafka properties and set them as the `KafkaBinderConfigurationProperties`' configuration property.
This way, these properties are available at the top level, and per-binding properties can still override them when producer/consumer properties are set during the binding operation (see the property sketch after the commit list)
 - Add tests to verify the scenarios

Resolves #73

Fix deprecation warning

Remove unused constant

Polishing
2017-03-06 11:32:10 -05:00
Marius Bogoevici
b541fca68f Fix 'spring-cloud-stream-binder-kafka-test-support'
Set scope compile on 'spring-kafka-test' so that the
dependency is transitive.
2017-02-21 19:11:31 -05:00
Marius Bogoevici
9d23e6a8fe Move JLine exclusion to the parent 2017-02-21 17:05:29 -05:00
Marius Bogoevici
89249b233f Set dependencies to snapshots 2017-02-21 16:15:40 -05:00
Marius Bogoevici
1998d5e7e8 Set version to 1.2.0.BUILD-SNAPSHOT 2017-02-21 16:14:42 -05:00
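To make the configuration precedence described in commit 6a31e9c94f concrete, here is a minimal sketch (the keys are standard Spring Boot and Spring Cloud Stream Kafka property names; the binding name `input` and all values are made up for illustration):

[source]
----
# Lowest precedence: picked up from Spring Boot's KafkaAutoConfiguration
# and merged into the binder configuration only for keys not already set there
spring.kafka.consumer.fetch-min-size=1024
# Binder-level configuration wins over the Boot-level value for the same Kafka key
spring.cloud.stream.kafka.binder.configuration.fetch.min.bytes=4096
# Per-binding consumer configuration overrides both when the binding is created
spring.cloud.stream.kafka.bindings.input.consumer.configuration.fetch.min.bytes=8192
----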
27 changed files with 767 additions and 131 deletions

pom.xml

@@ -2,12 +2,12 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-<version>1.2.0.M2</version>
+<version>1.2.0.RELEASE</version>
 <packaging>pom</packaging>
 <parent>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-build</artifactId>
-<version>1.3.1.M1</version>
+<version>1.3.1.RELEASE</version>
 <relativePath />
 </parent>
 <properties>
@@ -15,8 +15,7 @@
 <kafka.version>0.10.1.1</kafka.version>
 <spring-kafka.version>1.1.2.RELEASE</spring-kafka.version>
 <spring-integration-kafka.version>2.1.0.RELEASE</spring-integration-kafka.version>
-<spring-cloud-stream.version>1.2.0.M2</spring-cloud-stream.version>
-<spring-boot.version>1.5.1.RELEASE</spring-boot.version>
+<spring-cloud-stream.version>1.2.0.RELEASE</spring-cloud-stream.version>
 </properties>
 <modules>
 <module>spring-cloud-stream-binder-kafka</module>
@@ -45,6 +44,10 @@
 <artifactId>kafka_2.11</artifactId>
 <version>${kafka.version}</version>
 <exclusions>
+<exclusion>
+<groupId>jline</groupId>
+<artifactId>jline</artifactId>
+</exclusion>
 <exclusion>
 <groupId>org.slf4j</groupId>
 <artifactId>slf4j-log4j12</artifactId>
@@ -135,7 +138,7 @@
 <dependency>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-build-tools</artifactId>
-<version>1.3.1.M1</version>
+<version>1.3.1.RELEASE</version>
 </dependency>
 </dependencies>
 <executions>


@@ -4,7 +4,7 @@
 <parent>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-<version>1.2.0.M2</version>
+<version>1.2.0.RELEASE</version>
 </parent>
 <artifactId>spring-cloud-starter-stream-kafka</artifactId>
 <description>Spring Cloud Starter Stream Kafka</description>


@@ -4,7 +4,7 @@
 <parent>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-<version>1.2.0.M2</version>
+<version>1.2.0.RELEASE</version>
 </parent>
 <artifactId>spring-cloud-stream-binder-kafka-0.10.0-test</artifactId>
 <description>Spring Cloud Stream Kafka Binder 0.10.0 Tests</description>
@@ -28,7 +28,7 @@
 <dependency>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
-<version>1.2.0.M2</version>
+<version>1.2.0.RELEASE</version>
 </dependency>
 <dependency>
 <groupId>org.springframework.cloud</groupId>


@@ -4,7 +4,7 @@
 <parent>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-<version>1.2.0.M2</version>
+<version>1.2.0.RELEASE</version>
 </parent>
 <artifactId>spring-cloud-stream-binder-kafka-0.9</artifactId>
 <description>Spring Cloud Stream Kafka Binder 0.9 Tests</description>


@@ -4,7 +4,7 @@
 <parent>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-<version>1.2.0.M2</version>
+<version>1.2.0.RELEASE</version>
 </parent>
 <artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
 <description>Spring Cloud Stream Kafka Binder Core</description>


@@ -33,7 +33,7 @@ public class KafkaBinderConfigurationProperties {
 private String[] zkNodes = new String[] {"localhost"};
-private Map<String, String> configuration = new HashMap<>();
+private Map<String, Object> configuration = new HashMap<>();
 private String defaultZkPort = "2181";
@@ -249,11 +249,11 @@ public class KafkaBinderConfigurationProperties {
 this.socketBufferSize = socketBufferSize;
 }
-public Map<String, String> getConfiguration() {
+public Map<String, Object> getConfiguration() {
 return configuration;
 }
-public void setConfiguration(Map<String, String> configuration) {
+public void setConfiguration(Map<String, Object> configuration) {
 this.configuration = configuration;
 }


@@ -1,5 +1,5 @@
 /*
-* Copyright 2016 the original author or authors.
+* Copyright 2016-2017 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@ import java.util.Map;
 /**
 * @author Marius Bogoevici
+* @author Ilayaperumal Gopinathan
 *
 * <p>Thanks to Laszlo Szabo for providing the initial patch for generic property support.</p>
 */
@@ -38,6 +39,8 @@ public class KafkaConsumerProperties {
 private boolean enableDlq;
+private String dlqName;
 private int recoveryInterval = 5000;
 private Map<String, String> configuration = new HashMap<>();
@@ -119,4 +122,12 @@ public class KafkaConsumerProperties {
 public void setConfiguration(Map<String, String> configuration) {
 this.configuration = configuration;
 }
+public String getDlqName() {
+return dlqName;
+}
+public void setDlqName(String dlqName) {
+this.dlqName = dlqName;
+}
 }


@@ -55,6 +55,7 @@ import kafka.utils.ZkUtils;
 *
 * @author Soby Chacko
 * @author Gary Russell
+* @author Ilayaperumal Gopinathan
 */
 public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsumerProperties<KafkaConsumerProperties>,
 ExtendedProducerProperties<KafkaProducerProperties>>, InitializingBean {
@@ -137,7 +138,8 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
 JaasUtils.isZkSecurityEnabled());
 int partitions = adminUtilsOperation.partitionSize(name, zkUtils);
 if (properties.getExtension().isEnableDlq() && !anonymous) {
-String dlqTopic = "error." + name + "." + group;
+String dlqTopic = StringUtils.hasText(properties.getExtension().getDlqName()) ?
+properties.getExtension().getDlqName() : "error." + name + "." + group;
 createTopicAndPartitions(dlqTopic, partitions);
 return new KafkaConsumerDestination(name, partitions, dlqTopic);
 }
@@ -199,8 +201,22 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
 @Override
 public Object doWithRetry(RetryContext context) throws RuntimeException {
-adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount,
-configurationProperties.getReplicationFactor(), new Properties());
+try {
+adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount,
+configurationProperties.getReplicationFactor(), new Properties());
+}
+catch (Exception e) {
+String exceptionClass = e.getClass().getName();
+if (exceptionClass.equals("kafka.common.TopicExistsException")
+|| exceptionClass.equals("org.apache.kafka.common.errors.TopicExistsException")) {
+if (logger.isWarnEnabled()) {
+logger.warn("Attempt to create topic: " + topicName + ". Topic already exists.");
+}
+}
+else {
+throw e;
+}
+}
 return null;
 }
 });


@@ -5,7 +5,7 @@
 <parent>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-<version>1.2.0.M2</version>
+<version>1.2.0.RELEASE</version>
 </parent>
 <artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>


@@ -0,0 +1,109 @@
[[kafka-dlq-processing]]
== Dead-Letter Topic Processing
Because the framework cannot anticipate how users might want to dispose of dead-lettered messages, it does not provide any standard mechanism for handling them.
If the reason for the dead-lettering is transient, you may wish to route the messages back to the original topic.
However, if the problem is a permanent issue, that could cause an infinite loop.
The following `spring-boot` application is an example of how to route those messages back to the original topic, but it moves them to a third "parking lot" topic after three attempts.
The application is simply another spring-cloud-stream application that reads from the dead-letter topic.
It terminates when no messages are received for 5 seconds.
The examples assume the original destination is `so8400out` and the consumer group is `so8400`.
There are several considerations:
- Consider only running the rerouting when the main application is not running.
Otherwise, the retries for transient errors will be used up very quickly.
- Alternatively, use a two-stage approach: use this application to route to a third topic, and another to route from there back to the main topic.
- Since this technique uses a message header to keep track of retries, it won't work with `headerMode=raw`.
In that case, consider adding some data to the payload (that can be ignored by the main application).
- `x-retries` has to be added to the `headers` property (`spring.cloud.stream.kafka.binder.headers=x-retries`) on both this application and the main application, so that the header is transported between them.
- Since Kafka is publish/subscribe, replayed messages will be sent to each consumer group, even those that successfully processed a message the first time around.
.application.properties
[source]
----
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.output.producer.partitioned=true
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.bindings.parkingLot.producer.partitioned=true
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
----
.Application
[source, java]
----
@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) {
SpringApplication.run(ReRouteDlqKApplication.class, args).close();
}
private final AtomicInteger processed = new AtomicInteger();
@Autowired
private MessageChannel parkingLot;
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Message<?> reRoute(Message<?> failed) {
processed.incrementAndGet();
Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
if (retries == null) {
System.out.println("First retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, new Integer(1))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else if (retries.intValue() < 3) {
System.out.println("Another retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else {
System.out.println("Retries exhausted for " + failed);
parkingLot.send(MessageBuilder.fromMessage(failed)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build());
}
return null;
}
@Override
public void run(String... args) throws Exception {
while (true) {
int count = this.processed.get();
Thread.sleep(5000);
if (count == this.processed.get()) {
System.out.println("Idle, terminating");
return;
}
}
}
public interface TwoOutputProcessor extends Processor {
@Output("parkingLot")
MessageChannel parkingLot();
}
}
----
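Note the design choice in the example above: every re-published message sets `BinderHeaders.PARTITION_OVERRIDE` to the original `KafkaHeaders.RECEIVED_PARTITION_ID`, so each retry is written back to the very partition it was dead-lettered from, keeping partition affinity intact across replays.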


@@ -23,7 +23,7 @@ Sabby Anandan, Marius Bogoevici, Eric Bottard, Mark Fisher, Ilayaperumal Gopinat
 = Reference Guide
 include::overview.adoc[]
+include::dlq.adoc[]
 = Appendices
 [appendix]


@@ -41,7 +41,7 @@ Partitioning also maps directly to Apache Kafka partitions as well.
 This section contains the configuration options used by the Apache Kafka binder.
-For common configuration options and properties pertaining to binder, refer to the https://github.com/spring-cloud/spring-cloud-stream/blob/master/spring-cloud-stream-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc#configuration-options[core docs].
+For common configuration options and properties pertaining to binder, refer to the <<binding-properties,core documentation>>.
 === Kafka Binder Properties
@@ -158,7 +158,8 @@ If the consumer group is set explicitly for the consumer 'binding' (via `spring.
 Default: null (equivalent to `earliest`).
 enableDlq::
 When set to true, it will enable DLQ behavior for the consumer.
-Messages that result in errors will be forwarded to a topic named `error.<destination>.<group>`.
+By default, messages that result in errors will be forwarded to a topic named `error.<destination>.<group>`.
+The DLQ topic name is configurable via the property `dlqName`.
 This provides an alternative option to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome.
 +
 Default: `false`.
@@ -166,6 +167,10 @@ configuration::
 Map with a key/value pair containing generic Kafka consumer properties.
 +
 Default: Empty map.
+dlqName::
+The name of the DLQ topic to receive the error messages.
++
+Default: null (if not specified, messages that result in errors will be forwarded to a topic named `error.<destination>.<group>`).
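As a quick illustration of the two properties together (assuming a binding named `input`; the destination, group, and DLQ topic names are made up):

[source]
----
spring.cloud.stream.bindings.input.destination=someTopic
spring.cloud.stream.bindings.input.group=someGroup
spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true
spring.cloud.stream.kafka.bindings.input.consumer.dlqName=customDlqTopic
----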
=== Kafka Producer Properties


@@ -4,7 +4,7 @@
 <parent>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-<version>1.2.0.M2</version>
+<version>1.2.0.RELEASE</version>
 </parent>
 <artifactId>spring-cloud-stream-binder-kafka-test-support</artifactId>
 <description>Kafka related test classes</description>
@@ -19,6 +19,7 @@
 <groupId>org.springframework.kafka</groupId>
 <artifactId>spring-kafka-test</artifactId>
 <version>${spring-kafka.version}</version>
+<scope>compile</scope>
 </dependency>
 </dependencies>
 </project>


@@ -10,14 +10,14 @@
 <parent>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-<version>1.2.0.M2</version>
+<version>1.2.0.RELEASE</version>
 </parent>
 <dependencies>
 <dependency>
 <groupId>org.springframework.cloud</groupId>
 <artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
-<version>1.2.0.M2</version>
+<version>1.2.0.RELEASE</version>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
@@ -45,12 +45,6 @@
 <dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka_2.11</artifactId>
-<exclusions>
-<exclusion>
-<groupId>jline</groupId>
-<artifactId>jline</artifactId>
-</exclusion>
-</exclusions>
 </dependency>
 <dependency>
 <groupId>org.apache.kafka</groupId>


@@ -19,6 +19,9 @@ package org.springframework.cloud.stream.binder.kafka;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.env.EnvironmentPostProcessor;
 import org.springframework.core.env.ConfigurableEnvironment;
@@ -32,13 +35,35 @@ import org.springframework.core.env.MapPropertySource;
 */
 public class KafkaBinderEnvironmentPostProcessor implements EnvironmentPostProcessor {
+public final static String SPRING_KAFKA = "spring.kafka";
+public final static String SPRING_KAFKA_PRODUCER = SPRING_KAFKA + ".producer";
+public final static String SPRING_KAFKA_CONSUMER = SPRING_KAFKA + ".consumer";
+public final static String SPRING_KAFKA_PRODUCER_KEY_SERIALIZER = SPRING_KAFKA_PRODUCER + "." + "keySerializer";
+public final static String SPRING_KAFKA_PRODUCER_VALUE_SERIALIZER = SPRING_KAFKA_PRODUCER + "." + "valueSerializer";
+public final static String SPRING_KAFKA_CONSUMER_KEY_DESERIALIZER = SPRING_KAFKA_CONSUMER + "." + "keyDeserializer";
+public final static String SPRING_KAFKA_CONSUMER_VALUE_DESERIALIZER = SPRING_KAFKA_CONSUMER + "." + "valueDeserializer";
+private static final String KAFKA_BINDER_DEFAULT_PROPERTIES = "kafkaBinderDefaultProperties";
 @Override
 public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
-Map<String, Object> propertiesToAdd = new HashMap<>();
-propertiesToAdd.put("logging.pattern.console", "%d{ISO8601} %5p %t %c{2}:%L - %m%n");
-propertiesToAdd.put("logging.level.org.I0Itec.zkclient", "ERROR");
-propertiesToAdd.put("logging.level.kafka.server.KafkaConfig", "ERROR");
-propertiesToAdd.put("logging.level.kafka.admin.AdminClient.AdminConfig", "ERROR");
-environment.getPropertySources().addLast(new MapPropertySource("kafkaBinderLogConfig", propertiesToAdd));
+if (!environment.getPropertySources().contains(KAFKA_BINDER_DEFAULT_PROPERTIES)) {
+Map<String, Object> kafkaBinderDefaultProperties = new HashMap<>();
+kafkaBinderDefaultProperties.put("logging.pattern.console", "%d{ISO8601} %5p %t %c{2}:%L - %m%n");
+kafkaBinderDefaultProperties.put("logging.level.org.I0Itec.zkclient", "ERROR");
+kafkaBinderDefaultProperties.put("logging.level.kafka.server.KafkaConfig", "ERROR");
+kafkaBinderDefaultProperties.put("logging.level.kafka.admin.AdminClient.AdminConfig", "ERROR");
+kafkaBinderDefaultProperties.put(SPRING_KAFKA_PRODUCER_KEY_SERIALIZER, ByteArraySerializer.class.getName());
+kafkaBinderDefaultProperties.put(SPRING_KAFKA_PRODUCER_VALUE_SERIALIZER, ByteArraySerializer.class.getName());
+kafkaBinderDefaultProperties.put(SPRING_KAFKA_CONSUMER_KEY_DESERIALIZER, ByteArrayDeserializer.class.getName());
+kafkaBinderDefaultProperties.put(SPRING_KAFKA_CONSUMER_VALUE_DESERIALIZER, ByteArrayDeserializer.class.getName());
+environment.getPropertySources().addLast(new MapPropertySource(KAFKA_BINDER_DEFAULT_PROPERTIES, kafkaBinderDefaultProperties));
+}
 }
 }


@@ -16,20 +16,16 @@
 package org.springframework.cloud.stream.binder.kafka;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.springframework.boot.actuate.health.Health;
 import org.springframework.boot.actuate.health.HealthIndicator;
-import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
+import org.springframework.kafka.core.ConsumerFactory;
 /**
 * Health indicator for Kafka.
@@ -41,24 +37,18 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
 private final KafkaMessageChannelBinder binder;
-private final KafkaBinderConfigurationProperties configurationProperties;
+private final ConsumerFactory<?, ?> consumerFactory;
 public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder,
-KafkaBinderConfigurationProperties configurationProperties) {
+ConsumerFactory<?, ?> consumerFactory) {
 this.binder = binder;
-this.configurationProperties = configurationProperties;
+this.consumerFactory = consumerFactory;
 }
 @Override
 public Health health() {
-Map<String, String> properties = new HashMap<>();
-properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties
-.getKafkaConnectionString());
-properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-KafkaConsumer metadataConsumer = new KafkaConsumer(properties);
-try {
+try (Consumer<?, ?> metadataConsumer = consumerFactory.createConsumer()) {
 Set<String> downMessages = new HashSet<>();
 for (String topic : this.binder.getTopicsInUse().keySet()) {
 List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
@@ -78,8 +68,5 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
 catch (Exception e) {
 return Health.down(e).build();
 }
-finally {
-metadataConsumer.close();
-}
 }
 }


@@ -1,5 +1,5 @@
 /*
-* Copyright 2014-2016 the original author or authors.
+* Copyright 2014-2017 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -98,7 +98,7 @@ public class KafkaMessageChannelBinder extends
 private final Map<String, Collection<PartitionInfo>> topicsInUse = new HashMap<>();
 public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
-KafkaTopicProvisioner provisioningProvider) {
+KafkaTopicProvisioner provisioningProvider) {
 super(false, headersToMap(configurationProperties), provisioningProvider);
 this.configurationProperties = configurationProperties;
 }
@@ -143,7 +143,7 @@ public class KafkaMessageChannelBinder extends
 @Override
 protected MessageHandler createProducerMessageHandler(final ProducerDestination destination,
-ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception {
+ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception {
 final DefaultKafkaProducerFactory<byte[], byte[]> producerFB = getProducerFactory(producerProperties);
 Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(producerProperties.getPartitionCount(),
 new Callable<Collection<PartitionInfo>>() {
@@ -171,20 +171,27 @@ public class KafkaMessageChannelBinder extends
 private DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(
 ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
 Map<String, Object> props = new HashMap<>();
-if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
-props.putAll(configurationProperties.getConfiguration());
-}
-props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
 props.put(ProducerConfig.RETRIES_CONFIG, 0);
-props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(producerProperties.getExtension().getBufferSize()));
 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
 props.put(ProducerConfig.ACKS_CONFIG, String.valueOf(this.configurationProperties.getRequiredAcks()));
-props.put(ProducerConfig.LINGER_MS_CONFIG,
-String.valueOf(producerProperties.getExtension().getBatchTimeout()));
-props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
-producerProperties.getExtension().getCompressionType().toString());
+if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
+props.putAll(configurationProperties.getConfiguration());
+}
+if (ObjectUtils.isEmpty(props.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))) {
+props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
+}
+if (ObjectUtils.isEmpty(props.get(ProducerConfig.BATCH_SIZE_CONFIG))) {
+props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(producerProperties.getExtension().getBufferSize()));
+}
+if (ObjectUtils.isEmpty(props.get(ProducerConfig.LINGER_MS_CONFIG))) {
+props.put(ProducerConfig.LINGER_MS_CONFIG, String.valueOf(producerProperties.getExtension().getBatchTimeout()));
+}
+if (ObjectUtils.isEmpty(props.get(ProducerConfig.COMPRESSION_TYPE_CONFIG))) {
+props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
+producerProperties.getExtension().getCompressionType().toString());
+}
 if (!ObjectUtils.isEmpty(producerProperties.getExtension().getConfiguration())) {
 props.putAll(producerProperties.getExtension().getConfiguration());
 }
@@ -194,18 +201,14 @@ public class KafkaMessageChannelBinder extends
 @Override
 @SuppressWarnings("unchecked")
 protected MessageProducer createConsumerEndpoint(final ConsumerDestination destination, final String group,
-ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
+final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
 boolean anonymous = !StringUtils.hasText(group);
-Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
+Assert.isTrue(!anonymous || !extendedConsumerProperties.getExtension().isEnableDlq(),
 "DLQ support is not available for anonymous subscriptions");
 String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
-Map<String, Object> props = getConsumerConfig(anonymous, consumerGroup);
-if (!ObjectUtils.isEmpty(properties.getExtension().getConfiguration())) {
-props.putAll(properties.getExtension().getConfiguration());
-}
-final ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
-int partitionCount = properties.getInstanceCount() * properties.getConcurrency();
+final ConsumerFactory<?, ?> consumerFactory = createKafkaConsumerFactory(anonymous, consumerGroup, extendedConsumerProperties);
+int partitionCount = extendedConsumerProperties.getInstanceCount() * extendedConsumerProperties.getConcurrency();
 Collection<PartitionInfo> allPartitions = provisioningProvider.getPartitionsForTopic(partitionCount,
 new Callable<Collection<PartitionInfo>>() {
@@ -217,15 +220,15 @@ public class KafkaMessageChannelBinder extends
 Collection<PartitionInfo> listenedPartitions;
-if (properties.getExtension().isAutoRebalanceEnabled() ||
-properties.getInstanceCount() == 1) {
+if (extendedConsumerProperties.getExtension().isAutoRebalanceEnabled() ||
+extendedConsumerProperties.getInstanceCount() == 1) {
 listenedPartitions = allPartitions;
 }
 else {
 listenedPartitions = new ArrayList<>();
 for (PartitionInfo partition : allPartitions) {
 // divide partitions across modules
-if ((partition.partition() % properties.getInstanceCount()) == properties.getInstanceIndex()) {
+if ((partition.partition() % extendedConsumerProperties.getInstanceCount()) == extendedConsumerProperties.getInstanceIndex()) {
 listenedPartitions.add(partition);
 }
 }
@@ -236,9 +239,9 @@ public class KafkaMessageChannelBinder extends
 final TopicPartitionInitialOffset[] topicPartitionInitialOffsets = getTopicPartitionInitialOffsets(
 listenedPartitions);
 final ContainerProperties containerProperties =
-anonymous || properties.getExtension().isAutoRebalanceEnabled() ? new ContainerProperties(destination.getName())
+anonymous || extendedConsumerProperties.getExtension().isAutoRebalanceEnabled() ? new ContainerProperties(destination.getName())
 : new ContainerProperties(topicPartitionInitialOffsets);
-int concurrency = Math.min(properties.getConcurrency(), listenedPartitions.size());
+int concurrency = Math.min(extendedConsumerProperties.getConcurrency(), listenedPartitions.size());
 final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer =
 new ConcurrentMessageListenerContainer(
 consumerFactory, containerProperties) {
@@ -249,8 +252,8 @@ public class KafkaMessageChannelBinder extends
 }
 };
 messageListenerContainer.setConcurrency(concurrency);
-messageListenerContainer.getContainerProperties().setAckOnError(isAutoCommitOnError(properties));
-if (!properties.getExtension().isAutoCommitOffset()) {
+messageListenerContainer.getContainerProperties().setAckOnError(isAutoCommitOnError(extendedConsumerProperties));
+if (!extendedConsumerProperties.getExtension().isAutoCommitOffset()) {
 messageListenerContainer.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
 }
 if (this.logger.isDebugEnabled()) {
@@ -265,9 +268,9 @@ public class KafkaMessageChannelBinder extends
 new KafkaMessageDrivenChannelAdapter<>(
 messageListenerContainer);
 kafkaMessageDrivenChannelAdapter.setBeanFactory(this.getBeanFactory());
-final RetryTemplate retryTemplate = buildRetryTemplate(properties);
+final RetryTemplate retryTemplate = buildRetryTemplate(extendedConsumerProperties);
 kafkaMessageDrivenChannelAdapter.setRetryTemplate(retryTemplate);
-if (properties.getExtension().isEnableDlq()) {
+if (extendedConsumerProperties.getExtension().isEnableDlq()) {
 DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = getProducerFactory(new ExtendedProducerProperties<>(new KafkaProducerProperties()));
 final KafkaTemplate<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFactory);
 messageListenerContainer.getContainerProperties().setErrorHandler(new ErrorHandler() {
@@ -278,8 +281,9 @@ public class KafkaMessageChannelBinder extends
 : null;
 final byte[] payload = message.value() != null
 ? Utils.toArray(ByteBuffer.wrap((byte[]) message.value())) : null;
-ListenableFuture<SendResult<byte[], byte[]>> sentDlq = kafkaTemplate.send("error." + destination.getName() + "." + group,
-message.partition(), key, payload);
+String dlqName = StringUtils.hasText(extendedConsumerProperties.getExtension().getDlqName()) ?
+extendedConsumerProperties.getExtension().getDlqName() : "error." + destination.getName() + "." + group;
+ListenableFuture<SendResult<byte[], byte[]>> sentDlq = kafkaTemplate.send(dlqName, message.partition(), key, payload);
 sentDlq.addCallback(new ListenableFutureCallback<SendResult<byte[], byte[]>>() {
 StringBuilder sb = new StringBuilder().append(" a message with key='")
 .append(toDisplayString(ObjectUtils.nullSafeToString(key), 50)).append("'")
@@ -308,20 +312,25 @@ public class KafkaMessageChannelBinder extends
 return kafkaMessageDrivenChannelAdapter;
 }
-private Map<String, Object> getConsumerConfig(boolean anonymous, String consumerGroup) {
+private ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup,
+ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
 Map<String, Object> props = new HashMap<>();
 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
+props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, anonymous ? "latest" : "earliest");
+props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
 if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
 props.putAll(configurationProperties.getConfiguration());
 }
-props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
-props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
-props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-anonymous ? "latest" : "earliest");
-props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
-return props;
+if (ObjectUtils.isEmpty(props.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))) {
+props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
+}
+if (!ObjectUtils.isEmpty(consumerProperties.getExtension().getConfiguration())) {
+props.putAll(consumerProperties.getExtension().getConfiguration());
+}
+return new DefaultKafkaConsumerFactory<>(props);
 }
 private boolean isAutoCommitOnError(ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
@@ -358,8 +367,8 @@ public class KafkaMessageChannelBinder extends
 private final DefaultKafkaProducerFactory<byte[], byte[]> producerFactory;
 private ProducerConfigurationMessageHandler(KafkaTemplate<byte[], byte[]> kafkaTemplate, String topic,
-ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
-DefaultKafkaProducerFactory<byte[], byte[]> producerFactory) {
+ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
+DefaultKafkaProducerFactory<byte[], byte[]> producerFactory) {
 super(kafkaTemplate);
 setTopicExpression(new LiteralExpression(topic));
 setBeanFactory(KafkaMessageChannelBinder.this.getBeanFactory());


@@ -16,16 +16,24 @@
 package org.springframework.cloud.stream.binder.kafka.config;
 import javax.annotation.PostConstruct;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.PropertyPlaceholderAutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.cloud.stream.binder.Binder;
 import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
@@ -49,8 +57,11 @@ import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Import;
 import org.springframework.core.type.AnnotatedTypeMetadata;
 import org.springframework.integration.codec.Codec;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 import org.springframework.kafka.support.LoggingProducerListener;
 import org.springframework.kafka.support.ProducerListener;
+import org.springframework.util.ObjectUtils;
 /**
 * @author David Turanski
@@ -61,7 +72,7 @@ import org.springframework.kafka.support.ProducerListener;
 */
 @Configuration
 @ConditionalOnMissingBean(Binder.class)
-@Import({KryoCodecAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class})
+@Import({KryoCodecAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class, KafkaBinderConfiguration.KafkaPropertiesConfiguration.class})
 @EnableConfigurationProperties({KafkaBinderConfigurationProperties.class, KafkaExtendedBindingProperties.class})
 public class KafkaBinderConfiguration {
@@ -108,7 +119,15 @@ public class KafkaBinderConfiguration {
 @Bean
 KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder) {
-return new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, this.configurationProperties);
+Map<String, Object> props = new HashMap<>();
+props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
+props.putAll(configurationProperties.getConfiguration());
+}
+props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
+ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
+return new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, consumerFactory);
 }
 @Bean(name = "adminUtilsOperation")
@@ -154,4 +173,47 @@ public class KafkaBinderConfiguration {
 private JaasLoginModuleConfiguration zookeeper;
 }
+@ConditionalOnClass(name = "org.springframework.boot.autoconfigure.kafka.KafkaProperties")
+public static class KafkaPropertiesConfiguration {
+// KafkaProperties can still be unavailable if KafkaAutoConfiguration is disabled.
+@Autowired(required = false)
+private KafkaProperties kafkaProperties;
+@Autowired
+private KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties;
+@PostConstruct
+public void init() {
+Map<String, Object> configuration = this.kafkaBinderConfigurationProperties.getConfiguration();
+if (this.kafkaProperties != null) {
+for (Map.Entry<String, String> properties : this.kafkaProperties.getProperties().entrySet()) {
+if (!configuration.containsKey(properties.getKey())) {
+configuration.put(properties.getKey(), properties.getValue());
+}
+}
+for (Map.Entry<String, Object> producerProperties : this.kafkaProperties.buildProducerProperties().entrySet()) {
+if (!configuration.containsKey(producerProperties.getKey())) {
+configuration.put(producerProperties.getKey(), producerProperties.getValue());
+}
+}
+for (Map.Entry<String, Object> consumerProperties : this.kafkaProperties.buildConsumerProperties().entrySet()) {
+if (!configuration.containsKey(consumerProperties.getKey())) {
+configuration.put(consumerProperties.getKey(), consumerProperties.getValue());
+}
+}
+if (ObjectUtils.isEmpty(configuration.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))) {
+configuration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBinderConfigurationProperties.getKafkaConnectionString());
+}
+else {
+@SuppressWarnings("unchecked")
+List<String> bootStrapServers = (List<String>) configuration.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
+if (bootStrapServers.size() == 1 && bootStrapServers.get(0).equals("localhost:9092")) {
+configuration.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBinderConfigurationProperties.getKafkaConnectionString());
+}
+}
+}
+}
+}
 }


@@ -0,0 +1,95 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.util.ReflectionUtils;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* @author Ilayaperumal Gopinathan
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = {KafkaBinderAutoConfigurationPropertiesTest.KafkaBinderConfigProperties.class, KafkaBinderConfiguration.class})
@TestPropertySource(locations = "classpath:binder-config-autoconfig.properties")
public class KafkaBinderAutoConfigurationPropertiesTest {
@Autowired
private KafkaMessageChannelBinder kafkaMessageChannelBinder;
@Test
public void testKafkaBinderConfigurationWithKafkaProperties() throws Exception {
assertNotNull(this.kafkaMessageChannelBinder);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(new KafkaProducerProperties());
Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory", ExtendedProducerProperties.class);
getProducerFactoryMethod.setAccessible(true);
DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod.invoke(this.kafkaMessageChannelBinder, producerProperties);
Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs", Map.class);
ReflectionUtils.makeAccessible(producerFactoryConfigField);
Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField, producerFactory);
assertTrue(producerConfigs.get("batch.size").equals(10));
assertTrue(producerConfigs.get("key.serializer").equals(LongSerializer.class));
assertTrue(producerConfigs.get("value.serializer").equals(LongSerializer.class));
assertTrue(producerConfigs.get("compression.type").equals("snappy"));
List<String> bootstrapServers = new ArrayList<>();
bootstrapServers.add("10.98.09.199:9092");
bootstrapServers.add("10.98.09.196:9092");
assertTrue((((List<String>) producerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));
Method createKafkaConsumerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("createKafkaConsumerFactory", boolean.class, String.class, ExtendedConsumerProperties.class);
createKafkaConsumerFactoryMethod.setAccessible(true);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<>(new KafkaConsumerProperties());
DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) createKafkaConsumerFactoryMethod.invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties);
Field consumerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaConsumerFactory.class, "configs", Map.class);
ReflectionUtils.makeAccessible(consumerFactoryConfigField);
Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField, consumerFactory);
assertTrue(consumerConfigs.get("key.deserializer").equals(LongDeserializer.class));
assertTrue(consumerConfigs.get("value.deserializer").equals(LongDeserializer.class));
assertTrue((((List<String>) consumerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));
}
public static class KafkaBinderConfigProperties {
@Bean
KafkaProperties kafkaProperties() {
return new KafkaProperties();
}
}
}


@@ -0,0 +1,89 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.util.ReflectionUtils;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* @author Ilayaperumal Gopinathan
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = {KafkaBinderConfiguration.class})
@TestPropertySource(locations = "classpath:binder-config.properties")
public class KafkaBinderConfigurationPropertiesTest {
@Autowired
private KafkaMessageChannelBinder kafkaMessageChannelBinder;
@Test
public void testKafkaBinderConfigurationProperties() throws Exception {
assertNotNull(this.kafkaMessageChannelBinder);
KafkaProducerProperties kafkaProducerProperties = new KafkaProducerProperties();
kafkaProducerProperties.setBufferSize(12345);
kafkaProducerProperties.setBatchTimeout(100);
kafkaProducerProperties.setCompressionType(KafkaProducerProperties.CompressionType.gzip);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(kafkaProducerProperties);
Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory", ExtendedProducerProperties.class);
getProducerFactoryMethod.setAccessible(true);
DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod.invoke(this.kafkaMessageChannelBinder, producerProperties);
Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs", Map.class);
ReflectionUtils.makeAccessible(producerFactoryConfigField);
Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField, producerFactory);
assertTrue(producerConfigs.get("batch.size").equals("12345"));
assertTrue(producerConfigs.get("linger.ms").equals("100"));
assertTrue(producerConfigs.get("key.serializer").equals(ByteArraySerializer.class));
assertTrue(producerConfigs.get("value.serializer").equals(ByteArraySerializer.class));
assertTrue(producerConfigs.get("compression.type").equals("gzip"));
List<String> bootstrapServers = new ArrayList<>();
bootstrapServers.add("10.98.09.199:9082");
assertTrue((((String) producerConfigs.get("bootstrap.servers")).contains("10.98.09.199:9082")));
Method createKafkaConsumerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("createKafkaConsumerFactory", boolean.class, String.class, ExtendedConsumerProperties.class);
createKafkaConsumerFactoryMethod.setAccessible(true);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<>(new KafkaConsumerProperties());
DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) createKafkaConsumerFactoryMethod.invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties);
Field consumerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaConsumerFactory.class, "configs", Map.class);
ReflectionUtils.makeAccessible(consumerFactoryConfigField);
Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField, consumerFactory);
assertTrue(consumerConfigs.get("key.deserializer").equals(ByteArrayDeserializer.class));
assertTrue(consumerConfigs.get("value.deserializer").equals(ByteArrayDeserializer.class));
assertTrue((((String) consumerConfigs.get("bootstrap.servers")).contains("10.98.09.199:9082")));
}
}


@@ -1,5 +1,5 @@
 /*
-* Copyright 2016 the original author or authors.
+* Copyright 2016-2017 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -15,8 +15,6 @@
 */
 package org.springframework.cloud.stream.binder.kafka;
-import static org.junit.Assert.assertNotNull;
 import java.lang.reflect.Field;
 import org.junit.Test;
@@ -29,6 +27,8 @@ import org.springframework.kafka.support.ProducerListener;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 import org.springframework.util.ReflectionUtils;
+import static org.junit.Assert.assertNotNull;
 /**
 * @author Ilayaperumal Gopinathan
 */


@@ -0,0 +1,87 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
/**
* @author Barry Commins
*/
public class KafkaBinderHealthIndicatorTest {
private static final String TEST_TOPIC = "test";
private KafkaBinderHealthIndicator indicator;
@Mock
private DefaultKafkaConsumerFactory consumerFactory;
@Mock
private KafkaConsumer consumer;
@Mock
private KafkaMessageChannelBinder binder;
private Map<String, Collection<PartitionInfo>> topicsInUse = new HashMap<>();
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
given(consumerFactory.createConsumer()).willReturn(consumer);
given(binder.getTopicsInUse()).willReturn(topicsInUse);
indicator = new KafkaBinderHealthIndicator(binder, consumerFactory);
}
@Test
public void kafkaBinderIsUp() {
final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, partitions);
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
Health health = indicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);
}
@Test
public void kafkaBinderIsDown() {
final List<PartitionInfo> partitions = partitions(new Node(-1, null, 0));
topicsInUse.put(TEST_TOPIC, partitions);
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
Health health = indicator.health();
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
}
private List<PartitionInfo> partitions(Node leader) {
List<PartitionInfo> partitions = new ArrayList<>();
partitions.add(new PartitionInfo(TEST_TOPIC, 0, leader, null, null));
return partitions;
}
}


@@ -19,6 +19,7 @@ package org.springframework.cloud.stream.binder.kafka;
 import javax.security.auth.login.AppConfigurationEntry;
 import com.sun.security.auth.login.ConfigFile;
+import org.apache.kafka.common.security.JaasUtils;
 import org.junit.Test;


@@ -1,5 +1,5 @@
 /*
-* Copyright 2016 the original author or authors.
+* Copyright 2016-2017 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -274,6 +274,68 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
producerBinding.unbind();
}
@Test
@SuppressWarnings("unchecked")
public void testConfigurableDlqName() throws Exception {
Binder binder = getBinder();
DirectChannel moduleOutputChannel = new DirectChannel();
DirectChannel moduleInputChannel = new DirectChannel();
FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
moduleInputChannel.subscribe(handler);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionCount(10);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setMaxAttempts(3);
consumerProperties.setBackOffInitialInterval(100);
consumerProperties.setBackOffMaxInterval(150);
consumerProperties.getExtension().setEnableDlq(true);
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
String dlqName = "dlqTest";
consumerProperties.getExtension().setDlqName(dlqName);
long uniqueBindingId = System.currentTimeMillis();
Binding<MessageChannel> producerBinding = binder.bindProducer("retryTest." + uniqueBindingId + ".0",
moduleOutputChannel, producerProperties);
Binding<MessageChannel> consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0",
"testGroup", moduleInputChannel, consumerProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> dlqConsumerProperties = createConsumerProperties();
dlqConsumerProperties.setMaxAttempts(1);
QueueChannel dlqChannel = new QueueChannel();
Binding<MessageChannel> dlqConsumerBinding = binder.bindConsumer(dlqName, null, dlqChannel, dlqConsumerProperties);
String testMessagePayload = "test." + UUID.randomUUID().toString();
Message<String> testMessage = MessageBuilder.withPayload(testMessagePayload).build();
moduleOutputChannel.send(testMessage);
Message<?> dlqMessage = receive(dlqChannel, 3);
assertThat(dlqMessage).isNotNull();
assertThat(dlqMessage.getPayload()).isEqualTo(testMessagePayload);
// first attempt fails
assertThat(handler.getReceivedMessages().entrySet()).hasSize(1);
Message<?> handledMessage = handler.getReceivedMessages().entrySet().iterator().next().getValue();
assertThat(handledMessage).isNotNull();
assertThat(handledMessage.getPayload()).isEqualTo(testMessagePayload);
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
binderBindUnbindLatency();
dlqConsumerBinding.unbind();
consumerBinding.unbind();
// on the second attempt the message is not redelivered because the DLQ is set
QueueChannel successfulInputChannel = new QueueChannel();
consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0", "testGroup",
successfulInputChannel, consumerProperties);
String testMessage2Payload = "test." + UUID.randomUUID().toString();
Message<String> testMessage2 = MessageBuilder.withPayload(testMessage2Payload).build();
moduleOutputChannel.send(testMessage2);
Message<?> receivedMessage = receive(successfulInputChannel);
assertThat(receivedMessage.getPayload()).isEqualTo(testMessage2Payload);
binderBindUnbindLatency();
consumerBinding.unbind();
producerBinding.unbind();
}
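The dlqName consumer property exercised above would normally be set through configuration rather than programmatically; a minimal sketch, assuming a binding named input (the destination and group values here are placeholders):

spring.cloud.stream.bindings.input.destination=retryTest
spring.cloud.stream.bindings.input.group=testGroup
spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true
spring.cloud.stream.kafka.bindings.input.consumer.dlqName=dlqTest

When dlqName is omitted, the binder falls back to the default error.&lt;destination&gt;.&lt;group&gt; topic name.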
@Test
@SuppressWarnings("unchecked")
public void testAutoCreateTopicsEnabledSucceeds() throws Exception {
@@ -312,10 +374,10 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.getExtension().setCompressionType(
KafkaProducerProperties.CompressionType.valueOf(codec.toString()));
Binding<MessageChannel> producerBinding = binder.bindProducer("foo.0", moduleOutputChannel,
Binding<MessageChannel> producerBinding = binder.bindProducer("testCompression", moduleOutputChannel,
producerProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo.0", "test", moduleInputChannel,
Binding<MessageChannel> consumerBinding = binder.bindConsumer("testCompression", "test", moduleInputChannel,
consumerProperties);
Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload(testPayload)
.build();
@@ -670,13 +732,13 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
createProducerBindingProperties(createProducerProperties()));
QueueChannel moduleInputChannel = new QueueChannel();
Binding<MessageChannel> producerBinding = binder.bindProducer("foo.x", moduleOutputChannel,
Binding<MessageChannel> producerBinding = binder.bindProducer("testManualAckSucceedsWhenAutoCommitOffsetIsTurnedOff", moduleOutputChannel,
createProducerProperties());
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.getExtension().setAutoCommitOffset(false);
Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo.x", "test", moduleInputChannel,
Binding<MessageChannel> consumerBinding = binder.bindConsumer("testManualAckSucceedsWhenAutoCommitOffsetIsTurnedOff", "test", moduleInputChannel,
consumerProperties);
String testPayload1 = "foo" + UUID.randomUUID().toString();
@@ -712,12 +774,12 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
createProducerBindingProperties(createProducerProperties()));
QueueChannel moduleInputChannel = new QueueChannel();
Binding<MessageChannel> producerBinding = binder.bindProducer("foo.x", moduleOutputChannel,
Binding<MessageChannel> producerBinding = binder.bindProducer("testManualAckIsNotPossibleWhenAutoCommitOffsetIsEnabledOnTheBinder", moduleOutputChannel,
createProducerProperties());
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo.x", "test", moduleInputChannel,
Binding<MessageChannel> consumerBinding = binder.bindConsumer("testManualAckIsNotPossibleWhenAutoCommitOffsetIsEnabledOnTheBinder", "test", moduleInputChannel,
consumerProperties);
String testPayload1 = "foo" + UUID.randomUUID().toString();
@@ -1224,7 +1286,7 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
Binding<?> binding = null;
try {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
Map<String, String> propertiesToOverride = configurationProperties.getConfiguration();
Map<String, Object> propertiesToOverride = configurationProperties.getConfiguration();
propertiesToOverride.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
propertiesToOverride.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
configurationProperties.setConfiguration(propertiesToOverride);
@@ -1358,7 +1420,7 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
DirectChannel output = createBindableChannel("output", createProducerBindingProperties(properties));
output.setBeanName("test.output");
Binding<MessageChannel> outputBinding = binder.bindProducer("partJ.0", output, properties);
Binding<MessageChannel> outputBinding = binder.bindProducer("partJ.raw.0", output, properties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setConcurrency(2);
@@ -1369,15 +1431,15 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
QueueChannel input0 = new QueueChannel();
input0.setBeanName("test.input0J");
Binding<MessageChannel> input0Binding = binder.bindConsumer("partJ.0", "test", input0, consumerProperties);
Binding<MessageChannel> input0Binding = binder.bindConsumer("partJ.raw.0", "test", input0, consumerProperties);
consumerProperties.setInstanceIndex(1);
QueueChannel input1 = new QueueChannel();
input1.setBeanName("test.input1J");
Binding<MessageChannel> input1Binding = binder.bindConsumer("partJ.0", "test", input1, consumerProperties);
Binding<MessageChannel> input1Binding = binder.bindConsumer("partJ.raw.0", "test", input1, consumerProperties);
consumerProperties.setInstanceIndex(2);
QueueChannel input2 = new QueueChannel();
input2.setBeanName("test.input2J");
Binding<MessageChannel> input2Binding = binder.bindConsumer("partJ.0", "test", input2, consumerProperties);
Binding<MessageChannel> input2Binding = binder.bindConsumer("partJ.raw.0", "test", input2, consumerProperties);
output.send(new GenericMessage<>(new byte[] {(byte) 0}));
output.send(new GenericMessage<>(new byte[] {(byte) 1}));
@@ -1411,11 +1473,11 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
DirectChannel output = createBindableChannel("output", createProducerBindingProperties(properties));
output.setBeanName("test.output");
Binding<MessageChannel> outputBinding = binder.bindProducer("part.0", output, properties);
Binding<MessageChannel> outputBinding = binder.bindProducer("part.raw.0", output, properties);
try {
Object endpoint = extractEndpoint(outputBinding);
assertThat(getEndpointRouting(endpoint))
.contains(getExpectedRoutingBaseDestination("part.0", "test") + "-' + headers['partition']");
.contains(getExpectedRoutingBaseDestination("part.raw.0", "test") + "-' + headers['partition']");
}
catch (UnsupportedOperationException ignored) {
}
@@ -1430,15 +1492,15 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
QueueChannel input0 = new QueueChannel();
input0.setBeanName("test.input0S");
Binding<MessageChannel> input0Binding = binder.bindConsumer("part.0", "test", input0, consumerProperties);
Binding<MessageChannel> input0Binding = binder.bindConsumer("part.raw.0", "test", input0, consumerProperties);
consumerProperties.setInstanceIndex(1);
QueueChannel input1 = new QueueChannel();
input1.setBeanName("test.input1S");
Binding<MessageChannel> input1Binding = binder.bindConsumer("part.0", "test", input1, consumerProperties);
Binding<MessageChannel> input1Binding = binder.bindConsumer("part.raw.0", "test", input1, consumerProperties);
consumerProperties.setInstanceIndex(2);
QueueChannel input2 = new QueueChannel();
input2.setBeanName("test.input2S");
Binding<MessageChannel> input2Binding = binder.bindConsumer("part.0", "test", input2, consumerProperties);
Binding<MessageChannel> input2Binding = binder.bindConsumer("part.raw.0", "test", input2, consumerProperties);
Message<byte[]> message2 = org.springframework.integration.support.MessageBuilder.withPayload(new byte[] {2})
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "kafkaBinderTestCommonsDelegate")
@@ -1469,19 +1531,44 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
QueueChannel moduleInputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setHeaderMode(HeaderMode.raw);
Binding<MessageChannel> producerBinding = binder.bindProducer("0", moduleOutputChannel,
Binding<MessageChannel> producerBinding = binder.bindProducer("raw.0", moduleOutputChannel,
producerProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setHeaderMode(HeaderMode.raw);
Binding<MessageChannel> consumerBinding = binder.bindConsumer("0", "test", moduleInputChannel,
Binding<MessageChannel> consumerBinding = binder.bindConsumer("raw.0", "test", moduleInputChannel,
consumerProperties);
Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload("kafkaBinderTestCommonsDelegate".getBytes()).build();
Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload("testSendAndReceiveWithRawMode".getBytes()).build();
// Let the consumer actually bind to the producer before sending a msg
binderBindUnbindLatency();
moduleOutputChannel.send(message);
Message<?> inbound = receive(moduleInputChannel);
assertThat(inbound).isNotNull();
assertThat(new String((byte[]) inbound.getPayload())).isEqualTo("kafkaBinderTestCommonsDelegate");
assertThat(new String((byte[]) inbound.getPayload())).isEqualTo("testSendAndReceiveWithRawMode");
producerBinding.unbind();
consumerBinding.unbind();
}
@Test
@SuppressWarnings("unchecked")
public void testSendAndReceiveWithRawModeAndStringPayload() throws Exception {
Binder binder = getBinder();
DirectChannel moduleOutputChannel = new DirectChannel();
QueueChannel moduleInputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setHeaderMode(HeaderMode.raw);
Binding<MessageChannel> producerBinding = binder.bindProducer("raw.string.0", moduleOutputChannel,
producerProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setHeaderMode(HeaderMode.raw);
Binding<MessageChannel> consumerBinding = binder.bindConsumer("raw.string.0", "test", moduleInputChannel,
consumerProperties);
Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload("testSendAndReceiveWithRawModeAndStringPayload").build();
// Let the consumer actually bind to the producer before sending a msg
binderBindUnbindLatency();
moduleOutputChannel.send(message);
Message<?> inbound = receive(moduleInputChannel);
assertThat(inbound).isNotNull();
assertThat(new String((byte[]) inbound.getPayload())).isEqualTo("testSendAndReceiveWithRawModeAndStringPayload");
producerBinding.unbind();
consumerBinding.unbind();
}
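Raw header mode, exercised by the two tests above, is a per-binding setting; a minimal sketch, assuming bindings named output and input (the binding names are placeholders):

spring.cloud.stream.bindings.output.producer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.headerMode=raw

In raw mode the binder sends the payload as-is and does not embed message headers in the Kafka record, which is why the tests assert on the bare byte[] payload.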
@@ -1497,30 +1584,30 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
QueueChannel module3InputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setHeaderMode(HeaderMode.raw);
Binding<MessageChannel> producerBinding = binder.bindProducer("baz.0", moduleOutputChannel,
Binding<MessageChannel> producerBinding = binder.bindProducer("baz.raw.0", moduleOutputChannel,
producerProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setHeaderMode(HeaderMode.raw);
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
Binding<MessageChannel> input1Binding = binder.bindConsumer("baz.0", "test", module1InputChannel,
Binding<MessageChannel> input1Binding = binder.bindConsumer("baz.raw.0", "test", module1InputChannel,
consumerProperties);
// A new module is using the tap as an input channel
String fooTapName = "baz.0";
String fooTapName = "baz.raw.0";
Binding<MessageChannel> input2Binding = binder.bindConsumer(fooTapName, "tap1", module2InputChannel,
consumerProperties);
// Another new module is using the tap as an input channel
String barTapName = "baz.0";
String barTapName = "baz.raw.0";
Binding<MessageChannel> input3Binding = binder.bindConsumer(barTapName, "tap2", module3InputChannel,
consumerProperties);
Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload("kafkaBinderTestCommonsDelegate".getBytes()).build();
Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload("testSendAndReceiveWithExplicitConsumerGroupWithRawMode".getBytes()).build();
boolean success = false;
boolean retried = false;
while (!success) {
moduleOutputChannel.send(message);
Message<?> inbound = receive(module1InputChannel);
assertThat(inbound).isNotNull();
assertThat(new String((byte[]) inbound.getPayload())).isEqualTo("kafkaBinderTestCommonsDelegate");
assertThat(new String((byte[]) inbound.getPayload())).isEqualTo("testSendAndReceiveWithExplicitConsumerGroupWithRawMode");
Message<?> tapped1 = receive(module2InputChannel);
Message<?> tapped2 = receive(module3InputChannel);
@@ -1531,8 +1618,8 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
continue;
}
success = true;
assertThat(new String((byte[]) tapped1.getPayload())).isEqualTo("kafkaBinderTestCommonsDelegate");
assertThat(new String((byte[]) tapped2.getPayload())).isEqualTo("kafkaBinderTestCommonsDelegate");
assertThat(new String((byte[]) tapped1.getPayload())).isEqualTo("testSendAndReceiveWithExplicitConsumerGroupWithRawMode");
assertThat(new String((byte[]) tapped2.getPayload())).isEqualTo("testSendAndReceiveWithExplicitConsumerGroupWithRawMode");
}
// unbind one of the tap streams
input3Binding.unbind();


@@ -0,0 +1,47 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.bootstrap;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.test.rule.KafkaEmbedded;
/**
* @author Marius Bogoevici
*/
public class KafkaBinderBootstrapTest {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
@Test
public void testKafkaBinderConfiguration() throws Exception {
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(SimpleApplication.class)
.web(false)
.run("--spring.cloud.stream.kafka.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
applicationContext.close();
}
@SpringBootApplication
static class SimpleApplication {
}
}
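The bootstrap test supplies broker coordinates on the command line; a standalone application would typically carry the equivalent settings in application.properties. A minimal sketch (host and port values are placeholders):

spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.kafka.binder.zkNodes=localhost:2181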


@@ -0,0 +1,7 @@
spring.kafka.producer.keySerializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.producer.valueSerializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.consumer.keyDeserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.valueDeserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.producer.batchSize=10
spring.kafka.bootstrapServers=10.98.09.199:9092,10.98.09.196:9092
spring.kafka.producer.compressionType=snappy
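These spring.kafka.* entries are picked up through Spring Boot's Kafka autoconfiguration and should surface as binder-level defaults; individual client properties can still be overridden via the binder's configuration map. A sketch of such an override, assuming the key is one the Kafka client recognizes:

spring.cloud.stream.kafka.binder.configuration.value.serializer=org.apache.kafka.common.serialization.StringSerializer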


@@ -0,0 +1 @@
spring.cloud.stream.kafka.binder.brokers=10.98.09.199:9082