Compare commits

..

18 Commits

Author SHA1 Message Date
Artem Bilan
5269a6642c Prepare for release
* Upgrade to SCSt-1.3.0.RELEASE
* Revert to SC-build-1.3.5.RELEASE
2017-10-02 15:31:00 -04:00
Gary Russell
a6201425e0 GH-215: Add timeout to health indicator
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/215

* Shutdown the executor.

* Polishing - PR Comments

* Re-interrupt thread.

* More Polishing

# Conflicts:
#	spring-cloud-stream-binder-kafka11-docs/src/main/asciidoc/overview.adoc
2017-10-02 14:07:09 -04:00
Gary Russell
715b69506d Update to SK 1.3, SIK 2.3 RELEASE
Fixes spring-cloud/spring-cloud-stream-binder-kafka#220
2017-10-02 14:04:45 -04:00
Artem Bilan
4fb37e2eac Cast ProducerFactory to DisposableBean for destroy
https://jenkins.spring.io/blue/organizations/jenkins/spring-cloud-stream-binder-kafka-0.11-ci/detail/spring-cloud-stream-binder-kafka-0.11-ci/7/pipeline
2017-09-28 12:36:32 -04:00
Gary Russell
2a6d716ecf GH-206: Close Consumer/Producer in provisioning
Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/206

Close the consumer and producer after retrieving the current partition count.

**Cherry pick/back port to 0.11 and 1.2.x, 2.0.x**

* Destroy the Producer Factory

# Conflicts:
#	spring-cloud-stream-binder-kafka11/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java
2017-09-28 11:04:01 -04:00
Soby Chacko
7344bed048 Accepting custom trusted packages
Kafka header property to accept trusted packages

Fix #195
2017-09-27 20:49:39 +01:00
Gary Russell
1e74d624c8 Back to 1.3.0.BUILD-SNAPSHOT 2017-09-13 13:52:39 -04:00
Gary Russell
7a2b9f1c56 Update POMs to 1.3.0.RC1; build 1.3.5.RELEASE 2017-09-13 12:37:17 -04:00
Soby Chacko
d72a4fdb33 Kstream binder: producer default Serde changes
Change the way the default Serde classes are selected for key and value
in producer when only one of those is provided by the user.

Fix #190
2017-09-13 12:28:05 -04:00
Soby Chacko
ea8b3062f1 GH-188: KStream Binder Properties
KStream binder: support class for application level properties

Provide commonly used KStream application properties for convenient access at runtime

Fix #188

Since windowing operations are common in KStream applications, making the TimeWindows object
avaiable as a first class bean (using auto configuration). This bean is only created if the
relevant properties are provided by the user.
2017-09-13 12:27:46 -04:00
Gary Russell
3b3896d642 Version Updates (SK, SIK, SI) 2017-09-11 17:25:54 -04:00
Gary Russell
3d244b156e Change Artifacts to ...kafka11 2017-08-29 18:32:28 -04:00
Gary Russell
e968194b1a KStreams and 0.11 2017-08-29 17:12:11 -04:00
Gary Russell
add9a5d1dc 0.11 Binder
Initial Commit

- Transactional Binder

Version Updates

- Headers support
2017-08-29 14:59:39 -04:00
Gary Russell
02913cd177 GH-169: Use the Actual Partition Count (Producer)
Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/169

If the configured `partitionCount` is less than the physical partition count on an existing
topic, the binder emits this message:

    The `partitionCount` of the producer for topic partJ.0 is 3, smaller than the actual partition count of 8 of the topic.
    The larger number will be used instead.

However, that is not true; the configured partition count is used.

Override the configured partition count with the actual partition count.
2017-08-22 13:34:21 -04:00
Gary Russell
0865602141 GH-62: Remove Tuple Kryo Registrar Wrapper
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/62

No longer needed.
2017-08-22 13:24:51 -04:00
Gary Russell
790b141799 Fixes #181
SCSt-GH-916: Configure Producer Error Channel

Requires: https://github.com/spring-cloud/spring-cloud-stream/pull/1039

Publish send failures to the error channel.

Add docs

Revert to Spring Kafka 1.1.6
2017-08-22 12:56:16 -04:00
Vinicius Carvalho
60e620e36e Set version to 1.3.0.BUILD-SNAPSHOT 2017-07-31 16:00:00 -04:00
120 changed files with 772 additions and 874 deletions

39
pom.xml
View File

@@ -1,43 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.M2</version>
<artifactId>spring-cloud-stream-binder-kafka11-parent</artifactId>
<version>1.3.0.RELEASE</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>1.3.3.RELEASE</version>
<version>1.3.5.RELEASE</version>
<relativePath />
</parent>
<properties>
<java.version>1.7</java.version>
<kafka.version>0.10.1.1</kafka.version>
<spring-kafka.version>1.1.6.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>2.1.1.RELEASE</spring-integration-kafka.version>
<spring-cloud-stream.version>1.3.0.M2</spring-cloud-stream.version>
<spring-cloud-build.version>1.3.3.RELEASE</spring-cloud-build.version>
<kafka.version>0.11.0.0</kafka.version>
<spring-kafka.version>1.3.0.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>2.3.0.RELEASE</spring-integration-kafka.version>
<spring-cloud-stream.version>1.3.0.RELEASE</spring-cloud-stream.version>
<spring-cloud-build.version>1.3.5.RELEASE</spring-cloud-build.version>
</properties>
<modules>
<module>spring-cloud-stream-binder-kafka</module>
<module>spring-cloud-starter-stream-kafka</module>
<module>spring-cloud-stream-binder-kafka-docs</module>
<module>spring-cloud-stream-binder-kafka-0.10.1-test</module>
<module>spring-cloud-stream-binder-kafka-0.10.2-test</module>
<module>spring-cloud-stream-binder-kafka-core</module>
<module>spring-cloud-stream-binder-kstream</module>
<module>spring-cloud-stream-binder-kafka11</module>
<module>spring-cloud-starter-stream-kafka11</module>
<module>spring-cloud-stream-binder-kafka11-docs</module>
<module>spring-cloud-stream-binder-kafka-0.11-test</module>
<module>spring-cloud-stream-binder-kafka11-core</module>
<module>spring-cloud-stream-binder-kstream11</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<artifactId>spring-cloud-stream-binder-kafka11-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<artifactId>spring-cloud-stream-binder-kafka11</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -83,6 +82,12 @@
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>${spring-integration-kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro-compiler</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>

View File

@@ -3,11 +3,11 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.M2</version>
<artifactId>spring-cloud-stream-binder-kafka11-parent</artifactId>
<version>1.3.0.RELEASE</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>
<artifactId>spring-cloud-starter-stream-kafka11</artifactId>
<description>Spring Cloud Starter Stream Kafka for the 0.11.x.x client</description>
<url>http://projects.spring.io/spring-cloud</url>
<organization>
<name>Pivotal Software, Inc.</name>

View File

@@ -1,126 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.M2</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-0.10.1-test</artifactId>
<description>Spring Cloud Stream Kafka Binder 0.10.1 Tests</description>
<url>http://projects.spring.io/spring-cloud</url>
<organization>
<name>Pivotal Software, Inc.</name>
<url>http://www.spring.io</url>
</organization>
<properties>
<main.basedir>${basedir}/../..</main.basedir>
<kafka.version>0.10.1.1</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-0.10.2-test</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
<version>${spring-cloud-stream.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>3.1.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry</artifactId>
<version>3.1.2</version>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -1,241 +0,0 @@
/*
* Copyright 2014-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.assertj.core.api.Assertions;
import org.eclipse.jetty.server.Server;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.Spy;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import static org.junit.Assert.assertTrue;
/**
* Integration tests for the {@link KafkaMessageChannelBinder}.
*
* This test specifically tests for the 0.10.1.x version of Kafka.
*
* @author Eric Bottard
* @author Marius Bogoevici
* @author Mark Fisher
* @author Ilayaperumal Gopinathan
*/
public class Kafka_0_10_1_BinderTests extends Kafka_0_10_2_BinderTests {
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
private Kafka10TestBinder binder;
private Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
@Override
protected void binderBindUnbindLatency() throws InterruptedException {
Thread.sleep(500);
}
@Override
protected Kafka10TestBinder getBinder() {
if (binder == null) {
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binder = new Kafka10TestBinder(binderConfiguration);
}
return binder;
}
protected KafkaBinderConfigurationProperties createConfigurationProperties() {
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
List<String> bAddresses = new ArrayList<>();
for (BrokerAddress bAddress : brokerAddresses) {
bAddresses.add(bAddress.toString());
}
String[] foo = new String[bAddresses.size()];
binderConfiguration.setBrokers(bAddresses.toArray(foo));
binderConfiguration.setZkNodes(embeddedKafka.getZookeeperConnectionString());
return binderConfiguration;
}
@Override
protected int partitionSize(String topic) {
return consumerFactory().createConsumer().partitionsFor(topic).size();
}
@Override
protected ZkUtils getZkUtils(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
final ZkClient zkClient = new ZkClient(kafkaBinderConfigurationProperties.getZkConnectionString(),
kafkaBinderConfigurationProperties.getZkSessionTimeout(), kafkaBinderConfigurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
return new ZkUtils(zkClient, null, false);
}
@Override
protected void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions, int replicationFactor, Properties topicConfig) {
adminUtilsOperation.invokeCreateTopic(zkUtils, topic, partitions, replicationFactor, new Properties());
}
@Override
protected int invokePartitionSize(String topic, ZkUtils zkUtils) {
return adminUtilsOperation.partitionSize(topic, zkUtils);
}
@Override
public String getKafkaOffsetHeaderKey() {
return KafkaHeaders.OFFSET;
}
@Override
protected Binder getBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
return new Kafka10TestBinder(kafkaBinderConfigurationProperties);
}
@Before
public void init() {
String multiplier = System.getenv("KAFKA_TIMEOUT_MULTIPLIER");
if (multiplier != null) {
timeoutMultiplier = Double.parseDouble(multiplier);
}
}
@Override
protected boolean usesExplicitRouting() {
return false;
}
@Override
protected String getClassUnderTestName() {
return CLASS_UNDER_TEST_NAME;
}
@Override
public Spy spyOn(final String name) {
throw new UnsupportedOperationException("'spyOn' is not used by Kafka tests");
}
private ConsumerFactory<byte[], byte[]> consumerFactory() {
Map<String, Object> props = new HashMap<>();
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "TEST-CONSUMER-GROUP");
Deserializer<byte[]> valueDecoder = new ByteArrayDeserializer();
Deserializer<byte[]> keyDecoder = new ByteArrayDeserializer();
return new DefaultKafkaConsumerFactory<>(props, keyDecoder, valueDecoder);
}
@Test
@SuppressWarnings("unchecked")
public void testCustomAvroSerialization() throws Exception {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
Map<String, Object> schemaRegistryProps = new HashMap<>();
schemaRegistryProps.put("kafkastore.connection.url", configurationProperties.getZkConnectionString());
schemaRegistryProps.put("listeners", "http://0.0.0.0:8082");
schemaRegistryProps.put("port", "8082");
schemaRegistryProps.put("kafkastore.topic", "_schemas");
SchemaRegistryConfig config = new SchemaRegistryConfig(schemaRegistryProps);
SchemaRegistryRestApplication app = new SchemaRegistryRestApplication(config);
Server server = app.createServer();
server.start();
long endTime = System.currentTimeMillis() + 5000;
while(true) {
if (server.isRunning()) {
break;
}
else if (System.currentTimeMillis() > endTime) {
Assertions.fail("Kafka Schema Registry Server failed to start");
}
}
User1 firstOutboundFoo = new User1();
String userName1 = "foo-name" + UUID.randomUUID().toString();
String favColor1 = "foo-color" + UUID.randomUUID().toString();
firstOutboundFoo.setName(userName1);
firstOutboundFoo.setFavoriteColor(favColor1);
Message<?> message = MessageBuilder.withPayload(firstOutboundFoo).build();
SubscribableChannel moduleOutputChannel = new DirectChannel();
String testTopicName = "existing" + System.currentTimeMillis();
invokeCreateTopic(zkUtils, testTopicName, 6, 1, new Properties());
configurationProperties.setAutoAddPartitions(true);
Binder binder = getBinder(configurationProperties);
QueueChannel moduleInputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.getExtension().getConfiguration().put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
producerProperties.getExtension().getConfiguration().put("schema.registry.url", "http://localhost:8082");
producerProperties.setUseNativeEncoding(true);
Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel, producerProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
consumerProperties.getExtension().getConfiguration().put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
consumerProperties.getExtension().getConfiguration().put("schema.registry.url", "http://localhost:8082");
Binding<MessageChannel> consumerBinding = binder.bindConsumer(testTopicName, "test", moduleInputChannel, consumerProperties);
// Let the consumer actually bind to the producer before sending a msg
binderBindUnbindLatency();
moduleOutputChannel.send(message);
Message<?> inbound = receive(moduleInputChannel);
Assertions.assertThat(inbound).isNotNull();
assertTrue(message.getPayload() instanceof User1);
User1 receivedUser = (User1) message.getPayload();
Assertions.assertThat(receivedUser.getName()).isEqualTo(userName1);
Assertions.assertThat(receivedUser.getFavoriteColor()).isEqualTo(favColor1);
producerBinding.unbind();
consumerBinding.unbind();
}
}

View File

@@ -3,11 +3,11 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.M2</version>
<artifactId>spring-cloud-stream-binder-kafka11-parent</artifactId>
<version>1.3.0.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-0.10.2-test</artifactId>
<description>Spring Cloud Stream Kafka Binder 0.10.2 Tests</description>
<artifactId>spring-cloud-stream-binder-kafka-0.11-test</artifactId>
<description>Spring Cloud Stream Kafka Binder 0.11 Tests</description>
<url>http://projects.spring.io/spring-cloud</url>
<organization>
<name>Pivotal Software, Inc.</name>
@@ -15,20 +15,18 @@
</organization>
<properties>
<main.basedir>${basedir}/../..</main.basedir>
<kafka.version>0.10.2.1</kafka.version>
<spring-kafka.version>1.2.2.RELEASE</spring-kafka.version>
<kafka.version>0.11.0.0</kafka.version>
<spring-kafka.version>1.3.0.RELEASE</spring-kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<version>${project.version}</version>
<artifactId>spring-cloud-stream-binder-kafka11-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>${project.version}</version>
<artifactId>spring-cloud-stream-binder-kafka11</artifactId>
<scope>test</scope>
</dependency>
<dependency>
@@ -63,7 +61,7 @@
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<artifactId>spring-cloud-stream-binder-kafka11</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>

View File

@@ -67,6 +67,7 @@ public class Kafka10TestBinder extends AbstractKafkaTestBinder {
ProducerListener producerListener = new LoggingProducerListener();
binder.setProducerListener(producerListener);
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
setApplicationContext(context);
binder.setApplicationContext(context);
binder.afterPropertiesSet();
this.setBinder(binder);

View File

@@ -25,8 +25,10 @@ import java.util.UUID;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -38,6 +40,7 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
@@ -46,6 +49,7 @@ import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOper
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.kafka.core.ConsumerFactory;
@@ -55,22 +59,25 @@ import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import static org.junit.Assert.assertTrue;
/**
* Integration tests for the {@link KafkaMessageChannelBinder}.
*
* This test specifically tests for the 0.10.2.x version of Kafka.
* This test specifically tests for the 0.11.x.x version of Kafka.
*
* @author Eric Bottard
* @author Marius Bogoevici
* @author Mark Fisher
* @author Ilayaperumal Gopinathan
*/
public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
public class Kafka_0_11_BinderTests extends KafkaBinderTests {
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();
@@ -79,7 +86,7 @@ public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
private Kafka10TestBinder binder;
private Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
private final Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
@Override
protected void binderBindUnbindLatency() throws InterruptedException {
@@ -95,6 +102,7 @@ public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
return binder;
}
@Override
protected KafkaBinderConfigurationProperties createConfigurationProperties() {
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
@@ -178,6 +186,42 @@ public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
return new DefaultKafkaConsumerFactory<>(props, keyDecoder, valueDecoder);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testTrustedPackages() throws Exception {
Binder binder = getBinder();
BindingProperties producerBindingProperties = createProducerBindingProperties(createProducerProperties());
DirectChannel moduleOutputChannel = createBindableChannel("output", producerBindingProperties);
QueueChannel moduleInputChannel = new QueueChannel();
Binding<MessageChannel> producerBinding = binder.bindProducer("bar.0", moduleOutputChannel,
producerBindingProperties.getProducer());
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.getExtension().setTrustedPackages(new String[]{"org.springframework.util"});
Binding<MessageChannel> consumerBinding = binder.bindConsumer("bar.0",
"testSendAndReceiveNoOriginalContentType", moduleInputChannel, consumerProperties);
binderBindUnbindLatency();
Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload("foo")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN).setHeader("foo", MimeTypeUtils.TEXT_PLAIN)
.build();
moduleOutputChannel.send(message);
Message<?> inbound = receive(moduleInputChannel);
Assertions.assertThat(inbound).isNotNull();
Assertions.assertThat(inbound.getPayload()).isEqualTo("foo");
Assertions.assertThat(inbound.getHeaders().get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE)).isNull();
Assertions.assertThat(inbound.getHeaders().get(MessageHeaders.CONTENT_TYPE))
.isEqualTo(MimeTypeUtils.TEXT_PLAIN_VALUE);
Assertions.assertThat(inbound.getHeaders().get("foo")).isInstanceOf(MimeType.class);
MimeType actual = (MimeType) inbound.getHeaders().get("foo");
Assertions.assertThat(actual).isEqualTo(MimeTypeUtils.TEXT_PLAIN);
producerBinding.unbind();
consumerBinding.unbind();
}
class Foo{}
@Test
@SuppressWarnings("unchecked")
public void testCustomAvroSerialization() throws Exception {
@@ -238,4 +282,15 @@ public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
producerBinding.unbind();
consumerBinding.unbind();
}
@Override
public void testSendAndReceiveWithExplicitConsumerGroupWithRawMode() {
// raw mode no longer needed
}
@Override
public void testSendAndReceiveWithRawModeAndStringPayload() {
// raw mode no longer needed
}
}

View File

@@ -1,74 +0,0 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.kafka.core.ConsumerFactory;
/**
* Health indicator for Kafka.
*
* @author Ilayaperumal Gopinathan
* @author Marius Bogoevici
* @author Henryk Konsek
*/
public class KafkaBinderHealthIndicator implements HealthIndicator {
private final KafkaMessageChannelBinder binder;
private final ConsumerFactory<?, ?> consumerFactory;
public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder,
ConsumerFactory<?, ?> consumerFactory) {
this.binder = binder;
this.consumerFactory = consumerFactory;
}
@Override
public Health health() {
try (Consumer<?, ?> metadataConsumer = consumerFactory.createConsumer()) {
Set<String> downMessages = new HashSet<>();
for (String topic : this.binder.getTopicsInUse().keySet()) {
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
if (this.binder.getTopicsInUse().get(topic).getPartitionInfos().contains(partitionInfo)
&& partitionInfo.leader()
.id() == -1) {
downMessages.add(partitionInfo.toString());
}
}
}
if (downMessages.isEmpty()) {
return Health.up().build();
}
return Health.down().withDetail("Following partitions in use have no leaders: ", downMessages.toString())
.build();
}
catch (Exception e) {
return Health.down(e).build();
}
}
}

View File

@@ -1,118 +0,0 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import org.apache.kafka.common.security.JaasUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.util.Assert;
/**
* @author Marius Bogoevici
*/
public class KafkaBinderJaasInitializerListener implements ApplicationListener<ContextRefreshedEvent>,
ApplicationContextAware, DisposableBean {
public static final String DEFAULT_ZK_LOGIN_CONTEXT_NAME = "Client";
private ApplicationContext applicationContext;
private final boolean ignoreJavaLoginConfigParamSystemProperty;
private final File placeholderJaasConfiguration;
public KafkaBinderJaasInitializerListener() throws IOException {
// we ignore the system property if it wasn't originally set at launch
this.ignoreJavaLoginConfigParamSystemProperty =
(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) == null);
this.placeholderJaasConfiguration = File.createTempFile("kafka-client-jaas-config-placeholder", "conf");
this.placeholderJaasConfiguration.deleteOnExit();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public void destroy() throws Exception {
if (this.ignoreJavaLoginConfigParamSystemProperty) {
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
}
}
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getSource() == this.applicationContext) {
KafkaBinderConfigurationProperties binderConfigurationProperties =
applicationContext.getBean(KafkaBinderConfigurationProperties.class);
// only use programmatic support if a file is not set via system property
if (ignoreJavaLoginConfigParamSystemProperty
&& binderConfigurationProperties.getJaas() != null) {
Map<String, AppConfigurationEntry[]> configurationEntries = new HashMap<>();
AppConfigurationEntry kafkaClientConfigurationEntry = new AppConfigurationEntry
(binderConfigurationProperties.getJaas().getLoginModule(),
binderConfigurationProperties.getJaas().getControlFlagValue(),
binderConfigurationProperties.getJaas().getOptions() != null ?
binderConfigurationProperties.getJaas().getOptions() :
Collections.<String, Object>emptyMap());
configurationEntries.put(JaasUtils.LOGIN_CONTEXT_CLIENT,
new AppConfigurationEntry[]{ kafkaClientConfigurationEntry });
Configuration.setConfiguration(new InternalConfiguration(configurationEntries));
// Workaround for a 0.9 client issue where even if the Configuration is set
// a system property check is performed.
// Since the Configuration already exists, this will be ignored.
if (this.placeholderJaasConfiguration != null) {
System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, this.placeholderJaasConfiguration.getAbsolutePath());
}
}
}
}
/**
* A {@link Configuration} set up programmatically by the Kafka binder
*/
public static class InternalConfiguration extends Configuration {
private final Map<String, AppConfigurationEntry[]> configurationEntries;
public InternalConfiguration(Map<String, AppConfigurationEntry[]> configurationEntries) {
Assert.notNull(configurationEntries, " cannot be null");
Assert.notEmpty(configurationEntries, " cannot be empty");
this.configurationEntries = configurationEntries;
}
@Override
public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
return configurationEntries.get(name);
}
}
}

View File

@@ -1,61 +0,0 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import javax.security.auth.login.AppConfigurationEntry;
import com.sun.security.auth.login.ConfigFile;
import org.apache.kafka.common.security.JaasUtils;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.io.ClassPathResource;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Marius Bogoevici
*/
public class KafkaBinderJaasInitializerListenerTest {
@Test
public void testConfigurationParsedCorrectlyWithKafkaClient() throws Exception {
ConfigFile configFile = new ConfigFile(new ClassPathResource("jaas-sample-kafka-only.conf").getURI());
final AppConfigurationEntry[] kafkaConfigurationArray = configFile.getAppConfigurationEntry(JaasUtils.LOGIN_CONTEXT_CLIENT);
final ConfigurableApplicationContext context =
SpringApplication.run(SimpleApplication.class,
"--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true",
"--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true",
"--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab",
"--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM");
javax.security.auth.login.Configuration configuration = javax.security.auth.login.Configuration.getConfiguration();
final AppConfigurationEntry[] kafkaConfiguration = configuration.getAppConfigurationEntry(JaasUtils.LOGIN_CONTEXT_CLIENT);
assertThat(kafkaConfiguration).hasSize(1);
assertThat(kafkaConfiguration[0].getOptions()).isEqualTo(kafkaConfigurationArray[0].getOptions());
context.close();
}
@SpringBootApplication
public static class SimpleApplication {
}
}

View File

@@ -3,11 +3,11 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.M2</version>
<artifactId>spring-cloud-stream-binder-kafka11-parent</artifactId>
<version>1.3.0.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>
<artifactId>spring-cloud-stream-binder-kafka11-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core for the 0.11.x.x client</description>
<url>http://projects.spring.io/spring-cloud</url>
<organization>
<name>Pivotal Software, Inc.</name>
@@ -33,13 +33,6 @@
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>${spring-integration-kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro-compiler</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015-2016 the original author or authors.
* Copyright 2015-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -35,10 +35,13 @@ import org.springframework.util.StringUtils;
* @author Ilayaperumal Gopinathan
* @author Marius Bogoevici
* @author Soby Chacko
* @author Gary Russell
*/
@ConfigurationProperties(prefix = "spring.cloud.stream.kafka.binder")
public class KafkaBinderConfigurationProperties {
private final Transaction transaction = new Transaction();
@Autowired(required = false)
private KafkaProperties kafkaProperties;
@@ -88,8 +91,17 @@ public class KafkaBinderConfigurationProperties {
private int queueSize = 8192;
/**
* Time to wait to get partition information in seconds; default 60.
*/
private int healthTimeout = 60;
private JaasLoginModuleConfiguration jaas;
public Transaction getTransaction() {
return this.transaction;
}
public String getZkConnectionString() {
return toConnectionString(this.zkNodes, this.defaultZkPort);
}
@@ -228,6 +240,14 @@ public class KafkaBinderConfigurationProperties {
this.minPartitionCount = minPartitionCount;
}
public int getHealthTimeout() {
return this.healthTimeout;
}
public void setHealthTimeout(int healthTimeout) {
this.healthTimeout = healthTimeout;
}
public int getQueueSize() {
return this.queueSize;
}
@@ -338,4 +358,24 @@ public class KafkaBinderConfigurationProperties {
this.jaas = jaas;
}
public static class Transaction {
private final KafkaProducerProperties producer = new KafkaProducerProperties();
private String transactionIdPrefix;
public String getTransactionIdPrefix() {
return this.transactionIdPrefix;
}
public void setTransactionIdPrefix(String transactionIdPrefix) {
this.transactionIdPrefix = transactionIdPrefix;
}
public KafkaProducerProperties getProducer() {
return this.producer;
}
}
}

View File

@@ -43,6 +43,8 @@ public class KafkaConsumerProperties {
private int recoveryInterval = 5000;
private String[] trustedPackages;
private Map<String, String> configuration = new HashMap<>();
public boolean isAutoCommitOffset() {
@@ -123,4 +125,12 @@ public class KafkaConsumerProperties {
public void setDlqName(String dlqName) {
this.dlqName = dlqName;
}
public String[] getTrustedPackages() {
return trustedPackages;
}
public void setTrustedPackages(String[] trustedPackages) {
this.trustedPackages = trustedPackages;
}
}

View File

@@ -16,16 +16,17 @@
package org.springframework.cloud.stream.binder.kafka.properties;
import org.springframework.expression.Expression;
import java.util.HashMap;
import java.util.Map;
import javax.validation.constraints.NotNull;
import org.springframework.expression.Expression;
/**
* @author Marius Bogoevici
* @author Henryk Konsek
* @author Gary Russell
*/
public class KafkaProducerProperties {
@@ -39,6 +40,8 @@ public class KafkaProducerProperties {
private Expression messageKeyExpression;
private String[] headerPatterns;
private Map<String, String> configuration = new HashMap<>();
public int getBufferSize() {
@@ -82,6 +85,14 @@ public class KafkaProducerProperties {
this.messageKeyExpression = messageKeyExpression;
}
public String[] getHeaderPatterns() {
return this.headerPatterns;
}
public void setHeaderPatterns(String[] headerPatterns) {
this.headerPatterns = headerPatterns;
}
public Map<String, String> getConfiguration() {
return this.configuration;
}

View File

@@ -4,13 +4,13 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.M2</version>
<artifactId>spring-cloud-stream-binder-kafka11-parent</artifactId>
<version>1.3.0.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>
<name>spring-cloud-stream-binder-kafka-docs</name>
<description>Spring Cloud Stream Kafka Binder Docs</description>
<artifactId>spring-cloud-stream-binder-kafka11-docs</artifactId>
<name>spring-cloud-stream-binder-kafka11-docs</name>
<description>Spring Cloud Stream Kafka Binder Docs for the 0.11.x.x client</description>
<properties>
<main.basedir>${basedir}/..</main.basedir>
</properties>

View File

@@ -23,12 +23,15 @@ Sabby Anandan, Marius Bogoevici, Eric Bottard, Mark Fisher, Ilayaperumal Gopinat
= Reference Guide
include::overview.adoc[]
include::dlq.adoc[]
include::metrics.adoc[]
= Appendices
[appendix]
include::building.adoc[]
include::contributing.adoc[]
// ======================================================================================

View File

@@ -73,6 +73,11 @@ spring.cloud.stream.kafka.binder.headers::
The list of custom headers that will be transported by the binder.
+
Default: empty.
spring.cloud.stream.kafka.binder.healthTimeout::
The time to wait to get partition information in seconds; default 60.
Health will report as down if this timer expires.
+
Default: 10.
spring.cloud.stream.kafka.binder.offsetUpdateTimeWindow::
The frequency, in milliseconds, with which offsets are saved.
Ignored if `0`.
@@ -523,5 +528,15 @@ spring.cloud.stream.kstream.bindings.output.producer.keySerde=org.apache.kafka.c
spring.cloud.stream.kstream.bindings.output.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$LongSerde
----
[[kafka-error-channels]]
== Error Channels
Starting with _version 1.3_, the binder unconditionally sends exceptions to an error channel for each consumer destination, and can be configured to send async producer send failures to an error channel too.
See <<binder-error-channels>> for more information.
The payload of the `ErrorMessage` for a send failure is a `KafkaSendFailureException` with properties:
* `failedMessage` - the spring-messaging `Message<?>` that failed to be sent.
* `record` - the raw `ProducerRecord` that was created from the `failedMessage`
There is no automatic handling of these exceptions (such as sending to a <<kafka-dlq-processing, Dead-Letter queue>>); you can consume these exceptions with your own Spring Integration flow.

View File

@@ -2,21 +2,21 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<artifactId>spring-cloud-stream-binder-kafka11</artifactId>
<packaging>jar</packaging>
<name>spring-cloud-stream-binder-kafka</name>
<description>Kafka binder implementation</description>
<name>spring-cloud-stream-binder-kafka11</name>
<description>Kafka binder implementation for the 0.11.x.x client</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.M2</version>
<artifactId>spring-cloud-stream-binder-kafka11-parent</artifactId>
<version>1.3.0.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<artifactId>spring-cloud-stream-binder-kafka11-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
@@ -54,10 +54,6 @@
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
@@ -66,6 +62,7 @@
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>${spring-kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>

View File

@@ -0,0 +1,125 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.kafka.core.ConsumerFactory;
/**
* Health indicator for Kafka.
*
* @author Ilayaperumal Gopinathan
* @author Marius Bogoevici
* @author Henryk Konsek
* @author Gary Russell
*/
public class KafkaBinderHealthIndicator implements HealthIndicator {
private static final int DEFAULT_TIMEOUT = 60;
private final KafkaMessageChannelBinder binder;
private final ConsumerFactory<?, ?> consumerFactory;
private int timeout = DEFAULT_TIMEOUT;
public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder,
ConsumerFactory<?, ?> consumerFactory) {
this.binder = binder;
this.consumerFactory = consumerFactory;
}
/**
* Set the timeout in seconds to retrieve health information.
* @param timeout the timeout - default 60.
*/
public void setTimeout(int timeout) {
this.timeout = timeout;
}
@Override
public Health health() {
ExecutorService exec = Executors.newSingleThreadExecutor();
Future<Health> future = exec.submit(new Callable<Health>() {
@Override
public Health call() {
try (Consumer<?, ?> metadataConsumer = consumerFactory.createConsumer()) {
Set<String> downMessages = new HashSet<>();
for (String topic : KafkaBinderHealthIndicator.this.binder.getTopicsInUse().keySet()) {
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
if (KafkaBinderHealthIndicator.this.binder.getTopicsInUse().get(topic).getPartitionInfos()
.contains(partitionInfo) && partitionInfo.leader().id() == -1) {
downMessages.add(partitionInfo.toString());
}
}
}
if (downMessages.isEmpty()) {
return Health.up().build();
}
else {
return Health.down()
.withDetail("Following partitions in use have no leaders: ", downMessages.toString())
.build();
}
}
catch (Exception e) {
return Health.down(e).build();
}
}
});
try {
return future.get(this.timeout, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return Health.down()
.withDetail("Interrupted while waiting for partition information in", this.timeout + " seconds")
.build();
}
catch (ExecutionException e) {
return Health.down(e).build();
}
catch (TimeoutException e) {
return Health.down()
.withDetail("Failed to retrieve partition information in", this.timeout + " seconds")
.build();
}
finally {
exec.shutdownNow();
}
}
}

View File

@@ -21,18 +21,24 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Utils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderHeaders;
@@ -58,15 +64,20 @@ import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
@@ -97,30 +108,24 @@ public class KafkaMessageChannelBinder extends
private final Map<String, TopicInformation> topicsInUse = new HashMap<>();
private final KafkaTransactionManager<byte[], byte[]> transactionManager;
private ProducerListener<byte[], byte[]> producerListener;
private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider) {
super(false, headersToMap(configurationProperties), provisioningProvider);
super(true, null, provisioningProvider);
this.configurationProperties = configurationProperties;
}
private static String[] headersToMap(KafkaBinderConfigurationProperties configurationProperties) {
String[] headersToMap;
if (ObjectUtils.isEmpty(configurationProperties.getHeaders())) {
headersToMap = BinderHeaders.STANDARD_HEADERS;
if (StringUtils.hasText(configurationProperties.getTransaction().getTransactionIdPrefix())) {
this.transactionManager = new KafkaTransactionManager<>(
getProducerFactory(configurationProperties.getTransaction().getTransactionIdPrefix(),
new ExtendedProducerProperties<>(configurationProperties.getTransaction().getProducer())));
}
else {
String[] combinedHeadersToMap = Arrays.copyOfRange(BinderHeaders.STANDARD_HEADERS, 0,
BinderHeaders.STANDARD_HEADERS.length + configurationProperties.getHeaders().length);
System.arraycopy(configurationProperties.getHeaders(), 0, combinedHeadersToMap,
BinderHeaders.STANDARD_HEADERS.length,
configurationProperties.getHeaders().length);
headersToMap = combinedHeadersToMap;
this.transactionManager = null;
}
return headersToMap;
}
public void setExtendedBindingProperties(KafkaExtendedBindingProperties extendedBindingProperties) {
@@ -147,16 +152,32 @@ public class KafkaMessageChannelBinder extends
@Override
protected MessageHandler createProducerMessageHandler(final ProducerDestination destination,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception {
final DefaultKafkaProducerFactory<byte[], byte[]> producerFB = getProducerFactory(producerProperties);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties, MessageChannel errorChannel)
throws Exception {
/*
* IMPORTANT: With a transactional binder, individual producer properties for
* Kafka are ignored; the global binder
* (spring.cloud.stream.kafka.binder.transaction.producer.*) properties are used
* instead, for all producers. A binder is transactional when
* 'spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix' has text.
*/
final ProducerFactory<byte[], byte[]> producerFB = this.transactionManager != null
? this.transactionManager.getProducerFactory()
: getProducerFactory(null, producerProperties);
Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(
producerProperties.getPartitionCount(),
false,
new Callable<Collection<PartitionInfo>>() {
@Override
public Collection<PartitionInfo> call() throws Exception {
return producerFB.createProducer().partitionsFor(destination.getName());
Producer<byte[], byte[]> producer = producerFB.createProducer();
List<PartitionInfo> partitionsFor = producer.partitionsFor(destination.getName());
producer.close();
((DisposableBean) producerFB).destroy();
return partitionsFor;
}
});
this.topicsInUse.put(destination.getName(), new TopicInformation(null, partitions));
if (producerProperties.getPartitionCount() < partitions.size()) {
@@ -165,17 +186,41 @@ public class KafkaMessageChannelBinder extends
+ producerProperties.getPartitionCount() + ", smaller than the actual partition count of "
+ partitions.size() + " of the topic. The larger number will be used instead.");
}
/*
* This is dirty; it relies on the fact that we, and the partition
* interceptor, share a hard reference to the producer properties instance.
* But I don't see another way to fix it since the interceptor has already
* been added to the channel, and we don't have access to the channel here; if
* we did, we could inject the proper partition count there.
* TODO: Consider this when doing the 2.0 binder restructuring.
*/
producerProperties.setPartitionCount(partitions.size());
}
KafkaTemplate<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFB);
if (this.producerListener != null) {
kafkaTemplate.setProducerListener(this.producerListener);
}
return new ProducerConfigurationMessageHandler(kafkaTemplate, destination.getName(), producerProperties,
producerFB);
ProducerConfigurationMessageHandler handler = new ProducerConfigurationMessageHandler(kafkaTemplate,
destination.getName(), producerProperties, producerFB);
if (errorChannel != null) {
handler.setSendFailureChannel(errorChannel);
}
String[] headerPatterns = producerProperties.getExtension().getHeaderPatterns();
if (headerPatterns != null && headerPatterns.length > 0) {
List<String> patterns = new LinkedList<>(Arrays.asList(headerPatterns));
if (!patterns.contains("!" + MessageHeaders.TIMESTAMP)) {
patterns.add(0, "!" + MessageHeaders.TIMESTAMP);
}
if (!patterns.contains("!" + MessageHeaders.ID)) {
patterns.add(0, "!" + MessageHeaders.ID);
}
handler.setHeaderMapper(new DefaultKafkaHeaderMapper(patterns.toArray(new String[patterns.size()])));
}
return handler;
}
private DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(
private DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(String transactionIdPrefix,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.RETRIES_CONFIG, 0);
@@ -204,7 +249,11 @@ public class KafkaMessageChannelBinder extends
if (!ObjectUtils.isEmpty(producerProperties.getExtension().getConfiguration())) {
props.putAll(producerProperties.getExtension().getConfiguration());
}
return new DefaultKafkaProducerFactory<>(props);
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = new DefaultKafkaProducerFactory<>(props);
if (transactionIdPrefix != null) {
producerFactory.setTransactionIdPrefix(transactionIdPrefix);
}
return producerFactory;
}
@Override
@@ -224,10 +273,15 @@ public class KafkaMessageChannelBinder extends
Collection<PartitionInfo> allPartitions = provisioningProvider.getPartitionsForTopic(partitionCount,
extendedConsumerProperties.getExtension().isAutoRebalanceEnabled(),
new Callable<Collection<PartitionInfo>>() {
@Override
public Collection<PartitionInfo> call() throws Exception {
return consumerFactory.createConsumer().partitionsFor(destination.getName());
Consumer<?, ?> consumer = consumerFactory.createConsumer();
List<PartitionInfo> partitionsFor = consumer.partitionsFor(destination.getName());
consumer.close();
return partitionsFor;
}
});
Collection<PartitionInfo> listenedPartitions;
@@ -256,6 +310,9 @@ public class KafkaMessageChannelBinder extends
|| extendedConsumerProperties.getExtension().isAutoRebalanceEnabled()
? new ContainerProperties(destination.getName())
: new ContainerProperties(topicPartitionInitialOffsets);
if (this.transactionManager != null) {
containerProperties.setTransactionManager(this.transactionManager);
}
int concurrency = Math.min(extendedConsumerProperties.getConcurrency(), listenedPartitions.size());
@SuppressWarnings("rawtypes")
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer =
@@ -284,6 +341,15 @@ public class KafkaMessageChannelBinder extends
}
final KafkaMessageDrivenChannelAdapter<?, ?> kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter<>(
messageListenerContainer);
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
String[] trustedPackages = extendedConsumerProperties.getExtension().getTrustedPackages();
if (!StringUtils.isEmpty(trustedPackages)) {
headerMapper.addTrustedPackages(trustedPackages);
}
messageConverter.setHeaderMapper(headerMapper);
kafkaMessageDrivenChannelAdapter.setMessageConverter(messageConverter);
kafkaMessageDrivenChannelAdapter.setBeanFactory(this.getBeanFactory());
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, consumerGroup,
extendedConsumerProperties);
@@ -306,8 +372,9 @@ public class KafkaMessageChannelBinder extends
protected MessageHandler getErrorMessageHandler(final ConsumerDestination destination, final String group,
final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
if (extendedConsumerProperties.getExtension().isEnableDlq()) {
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = getProducerFactory(
new ExtendedProducerProperties<>(new KafkaProducerProperties()));
ProducerFactory<byte[], byte[]> producerFactory = this.transactionManager != null
? this.transactionManager.getProducerFactory()
: getProducerFactory(null, new ExtendedProducerProperties<>(new KafkaProducerProperties()));
final KafkaTemplate<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFactory);
return new MessageHandler() {
@@ -322,8 +389,9 @@ public class KafkaMessageChannelBinder extends
String dlqName = StringUtils.hasText(extendedConsumerProperties.getExtension().getDlqName())
? extendedConsumerProperties.getExtension().getDlqName()
: "error." + destination.getName() + "." + group;
ListenableFuture<SendResult<byte[], byte[]>> sentDlq = kafkaTemplate.send(dlqName,
record.partition(), key, payload);
ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(dlqName, record.partition(),
key, payload, record.headers());
ListenableFuture<SendResult<byte[], byte[]>> sentDlq = kafkaTemplate.send(producerRecord);
sentDlq.addCallback(new ListenableFutureCallback<SendResult<byte[], byte[]>>() {
StringBuilder sb = new StringBuilder().append(" a message with key='")
.append(toDisplayString(ObjectUtils.nullSafeToString(key), 50)).append("'")
@@ -411,11 +479,11 @@ public class KafkaMessageChannelBinder extends
private boolean running = true;
private final DefaultKafkaProducerFactory<byte[], byte[]> producerFactory;
private final ProducerFactory<byte[], byte[]> producerFactory;
private ProducerConfigurationMessageHandler(KafkaTemplate<byte[], byte[]> kafkaTemplate, String topic,
ProducerConfigurationMessageHandler(KafkaTemplate<byte[], byte[]> kafkaTemplate, String topic,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory) {
ProducerFactory<byte[], byte[]> producerFactory) {
super(kafkaTemplate);
setTopicExpression(new LiteralExpression(topic));
setMessageKeyExpression(producerProperties.getExtension().getMessageKeyExpression());
@@ -443,7 +511,9 @@ public class KafkaMessageChannelBinder extends
@Override
public void stop() {
producerFactory.stop();
if (this.producerFactory instanceof Lifecycle) {
((Lifecycle) producerFactory).stop();
}
this.running = false;
}

View File

@@ -34,7 +34,6 @@ import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoCon
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderJaasInitializerListener;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
@@ -46,7 +45,6 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBin
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.config.codec.kryo.KryoCodecAutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
@@ -57,6 +55,7 @@ import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.integration.codec.Codec;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.util.ObjectUtils;
@@ -68,6 +67,7 @@ import org.springframework.util.ObjectUtils;
* @author Mark Fisher
* @author Ilayaperumal Gopinathan
* @author Henryk Konsek
* @author Gary Russell
*/
@Configuration
@ConditionalOnMissingBean(Binder.class)
@@ -128,7 +128,10 @@ public class KafkaBinderConfiguration {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
}
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
return new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, consumerFactory);
KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(kafkaMessageChannelBinder,
consumerFactory);
indicator.setTimeout(this.configurationProperties.getHealthTimeout());
return indicator;
}
@Bean
@@ -145,7 +148,7 @@ public class KafkaBinderConfiguration {
}
@Bean(name = "adminUtilsOperation")
@Conditional(Kafka10Present.class)
@Conditional(Kafka1xPresent.class)
@ConditionalOnClass(name = "kafka.admin.AdminUtils")
public AdminUtilsOperation kafka10AdminUtilsOperation() {
logger.info("AdminUtils selected: Kafka 0.10 AdminUtils");
@@ -153,15 +156,15 @@ public class KafkaBinderConfiguration {
}
@Bean
public ApplicationListener<?> jaasInitializer() throws IOException {
return new KafkaBinderJaasInitializerListener();
public KafkaJaasLoginModuleInitializer jaasInitializer() throws IOException {
return new KafkaJaasLoginModuleInitializer();
}
static class Kafka10Present implements Condition {
static class Kafka1xPresent implements Condition {
@Override
public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata annotatedTypeMetadata) {
return AppInfoParser.getVersion().startsWith("0.10");
return AppInfoParser.getVersion().startsWith("0.1");
}
}

View File

@@ -15,24 +15,16 @@
*/
package org.springframework.cloud.stream.binder.kafka;
import java.util.List;
import org.springframework.cloud.stream.binder.AbstractTestBinder;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.context.ApplicationContext;
import org.springframework.integration.codec.Codec;
import org.springframework.integration.codec.kryo.KryoRegistrar;
import org.springframework.integration.codec.kryo.PojoCodec;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.tuple.TupleKryoRegistrar;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Registration;
/**
* @author Soby Chacko
* @author Gary Russell
@@ -40,33 +32,23 @@ import com.esotericsoftware.kryo.Registration;
public abstract class AbstractKafkaTestBinder extends
AbstractTestBinder<KafkaMessageChannelBinder, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>> {
private ApplicationContext applicationContext;
@Override
public void cleanup() {
// do nothing - the rule will take care of that
}
protected void addErrorChannel(GenericApplicationContext context) {
PublishSubscribeChannel errorChannel = new PublishSubscribeChannel();
context.getBeanFactory().initializeBean(errorChannel, IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME);
context.getBeanFactory().registerSingleton(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, errorChannel);
protected final void setApplicationContext(ApplicationContext context) {
this.applicationContext = context;
}
public ApplicationContext getApplicationContext() {
return this.applicationContext;
}
protected static Codec getCodec() {
return new PojoCodec(new TupleRegistrar());
}
private static class TupleRegistrar implements KryoRegistrar {
private final TupleKryoRegistrar delegate = new TupleKryoRegistrar();
@Override
public void registerTypes(Kryo kryo) {
this.delegate.registerTypes(kryo);
}
@Override
public List<Registration> getRegistrations() {
return this.delegate.getRegistrations();
}
return new PojoCodec(new TupleKryoRegistrar());
}
}

View File

@@ -66,10 +66,10 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(
new KafkaProducerProperties());
Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory",
ExtendedProducerProperties.class);
String.class, ExtendedProducerProperties.class);
getProducerFactoryMethod.setAccessible(true);
DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod
.invoke(this.kafkaMessageChannelBinder, producerProperties);
.invoke(this.kafkaMessageChannelBinder, "foo", producerProperties);
Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs",
Map.class);
ReflectionUtils.makeAccessible(producerFactoryConfigField);

View File

@@ -63,10 +63,10 @@ public class KafkaBinderConfigurationPropertiesTest {
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(
kafkaProducerProperties);
Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory",
ExtendedProducerProperties.class);
String.class, ExtendedProducerProperties.class);
getProducerFactoryMethod.setAccessible(true);
DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod
.invoke(this.kafkaMessageChannelBinder, producerProperties);
.invoke(this.kafkaMessageChannelBinder, "bar", producerProperties);
Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs",
Map.class);
ReflectionUtils.makeAccessible(producerFactoryConfigField);

View File

@@ -15,6 +15,10 @@
*/
package org.springframework.cloud.stream.binder.kafka;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.verify;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -27,16 +31,16 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
/**
* @author Barry Commins
* @author Gary Russell
*/
public class KafkaBinderHealthIndicatorTest {
@@ -53,14 +57,15 @@ public class KafkaBinderHealthIndicatorTest {
@Mock
private KafkaMessageChannelBinder binder;
private Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse = new HashMap<>();
private final Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse = new HashMap<>();
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
given(consumerFactory.createConsumer()).willReturn(consumer);
given(binder.getTopicsInUse()).willReturn(topicsInUse);
indicator = new KafkaBinderHealthIndicator(binder, consumerFactory);
this.indicator = new KafkaBinderHealthIndicator(binder, consumerFactory);
this.indicator.setTimeout(10);
}
@Test
@@ -70,6 +75,7 @@ public class KafkaBinderHealthIndicatorTest {
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
Health health = indicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);
verify(this.consumer).close();
}
@Test
@@ -81,6 +87,25 @@ public class KafkaBinderHealthIndicatorTest {
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
}
@Test(timeout = 5000)
public void kafkaBinderDoesNotAnswer() {
final List<PartitionInfo> partitions = partitions(new Node(-1, null, 0));
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group", partitions));
given(consumer.partitionsFor(TEST_TOPIC)).willAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
final int fiveMinutes = 1000 * 60 * 5;
Thread.sleep(fiveMinutes);
return partitions;
}
});
this.indicator.setTimeout(1);
Health health = indicator.health();
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
}
private List<PartitionInfo> partitions(Node leader) {
List<PartitionInfo> partitions = new ArrayList<>();
partitions.add(new PartitionInfo(TEST_TOPIC, 0, leader, null, null));

View File

@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -58,27 +59,37 @@ import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.integration.kafka.support.KafkaSendFailureException;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
@@ -87,6 +98,7 @@ import kafka.utils.ZkUtils;
* @author Soby Chacko
* @author Ilayaperumal Gopinathan
* @author Henryk Konsek
* @author Gary Russell
*/
public abstract class KafkaBinderTests extends
PartitionCapableBinderTests<AbstractKafkaTestBinder, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>> {
@@ -138,8 +150,7 @@ public abstract class KafkaBinderTests extends
testDlqGuts(false);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testDlqGuts(boolean withRetry) throws Exception {
private void testDlqGuts(boolean withRetry) throws Exception {
AbstractKafkaTestBinder binder = getBinder();
DirectChannel moduleOutputChannel = new DirectChannel();
DirectChannel moduleInputChannel = new DirectChannel();
@@ -148,6 +159,7 @@ public abstract class KafkaBinderTests extends
moduleInputChannel.subscribe(handler);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionCount(2);
producerProperties.getExtension().setHeaderPatterns(new String[] { MessageHeaders.CONTENT_TYPE });
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setMaxAttempts(withRetry ? 2 : 1);
consumerProperties.setBackOffInitialInterval(100);
@@ -167,7 +179,6 @@ public abstract class KafkaBinderTests extends
ApplicationContext context = TestUtils.getPropertyValue(binder.getBinder(), "applicationContext",
ApplicationContext.class);
Map<String, MessageChannel> beansOfType = context.getBeansOfType(MessageChannel.class);
SubscribableChannel boundErrorChannel = context
.getBean(producerName + ".testGroup.errors-0", SubscribableChannel.class);
SubscribableChannel globalErrorChannel = context.getBean("errorChannel", SubscribableChannel.class);
@@ -967,13 +978,23 @@ public abstract class KafkaBinderTests extends
@Test
@Override
@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testPartitionedModuleJava() throws Exception {
Binder binder = getBinder();
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
final ZkClient zkClient;
zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
invokeCreateTopic(zkUtils, "partJ.0", 8, 1, new Properties());
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setConcurrency(2);
consumerProperties.setInstanceCount(3);
consumerProperties.setInstanceCount(4);
consumerProperties.setInstanceIndex(0);
consumerProperties.setPartitioned(true);
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
@@ -988,11 +1009,15 @@ public abstract class KafkaBinderTests extends
QueueChannel input2 = new QueueChannel();
input2.setBeanName("test.input2J");
Binding<MessageChannel> input2Binding = binder.bindConsumer("partJ.0", "test", input2, consumerProperties);
consumerProperties.setInstanceIndex(3);
QueueChannel input3 = new QueueChannel();
input3.setBeanName("test.input3J");
Binding<MessageChannel> input3Binding = binder.bindConsumer("partJ.0", "test", input3, consumerProperties);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionKeyExtractorClass(PartitionTestSupport.class);
producerProperties.setPartitionSelectorClass(PartitionTestSupport.class);
producerProperties.setPartitionCount(3);
producerProperties.setPartitionCount(3); // overridden to 8 on the actual topic
DirectChannel output = createBindableChannel("output", createProducerBindingProperties(producerProperties));
output.setBeanName("test.output");
Binding<MessageChannel> outputBinding = binder.bindProducer("partJ.0", output, producerProperties);
@@ -1005,6 +1030,7 @@ public abstract class KafkaBinderTests extends
output.send(new GenericMessage<>(2));
output.send(new GenericMessage<>(1));
output.send(new GenericMessage<>(0));
output.send(new GenericMessage<>(3));
Message<?> receive0 = receive(input0);
assertThat(receive0).isNotNull();
@@ -1012,20 +1038,18 @@ public abstract class KafkaBinderTests extends
assertThat(receive1).isNotNull();
Message<?> receive2 = receive(input2);
assertThat(receive2).isNotNull();
Message<?> receive3 = receive(input3);
assertThat(receive3).isNotNull();
if (usesExplicitRouting()) {
assertThat(receive0.getPayload()).isEqualTo(0);
assertThat(receive1.getPayload()).isEqualTo(1);
assertThat(receive2.getPayload()).isEqualTo(2);
}
else {
List<Message<?>> receivedMessages = Arrays.asList(receive0, receive1, receive2);
assertThat(receivedMessages).extracting("payload").containsExactlyInAnyOrder(0, 1, 2);
}
assertThat(receive0.getPayload()).isEqualTo(0);
assertThat(receive1.getPayload()).isEqualTo(1);
assertThat(receive2.getPayload()).isEqualTo(2);
assertThat(receive3.getPayload()).isEqualTo(3);
input0Binding.unbind();
input1Binding.unbind();
input2Binding.unbind();
input3Binding.unbind();
outputBinding.unbind();
}
@@ -1722,6 +1746,76 @@ public abstract class KafkaBinderTests extends
assertThat(extractEndpoint(producerBinding).isRunning()).isFalse();
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testProducerErrorChannel() throws Exception {
AbstractKafkaTestBinder binder = getBinder();
DirectChannel moduleOutputChannel = createBindableChannel("output", new BindingProperties());
ExtendedProducerProperties<KafkaProducerProperties> producerProps = new ExtendedProducerProperties<>(
new KafkaProducerProperties());
producerProps.setHeaderMode(HeaderMode.raw);
producerProps.setErrorChannelEnabled(true);
Binding<MessageChannel> producerBinding = binder.bindProducer("ec.0", moduleOutputChannel, producerProps);
final Message<?> message = MessageBuilder.withPayload("bad").setHeader(MessageHeaders.CONTENT_TYPE, "foo/bar")
.build();
SubscribableChannel ec = binder.getApplicationContext().getBean("ec.0.errors", SubscribableChannel.class);
final AtomicReference<Message<?>> errorMessage = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(2);
ec.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
errorMessage.set(message);
latch.countDown();
}
});
SubscribableChannel globalEc = binder.getApplicationContext()
.getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, SubscribableChannel.class);
globalEc.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
latch.countDown();
}
});
KafkaProducerMessageHandler endpoint = TestUtils.getPropertyValue(producerBinding, "lifecycle",
KafkaProducerMessageHandler.class);
final RuntimeException fooException = new RuntimeException("foo");
final AtomicReference<Object> sent = new AtomicReference<>();
new DirectFieldAccessor(endpoint).setPropertyValue("kafkaTemplate",
new KafkaTemplate(mock(ProducerFactory.class)) {
@Override // SIK < 2.3
public ListenableFuture<SendResult> send(String topic, Object payload) {
sent.set(payload);
SettableListenableFuture<SendResult> future = new SettableListenableFuture<>();
future.setException(fooException);
return future;
}
@Override // SIK 2.3+
public ListenableFuture<SendResult> send(ProducerRecord record) {
sent.set(record.value());
SettableListenableFuture<SendResult> future = new SettableListenableFuture<>();
future.setException(fooException);
return future;
}
});
moduleOutputChannel.send(message);
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(errorMessage.get()).isInstanceOf(ErrorMessage.class);
assertThat(errorMessage.get().getPayload()).isInstanceOf(KafkaSendFailureException.class);
KafkaSendFailureException exception = (KafkaSendFailureException) errorMessage.get().getPayload();
assertThat(exception.getCause()).isSameAs(fooException);
assertThat(new String(((byte[] )exception.getFailedMessage().getPayload()))).isEqualTo(message.getPayload());
assertThat(exception.getRecord().value()).isSameAs(sent.get());
producerBinding.unbind();
}
@Override
protected void binderBindUnbindLatency() throws InterruptedException {
Thread.sleep(500);

View File

@@ -4,6 +4,7 @@
<pattern>%d{ISO8601} %5p %t %c{2}:%L - %m%n</pattern>
</encoder>
</appender>
<logger name="org.apache.kafka" level="DEBUG"/>
<logger name="org.springframework.integration.kafka" level="INFO"/>
<logger name="org.springframework.kafka" level="INFO"/>
<logger name="org.springframework.cloud.stream" level="INFO" />

View File

@@ -2,21 +2,21 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kstream</artifactId>
<artifactId>spring-cloud-stream-binder-kstream11</artifactId>
<packaging>jar</packaging>
<name>spring-cloud-stream-binder-kstream</name>
<description>Kafka Streams Binder Implementation</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.M2</version>
<artifactId>spring-cloud-stream-binder-kafka11-parent</artifactId>
<version>1.3.0.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<artifactId>spring-cloud-stream-binder-kafka11-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@@ -54,7 +54,7 @@ public class KStreamBinder extends
AbstractBinder<KStream<Object, Object>, ExtendedConsumerProperties<KStreamConsumerProperties>, ExtendedProducerProperties<KStreamProducerProperties>>
implements ExtendedPropertiesBinder<KStream<Object, Object>, KStreamConsumerProperties, KStreamProducerProperties> {
private String[] headers;
private final String[] headers;
private final KafkaTopicProvisioner kafkaTopicProvisioner;
@@ -62,8 +62,11 @@ public class KStreamBinder extends
private final StreamsConfig streamsConfig;
private final KafkaBinderConfigurationProperties binderConfigurationProperties;
public KStreamBinder(KafkaBinderConfigurationProperties binderConfigurationProperties, KafkaTopicProvisioner kafkaTopicProvisioner,
KStreamExtendedBindingProperties kStreamExtendedBindingProperties, StreamsConfig streamsConfig) {
KStreamExtendedBindingProperties kStreamExtendedBindingProperties, StreamsConfig streamsConfig) {
this.binderConfigurationProperties = binderConfigurationProperties;
this.headers = EmbeddedHeaderUtils.headersToEmbed(binderConfigurationProperties.getHeaders());
this.kafkaTopicProvisioner = kafkaTopicProvisioner;
this.kStreamExtendedBindingProperties = kStreamExtendedBindingProperties;
@@ -72,7 +75,7 @@ public class KStreamBinder extends
@Override
protected Binding<KStream<Object, Object>> doBindConsumer(String name, String group,
KStream<Object, Object> inputTarget, ExtendedConsumerProperties<KStreamConsumerProperties> properties) {
KStream<Object, Object> inputTarget, ExtendedConsumerProperties<KStreamConsumerProperties> properties) {
ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties = new ExtendedConsumerProperties<KafkaConsumerProperties>(
new KafkaConsumerProperties());
@@ -83,17 +86,17 @@ public class KStreamBinder extends
@Override
@SuppressWarnings("unchecked")
protected Binding<KStream<Object, Object>> doBindProducer(String name, KStream<Object, Object> outboundBindTarget,
ExtendedProducerProperties<KStreamProducerProperties> properties) {
ExtendedProducerProperties<KStreamProducerProperties> properties) {
ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties = new ExtendedProducerProperties<KafkaProducerProperties>(
new KafkaProducerProperties());
this.kafkaTopicProvisioner.provisionProducerDestination(name , extendedProducerProperties);
this.kafkaTopicProvisioner.provisionProducerDestination(name, extendedProducerProperties);
if (HeaderMode.embeddedHeaders.equals(properties.getHeaderMode())) {
outboundBindTarget = outboundBindTarget.map(new KeyValueMapper<Object, Object, KeyValue<Object, Object>>() {
@Override
public KeyValue<Object, Object> apply(Object k, Object v) {
if (v instanceof Message) {
try {
return new KeyValue<>(k, (Object)KStreamBinder.this.serializeAndEmbedHeadersIfApplicable((Message<?>) v));
return new KeyValue<>(k, KStreamBinder.this.serializeAndEmbedHeadersIfApplicable((Message<?>) v));
}
catch (Exception e) {
throw new IllegalArgumentException(e);
@@ -111,7 +114,7 @@ public class KStreamBinder extends
.map(new KeyValueMapper<Object, Object, KeyValue<Object, Object>>() {
@Override
public KeyValue<Object, Object> apply(Object k, Object v) {
return KeyValue.pair(k, (Object)KStreamBinder.this.serializePayloadIfNecessary((Message<?>) v));
return KeyValue.pair(k, (Object) KStreamBinder.this.serializePayloadIfNecessary((Message<?>) v));
}
});
}
@@ -120,37 +123,43 @@ public class KStreamBinder extends
.map(new KeyValueMapper<Object, Object, KeyValue<Object, Object>>() {
@Override
public KeyValue<Object, Object> apply(Object k, Object v) {
return KeyValue.pair(k, ((Message<?>) v).getPayload());
return KeyValue.pair(k, ((Message<Object>) v).getPayload());
}
});
}
}
if (!properties.isUseNativeEncoding() || StringUtils.hasText(properties.getExtension().getKeySerde()) || StringUtils.hasText(properties.getExtension().getValueSerde())) {
Serde<?> keySerde = Serdes.ByteArray();
Serde<?> valueSerde = Serdes.ByteArray();
try {
Serde<?> keySerde;
Serde<?> valueSerde;
if (StringUtils.hasText(properties.getExtension().getKeySerde())) {
keySerde = Utils.newInstance(properties.getExtension().getKeySerde(), Serde.class);
if (keySerde instanceof Configurable) {
((Configurable) keySerde).configure(streamsConfig.originals());
}
}
}
catch (ClassNotFoundException e) {
throw new IllegalStateException("Serde class not found: ", e);
}
try {
else {
keySerde = this.binderConfigurationProperties.getConfiguration().containsKey("key.serde") ?
Utils.newInstance(this.binderConfigurationProperties.getConfiguration().get("key.serde"), Serde.class) : Serdes.ByteArray();
}
if (StringUtils.hasText(properties.getExtension().getValueSerde())) {
valueSerde = Utils.newInstance(properties.getExtension().getValueSerde(), Serde.class);
if (valueSerde instanceof Configurable) {
((Configurable) valueSerde).configure(streamsConfig.originals());
}
}
else {
valueSerde = this.binderConfigurationProperties.getConfiguration().containsKey("value.serde") ?
Utils.newInstance(this.binderConfigurationProperties.getConfiguration().get("value.serde"), Serde.class) : Serdes.ByteArray();
}
outboundBindTarget.to((Serde<Object>) keySerde, (Serde<Object>) valueSerde, name);
}
catch (ClassNotFoundException e) {
throw new IllegalStateException("Serde class not found: ", e);
}
outboundBindTarget.to((Serde<Object>) keySerde, (Serde<Object>) valueSerde, name);
}
else {
outboundBindTarget.to(name);
@@ -158,7 +167,7 @@ public class KStreamBinder extends
return new DefaultBinding<>(name, null, outboundBindTarget, null);
}
private byte[] serializeAndEmbedHeadersIfApplicable(Message<?> message) throws Exception {
private Object serializeAndEmbedHeadersIfApplicable(Message<?> message) throws Exception {
MessageValues transformed = serializePayloadIfNecessary(message);
byte[] payload;

Some files were not shown because too many files have changed in this diff Show More