Compare commits

..

33 Commits

Author SHA1 Message Date
Oleg Zhurakousky
4a5bc655e4 Merge pull request #610 from spring-operator/polish-urls-remaining-0.11
URL Cleanup
2019-03-26 14:21:53 +01:00
Oleg Zhurakousky
6029da2cb2 Merge pull request #600 from spring-operator/polish-urls-xml-0.11
URL Cleanup
2019-03-26 14:19:32 +01:00
Spring Operator
337596e334 URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (i.e. if using a URL shortener).

# HTTP URLs that Could Not Be Fixed
These URLs were unable to be fixed. Please review them to see if they can be manually resolved.

* [ ] http://xslthl.sf.net (301) with 4 occurrences could not be migrated:
   ([https](https://xslthl.sf.net) result AnnotatedConnectException).
* [ ] http://exslt.org/common (404) with 1 occurrences could not be migrated:
   ([https](https://exslt.org/common) result SSLHandshakeException).

# Fixed URLs

## Fixed But Review Recommended
These URLs were fixed, but the https status was not OK. However, the https status was the same as the http request or http redirected to an https URL, so they were migrated. Your review is recommended.

* [ ] http://0.0.0.0:8082 (AnnotatedConnectException) with 1 occurrences migrated to:
  https://0.0.0.0:8082 ([https](https://0.0.0.0:8082) result AnnotatedConnectException).
* [ ] http://compose.docker.io/ (UnknownHostException) with 1 occurrences migrated to:
  https://compose.docker.io/ ([https](https://compose.docker.io/) result UnknownHostException).
* [ ] http://docs.spring.io/spring-cloud-stream-binder-kafka/docs/ (301) with 1 occurrences migrated to:
  https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/ ([https](https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/) result 404).
* [ ] http://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current-SNAPSHOT/reference/html/ (301) with 1 occurrences migrated to:
  https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current-SNAPSHOT/reference/html/ ([https](https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current-SNAPSHOT/reference/html/) result 404).
* [ ] http://docs.spring.io/spring-kafka/reference/html/_reference.html (301) with 1 occurrences migrated to:
  https://docs.spring.io/spring-kafka/reference/html/_reference.html ([https](https://docs.spring.io/spring-kafka/reference/html/_reference.html) result 404).

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* [ ] http://docs.confluent.io/2.0.0/kafka/security.html with 1 occurrences migrated to:
  https://docs.confluent.io/2.0.0/kafka/security.html ([https](https://docs.confluent.io/2.0.0/kafka/security.html) result 200).
* [ ] http://github.com/ with 3 occurrences migrated to:
  https://github.com/ ([https](https://github.com/) result 200).
* [ ] http://kafka.apache.org/090/documentation.html with 2 occurrences migrated to:
  https://kafka.apache.org/090/documentation.html ([https](https://kafka.apache.org/090/documentation.html) result 200).
* [ ] http://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html with 1 occurrences migrated to:
  https://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html ([https](https://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html) result 200).
* [ ] http://plugins.jetbrains.com/plugin/6546 with 1 occurrences migrated to:
  https://plugins.jetbrains.com/plugin/6546 ([https](https://plugins.jetbrains.com/plugin/6546) result 301).
* [ ] http://raw.github.com/ with 1 occurrences migrated to:
  https://raw.github.com/ ([https](https://raw.github.com/) result 301).
* [ ] http://eclipse.org with 1 occurrences migrated to:
  https://eclipse.org ([https](https://eclipse.org) result 302).
* [ ] http://eclipse.org/m2e/ with 2 occurrences migrated to:
  https://eclipse.org/m2e/ ([https](https://eclipse.org/m2e/) result 302).
* [ ] http://www.springsource.com/developer/sts with 1 occurrences migrated to:
  https://www.springsource.com/developer/sts ([https](https://www.springsource.com/developer/sts) result 302).

# Ignored
These URLs were intentionally ignored.

* http://docbook.org/ns/docbook with 4 occurrences
* http://docbook.sourceforge.net/xmlns/l10n/1.0 with 2 occurrences
* http://localhost:8082 with 2 occurrences
* http://maven.apache.org/POM/4.0.0 with 1 occurrences
* http://www.w3.org/1999/XSL/Format with 2 occurrences
* http://www.w3.org/1999/XSL/Transform with 7 occurrences
* http://www.w3.org/1999/xlink with 1 occurrences
2019-03-26 03:57:12 -05:00
Spring Operator
6614ab90d3 URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (i.e. if using a URL shortener).

# HTTP URLs that Could Not Be Fixed
These URLs were unable to be fixed. Please review them to see if they can be manually resolved.

* [ ] http://xslthl.sf.net (301) with 1 occurrences could not be migrated:
   ([https](https://xslthl.sf.net) result AnnotatedConnectException).

# Fixed URLs

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* [ ] http://asciidoctor.org with 1 occurrences migrated to:
  https://asciidoctor.org ([https](https://asciidoctor.org) result 200).
* [ ] http://sourceforge.net/projects/xslthl/ with 14 occurrences migrated to:
  https://sourceforge.net/projects/xslthl/ ([https](https://sourceforge.net/projects/xslthl/) result 200).
* [ ] http://www.w3.org/TR/CSS21/propidx.html with 1 occurrences migrated to:
  https://www.w3.org/TR/CSS21/propidx.html ([https](https://www.w3.org/TR/CSS21/propidx.html) result 200).
* [ ] http://repo.spring.io/libs-milestone-local with 2 occurrences migrated to:
  https://repo.spring.io/libs-milestone-local ([https](https://repo.spring.io/libs-milestone-local) result 302).
* [ ] http://repo.spring.io/libs-snapshot-local with 2 occurrences migrated to:
  https://repo.spring.io/libs-snapshot-local ([https](https://repo.spring.io/libs-snapshot-local) result 302).
* [ ] http://repo.spring.io/release with 1 occurrences migrated to:
  https://repo.spring.io/release ([https](https://repo.spring.io/release) result 302).

# Ignored
These URLs were intentionally ignored.

* http://maven.apache.org/POM/4.0.0 with 14 occurrences
* http://www.w3.org/2001/XMLSchema-instance with 7 occurrences
2019-03-26 00:22:57 -05:00
Oleg Zhurakousky
fa6a476996 Merge pull request #575 from spring-operator/polish-urls-apache-license-0.11
URL Cleanup
2019-03-25 15:17:26 +01:00
Spring Operator
dce3e83520 URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (i.e. if using a URL shortener).

# Fixed URLs

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* [ ] http://www.apache.org/licenses/ with 1 occurrences migrated to:
  https://www.apache.org/licenses/ ([https](https://www.apache.org/licenses/) result 200).
* [ ] http://www.apache.org/licenses/LICENSE-2.0 with 55 occurrences migrated to:
  https://www.apache.org/licenses/LICENSE-2.0 ([https](https://www.apache.org/licenses/LICENSE-2.0) result 200).
2019-03-21 13:22:58 -05:00
Spring Operator
117c628f88 URL Cleanup
This commit updates URLs to prefer the https protocol. Redirects are not followed to avoid accidentally expanding intentionally shortened URLs (i.e. if using a URL shortener).

# Fixed URLs

## Fixed But Review Recommended
These URLs were fixed, but the https status was not OK. However, the https status was the same as the http request or http redirected to an https URL, so they were migrated. Your review is recommended.

* http://packages.confluent.io/maven/ (404) with 1 occurrences migrated to:
  https://packages.confluent.io/maven/ ([https](https://packages.confluent.io/maven/) result 404).

## Fixed Success
These URLs were switched to an https URL with a 2xx status. While the status was successful, your review is still recommended.

* http://docs.spring.io/spring-framework/docs/ with 1 occurrences migrated to:
  https://docs.spring.io/spring-framework/docs/ ([https](https://docs.spring.io/spring-framework/docs/) result 200).
* http://docs.spring.io/spring-shell/docs/current/api/ with 1 occurrences migrated to:
  https://docs.spring.io/spring-shell/docs/current/api/ ([https](https://docs.spring.io/spring-shell/docs/current/api/) result 200).
* http://maven.apache.org/xsd/maven-4.0.0.xsd with 7 occurrences migrated to:
  https://maven.apache.org/xsd/maven-4.0.0.xsd ([https](https://maven.apache.org/xsd/maven-4.0.0.xsd) result 200).
* http://www.apache.org/licenses/LICENSE-2.0 with 2 occurrences migrated to:
  https://www.apache.org/licenses/LICENSE-2.0 ([https](https://www.apache.org/licenses/LICENSE-2.0) result 200).
* http://projects.spring.io/spring-cloud with 3 occurrences migrated to:
  https://projects.spring.io/spring-cloud ([https](https://projects.spring.io/spring-cloud) result 301).
* http://www.spring.io with 3 occurrences migrated to:
  https://www.spring.io ([https](https://www.spring.io) result 301).
* http://repo.spring.io/libs-milestone-local with 2 occurrences migrated to:
  https://repo.spring.io/libs-milestone-local ([https](https://repo.spring.io/libs-milestone-local) result 302).
* http://repo.spring.io/libs-release-local with 1 occurrences migrated to:
  https://repo.spring.io/libs-release-local ([https](https://repo.spring.io/libs-release-local) result 302).
* http://repo.spring.io/libs-snapshot-local with 2 occurrences migrated to:
  https://repo.spring.io/libs-snapshot-local ([https](https://repo.spring.io/libs-snapshot-local) result 302).
* http://repo.spring.io/release with 1 occurrences migrated to:
  https://repo.spring.io/release ([https](https://repo.spring.io/release) result 302).

# Ignored
These URLs were intentionally ignored.

* http://maven.apache.org/POM/4.0.0 with 14 occurrences
* http://www.w3.org/2001/XMLSchema-instance with 7 occurrences
2019-03-20 09:50:48 -04:00
Gary Russell
2a3300d066 Fix default partition header expression 2018-08-02 13:23:37 -04:00
Soby Chacko
68fad4a958 KafkaBinderConfigurationProperties duplicate bean
ConfigurationProperties bean provided by Kafka Streams binder
extends from `KafkaBinderConfigurationProperties` used by Kafka binder.
It creates a conflict when autowiring this bean from Kafka binder configuration.
This prevents an application to have both binders in the classpath.
Change the creation of this ConfigurationProperties bean so that it
avoids creating bean using EnbaleConfigurationProperties and then autowiring,
but directly create the Bean using `@Bean`. This prevents the conflict.

Resolves #244
Resolves #315
2018-08-02 13:23:37 -04:00
Aldo Sinanaj
11e82813e3 Enhance DLQ messages with failure info 2018-08-02 13:23:37 -04:00
Gary Russell
b52b398dab SCST-GH-1166: Support Producer-initiated tx
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/276

Previously, transactions were only supported if initiated by a consumer.
2018-01-17 15:58:20 -05:00
Gary Russell
a79bd5f15f GH-239: Fix dependency for the kafka11 starter
Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/239
2017-11-15 16:13:20 -05:00
Gary Russell
f9c62af169 GH-251: Configurable inbound adapter converter
Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/251

When using native decoding, the message emitted by the binder has no `id` or `timestamp`
headers. This can cause problems if the first SI endpoint uses a JDBC message store,
for example.

Add the ability to configure whether these headers are populated as well as the ability
to inject a custom message converter.

Polishing, fixing conflicts after cherry-picking.
2017-11-15 15:59:51 -05:00
Gary Russell
4b9ab0448e GH-223: Deserialization with native encoding
Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/223

Add test case; stream fix: https://github.com/spring-cloud/spring-cloud-stream/pull/1095

* Some minor test code polishing
2017-10-10 15:03:15 -04:00
Soby Chacko
b8c1ca3533 Update next version: 1.3.1.BUILD-SNAPSHOT 2017-10-04 12:11:57 -04:00
Artem Bilan
5269a6642c Prepare for release
* Upgrade to SCSt-1.3.0.RELEASE
* Revert to SC-build-1.3.5.RELEASE
2017-10-02 15:31:00 -04:00
Gary Russell
a6201425e0 GH-215: Add timeout to health indicator
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/215

* Shutdown the executor.

* Polishing - PR Comments

* Re-interrupt thread.

* More Polishing

# Conflicts:
#	spring-cloud-stream-binder-kafka11-docs/src/main/asciidoc/overview.adoc
2017-10-02 14:07:09 -04:00
Gary Russell
715b69506d Update to SK 1.3, SIK 2.3 RELEASE
Fixes spring-cloud/spring-cloud-stream-binder-kafka#220
2017-10-02 14:04:45 -04:00
Artem Bilan
4fb37e2eac Cast ProducerFactory to DisposableBean for destroy
https://jenkins.spring.io/blue/organizations/jenkins/spring-cloud-stream-binder-kafka-0.11-ci/detail/spring-cloud-stream-binder-kafka-0.11-ci/7/pipeline
2017-09-28 12:36:32 -04:00
Gary Russell
2a6d716ecf GH-206: Close Consumer/Producer in provisioning
Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/206

Close the consumer and producer after retrieving the current partition count.

**Cherry pick/back port to 0.11 and 1.2.x, 2.0.x**

* Destroy the Producer Factory

# Conflicts:
#	spring-cloud-stream-binder-kafka11/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java
2017-09-28 11:04:01 -04:00
Soby Chacko
7344bed048 Accepting custom trusted packages
Kafka header property to accept trusted packages

Fix #195
2017-09-27 20:49:39 +01:00
Gary Russell
1e74d624c8 Back to 1.3.0.BUILD-SNAPSHOT 2017-09-13 13:52:39 -04:00
Gary Russell
7a2b9f1c56 Update POMs to 1.3.0.RC1; build 1.3.5.RELEASE 2017-09-13 12:37:17 -04:00
Soby Chacko
d72a4fdb33 Kstream binder: producer default Serde changes
Change the way the default Serde classes are selected for key and value
in producer when only one of those is provided by the user.

Fix #190
2017-09-13 12:28:05 -04:00
Soby Chacko
ea8b3062f1 GH-188: KStream Binder Properties
KStream binder: support class for application level properties

Provide commonly used KStream application properties for convenient access at runtime

Fix #188

Since windowing operations are common in KStream applications, making the TimeWindows object
avaiable as a first class bean (using auto configuration). This bean is only created if the
relevant properties are provided by the user.
2017-09-13 12:27:46 -04:00
Gary Russell
3b3896d642 Version Updates (SK, SIK, SI) 2017-09-11 17:25:54 -04:00
Gary Russell
3d244b156e Change Artifacts to ...kafka11 2017-08-29 18:32:28 -04:00
Gary Russell
e968194b1a KStreams and 0.11 2017-08-29 17:12:11 -04:00
Gary Russell
add9a5d1dc 0.11 Binder
Initial Commit

- Transactional Binder

Version Updates

- Headers support
2017-08-29 14:59:39 -04:00
Gary Russell
02913cd177 GH-169: Use the Actual Partition Count (Producer)
Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/169

If the configured `partitionCount` is less than the physical partition count on an existing
topic, the binder emits this message:

    The `partitionCount` of the producer for topic partJ.0 is 3, smaller than the actual partition count of 8 of the topic.
    The larger number will be used instead.

However, that is not true; the configured partition count is used.

Override the configured partition count with the actual partition count.
2017-08-22 13:34:21 -04:00
Gary Russell
0865602141 GH-62: Remove Tuple Kryo Registrar Wrapper
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/62

No longer needed.
2017-08-22 13:24:51 -04:00
Gary Russell
790b141799 Fixes #181
SCSt-GH-916: Configure Producer Error Channel

Requires: https://github.com/spring-cloud/spring-cloud-stream/pull/1039

Publish send failures to the error channel.

Add docs

Revert to Spring Kafka 1.1.6
2017-08-22 12:56:16 -04:00
Vinicius Carvalho
60e620e36e Set version to 1.3.0.BUILD-SNAPSHOT 2017-07-31 16:00:00 -04:00
125 changed files with 1272 additions and 1027 deletions

View File

@@ -21,7 +21,7 @@
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>http://repo.spring.io/libs-snapshot-local</url>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
@@ -29,7 +29,7 @@
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>http://repo.spring.io/libs-milestone-local</url>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
@@ -37,7 +37,7 @@
<repository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>http://repo.spring.io/release</url>
<url>https://repo.spring.io/release</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
@@ -47,7 +47,7 @@
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>http://repo.spring.io/libs-snapshot-local</url>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
@@ -55,7 +55,7 @@
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>http://repo.spring.io/libs-milestone-local</url>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>

View File

@@ -1,6 +1,6 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
https://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
@@ -192,7 +192,7 @@
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,

2
mvnw vendored
View File

@@ -8,7 +8,7 @@
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an

2
mvnw.cmd vendored
View File

@@ -7,7 +7,7 @@
@REM "License"); you may not use this file except in compliance
@REM with the License. You may obtain a copy of the License at
@REM
@REM http://www.apache.org/licenses/LICENSE-2.0
@REM https://www.apache.org/licenses/LICENSE-2.0
@REM
@REM Unless required by applicable law or agreed to in writing,
@REM software distributed under the License is distributed on an

53
pom.xml
View File

@@ -1,43 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.M2</version>
<artifactId>spring-cloud-stream-binder-kafka11-parent</artifactId>
<version>1.3.1.BUILD-SNAPSHOT</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>1.3.3.RELEASE</version>
<version>1.3.5.RELEASE</version>
<relativePath />
</parent>
<properties>
<java.version>1.7</java.version>
<kafka.version>0.10.1.1</kafka.version>
<spring-kafka.version>1.1.6.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>2.1.1.RELEASE</spring-integration-kafka.version>
<spring-cloud-stream.version>1.3.0.M2</spring-cloud-stream.version>
<spring-cloud-build.version>1.3.3.RELEASE</spring-cloud-build.version>
<kafka.version>0.11.0.0</kafka.version>
<spring-kafka.version>1.3.2.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>2.3.0.RELEASE</spring-integration-kafka.version>
<spring-cloud-stream.version>1.3.1.BUILD-SNAPSHOT</spring-cloud-stream.version>
<spring-cloud-build.version>1.3.5.RELEASE</spring-cloud-build.version>
</properties>
<modules>
<module>spring-cloud-stream-binder-kafka</module>
<module>spring-cloud-starter-stream-kafka</module>
<module>spring-cloud-stream-binder-kafka-docs</module>
<module>spring-cloud-stream-binder-kafka-0.10.1-test</module>
<module>spring-cloud-stream-binder-kafka-0.10.2-test</module>
<module>spring-cloud-stream-binder-kafka-core</module>
<module>spring-cloud-stream-binder-kstream</module>
<module>spring-cloud-stream-binder-kafka11</module>
<module>spring-cloud-starter-stream-kafka11</module>
<module>spring-cloud-stream-binder-kafka11-docs</module>
<module>spring-cloud-stream-binder-kafka-0.11-test</module>
<module>spring-cloud-stream-binder-kafka11-core</module>
<module>spring-cloud-stream-binder-kstream11</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<artifactId>spring-cloud-stream-binder-kafka11-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<artifactId>spring-cloud-stream-binder-kafka11</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -83,6 +82,12 @@
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>${spring-integration-kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro-compiler</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
@@ -190,7 +195,7 @@
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>http://repo.spring.io/libs-snapshot-local</url>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
@@ -201,7 +206,7 @@
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>http://repo.spring.io/libs-milestone-local</url>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
@@ -209,7 +214,7 @@
<repository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>http://repo.spring.io/release</url>
<url>https://repo.spring.io/release</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
@@ -219,7 +224,7 @@
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>http://repo.spring.io/libs-snapshot-local</url>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
@@ -230,7 +235,7 @@
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>http://repo.spring.io/libs-milestone-local</url>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
@@ -238,7 +243,7 @@
<pluginRepository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>http://repo.spring.io/libs-release-local</url>
<url>https://repo.spring.io/libs-release-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>

View File

@@ -1,17 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.M2</version>
<artifactId>spring-cloud-stream-binder-kafka11-parent</artifactId>
<version>1.3.1.BUILD-SNAPSHOT</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>
<url>http://projects.spring.io/spring-cloud</url>
<artifactId>spring-cloud-starter-stream-kafka11</artifactId>
<description>Spring Cloud Starter Stream Kafka for the 0.11.x.x client</description>
<url>https://projects.spring.io/spring-cloud</url>
<organization>
<name>Pivotal Software, Inc.</name>
<url>http://www.spring.io</url>
<url>https://www.spring.io</url>
</organization>
<properties>
<main.basedir>${basedir}/../..</main.basedir>
@@ -19,7 +19,7 @@
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<artifactId>spring-cloud-stream-binder-kafka11</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

View File

@@ -1,126 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.M2</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-0.10.1-test</artifactId>
<description>Spring Cloud Stream Kafka Binder 0.10.1 Tests</description>
<url>http://projects.spring.io/spring-cloud</url>
<organization>
<name>Pivotal Software, Inc.</name>
<url>http://www.spring.io</url>
</organization>
<properties>
<main.basedir>${basedir}/../..</main.basedir>
<kafka.version>0.10.1.1</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-0.10.2-test</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
<version>${spring-cloud-stream.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>3.1.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry</artifactId>
<version>3.1.2</version>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -1,241 +0,0 @@
/*
* Copyright 2014-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.assertj.core.api.Assertions;
import org.eclipse.jetty.server.Server;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.Spy;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import static org.junit.Assert.assertTrue;
/**
* Integration tests for the {@link KafkaMessageChannelBinder}.
*
* This test specifically tests for the 0.10.1.x version of Kafka.
*
* @author Eric Bottard
* @author Marius Bogoevici
* @author Mark Fisher
* @author Ilayaperumal Gopinathan
*/
public class Kafka_0_10_1_BinderTests extends Kafka_0_10_2_BinderTests {
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
private Kafka10TestBinder binder;
private Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
@Override
protected void binderBindUnbindLatency() throws InterruptedException {
Thread.sleep(500);
}
@Override
protected Kafka10TestBinder getBinder() {
if (binder == null) {
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binder = new Kafka10TestBinder(binderConfiguration);
}
return binder;
}
protected KafkaBinderConfigurationProperties createConfigurationProperties() {
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
List<String> bAddresses = new ArrayList<>();
for (BrokerAddress bAddress : brokerAddresses) {
bAddresses.add(bAddress.toString());
}
String[] foo = new String[bAddresses.size()];
binderConfiguration.setBrokers(bAddresses.toArray(foo));
binderConfiguration.setZkNodes(embeddedKafka.getZookeeperConnectionString());
return binderConfiguration;
}
@Override
protected int partitionSize(String topic) {
return consumerFactory().createConsumer().partitionsFor(topic).size();
}
@Override
protected ZkUtils getZkUtils(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
final ZkClient zkClient = new ZkClient(kafkaBinderConfigurationProperties.getZkConnectionString(),
kafkaBinderConfigurationProperties.getZkSessionTimeout(), kafkaBinderConfigurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
return new ZkUtils(zkClient, null, false);
}
@Override
protected void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions, int replicationFactor, Properties topicConfig) {
adminUtilsOperation.invokeCreateTopic(zkUtils, topic, partitions, replicationFactor, new Properties());
}
@Override
protected int invokePartitionSize(String topic, ZkUtils zkUtils) {
return adminUtilsOperation.partitionSize(topic, zkUtils);
}
@Override
public String getKafkaOffsetHeaderKey() {
return KafkaHeaders.OFFSET;
}
@Override
protected Binder getBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {
return new Kafka10TestBinder(kafkaBinderConfigurationProperties);
}
@Before
public void init() {
String multiplier = System.getenv("KAFKA_TIMEOUT_MULTIPLIER");
if (multiplier != null) {
timeoutMultiplier = Double.parseDouble(multiplier);
}
}
@Override
protected boolean usesExplicitRouting() {
return false;
}
@Override
protected String getClassUnderTestName() {
return CLASS_UNDER_TEST_NAME;
}
@Override
public Spy spyOn(final String name) {
throw new UnsupportedOperationException("'spyOn' is not used by Kafka tests");
}
private ConsumerFactory<byte[], byte[]> consumerFactory() {
Map<String, Object> props = new HashMap<>();
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "TEST-CONSUMER-GROUP");
Deserializer<byte[]> valueDecoder = new ByteArrayDeserializer();
Deserializer<byte[]> keyDecoder = new ByteArrayDeserializer();
return new DefaultKafkaConsumerFactory<>(props, keyDecoder, valueDecoder);
}
@Test
@SuppressWarnings("unchecked")
public void testCustomAvroSerialization() throws Exception {
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
Map<String, Object> schemaRegistryProps = new HashMap<>();
schemaRegistryProps.put("kafkastore.connection.url", configurationProperties.getZkConnectionString());
schemaRegistryProps.put("listeners", "http://0.0.0.0:8082");
schemaRegistryProps.put("port", "8082");
schemaRegistryProps.put("kafkastore.topic", "_schemas");
SchemaRegistryConfig config = new SchemaRegistryConfig(schemaRegistryProps);
SchemaRegistryRestApplication app = new SchemaRegistryRestApplication(config);
Server server = app.createServer();
server.start();
long endTime = System.currentTimeMillis() + 5000;
while(true) {
if (server.isRunning()) {
break;
}
else if (System.currentTimeMillis() > endTime) {
Assertions.fail("Kafka Schema Registry Server failed to start");
}
}
User1 firstOutboundFoo = new User1();
String userName1 = "foo-name" + UUID.randomUUID().toString();
String favColor1 = "foo-color" + UUID.randomUUID().toString();
firstOutboundFoo.setName(userName1);
firstOutboundFoo.setFavoriteColor(favColor1);
Message<?> message = MessageBuilder.withPayload(firstOutboundFoo).build();
SubscribableChannel moduleOutputChannel = new DirectChannel();
String testTopicName = "existing" + System.currentTimeMillis();
invokeCreateTopic(zkUtils, testTopicName, 6, 1, new Properties());
configurationProperties.setAutoAddPartitions(true);
Binder binder = getBinder(configurationProperties);
QueueChannel moduleInputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.getExtension().getConfiguration().put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
producerProperties.getExtension().getConfiguration().put("schema.registry.url", "http://localhost:8082");
producerProperties.setUseNativeEncoding(true);
Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel, producerProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
consumerProperties.getExtension().getConfiguration().put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
consumerProperties.getExtension().getConfiguration().put("schema.registry.url", "http://localhost:8082");
Binding<MessageChannel> consumerBinding = binder.bindConsumer(testTopicName, "test", moduleInputChannel, consumerProperties);
// Let the consumer actually bind to the producer before sending a msg
binderBindUnbindLatency();
moduleOutputChannel.send(message);
Message<?> inbound = receive(moduleInputChannel);
Assertions.assertThat(inbound).isNotNull();
assertTrue(message.getPayload() instanceof User1);
User1 receivedUser = (User1) message.getPayload();
Assertions.assertThat(receivedUser.getName()).isEqualTo(userName1);
Assertions.assertThat(receivedUser.getFavoriteColor()).isEqualTo(favColor1);
producerBinding.unbind();
consumerBinding.unbind();
}
}

View File

@@ -1,34 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.M2</version>
<artifactId>spring-cloud-stream-binder-kafka11-parent</artifactId>
<version>1.3.1.BUILD-SNAPSHOT</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-0.10.2-test</artifactId>
<description>Spring Cloud Stream Kafka Binder 0.10.2 Tests</description>
<url>http://projects.spring.io/spring-cloud</url>
<artifactId>spring-cloud-stream-binder-kafka-0.11-test</artifactId>
<description>Spring Cloud Stream Kafka Binder 0.11 Tests</description>
<url>https://projects.spring.io/spring-cloud</url>
<organization>
<name>Pivotal Software, Inc.</name>
<url>http://www.spring.io</url>
<url>https://www.spring.io</url>
</organization>
<properties>
<main.basedir>${basedir}/../..</main.basedir>
<kafka.version>0.10.2.1</kafka.version>
<spring-kafka.version>1.2.2.RELEASE</spring-kafka.version>
<kafka.version>0.11.0.0</kafka.version>
<spring-kafka.version>1.3.0.RELEASE</spring-kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<version>${project.version}</version>
<artifactId>spring-cloud-stream-binder-kafka11-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>${project.version}</version>
<artifactId>spring-cloud-stream-binder-kafka11</artifactId>
<scope>test</scope>
</dependency>
<dependency>
@@ -63,7 +61,7 @@
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<artifactId>spring-cloud-stream-binder-kafka11</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
@@ -96,7 +94,7 @@
<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -67,6 +67,7 @@ public class Kafka10TestBinder extends AbstractKafkaTestBinder {
ProducerListener producerListener = new LoggingProducerListener();
binder.setProducerListener(producerListener);
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
setApplicationContext(context);
binder.setApplicationContext(context);
binder.afterPropertiesSet();
this.setBinder(binder);

View File

@@ -1,11 +1,11 @@
/*
* Copyright 2014-2016 the original author or authors.
* Copyright 2014-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -25,8 +25,10 @@ import java.util.UUID;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -38,6 +40,7 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
@@ -46,6 +49,7 @@ import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOper
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.kafka.core.ConsumerFactory;
@@ -55,22 +59,26 @@ import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import static org.junit.Assert.assertTrue;
/**
* Integration tests for the {@link KafkaMessageChannelBinder}.
*
* This test specifically tests for the 0.10.2.x version of Kafka.
* This test specifically tests for the 0.11.x.x version of Kafka.
*
* @author Eric Bottard
* @author Marius Bogoevici
* @author Mark Fisher
* @author Ilayaperumal Gopinathan
* @author Gary Russell
*/
public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
public class Kafka_0_11_BinderTests extends KafkaBinderTests {
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();
@@ -79,7 +87,7 @@ public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
private Kafka10TestBinder binder;
private Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
private final Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation();
@Override
protected void binderBindUnbindLatency() throws InterruptedException {
@@ -90,11 +98,13 @@ public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
protected Kafka10TestBinder getBinder() {
if (binder == null) {
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binderConfiguration.setHeaders("dlqTestHeader");
binder = new Kafka10TestBinder(binderConfiguration);
}
return binder;
}
@Override
protected KafkaBinderConfigurationProperties createConfigurationProperties() {
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
@@ -178,6 +188,42 @@ public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
return new DefaultKafkaConsumerFactory<>(props, keyDecoder, valueDecoder);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testTrustedPackages() throws Exception {
Binder binder = getBinder();
BindingProperties producerBindingProperties = createProducerBindingProperties(createProducerProperties());
DirectChannel moduleOutputChannel = createBindableChannel("output", producerBindingProperties);
QueueChannel moduleInputChannel = new QueueChannel();
Binding<MessageChannel> producerBinding = binder.bindProducer("bar.0", moduleOutputChannel,
producerBindingProperties.getProducer());
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.getExtension().setTrustedPackages(new String[]{"org.springframework.util"});
Binding<MessageChannel> consumerBinding = binder.bindConsumer("bar.0",
"testSendAndReceiveNoOriginalContentType", moduleInputChannel, consumerProperties);
binderBindUnbindLatency();
Message<?> message = org.springframework.integration.support.MessageBuilder.withPayload("foo")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN).setHeader("foo", MimeTypeUtils.TEXT_PLAIN)
.build();
moduleOutputChannel.send(message);
Message<?> inbound = receive(moduleInputChannel);
Assertions.assertThat(inbound).isNotNull();
Assertions.assertThat(inbound.getPayload()).isEqualTo("foo");
Assertions.assertThat(inbound.getHeaders().get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE)).isNull();
Assertions.assertThat(inbound.getHeaders().get(MessageHeaders.CONTENT_TYPE))
.isEqualTo(MimeTypeUtils.TEXT_PLAIN_VALUE);
Assertions.assertThat(inbound.getHeaders().get("foo")).isInstanceOf(MimeType.class);
MimeType actual = (MimeType) inbound.getHeaders().get("foo");
Assertions.assertThat(actual).isEqualTo(MimeTypeUtils.TEXT_PLAIN);
producerBinding.unbind();
consumerBinding.unbind();
}
class Foo{}
@Test
@SuppressWarnings("unchecked")
public void testCustomAvroSerialization() throws Exception {
@@ -188,7 +234,7 @@ public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
Map<String, Object> schemaRegistryProps = new HashMap<>();
schemaRegistryProps.put("kafkastore.connection.url", configurationProperties.getZkConnectionString());
schemaRegistryProps.put("listeners", "http://0.0.0.0:8082");
schemaRegistryProps.put("listeners", "https://0.0.0.0:8082");
schemaRegistryProps.put("port", "8082");
schemaRegistryProps.put("kafkastore.topic", "_schemas");
SchemaRegistryConfig config = new SchemaRegistryConfig(schemaRegistryProps);
@@ -238,4 +284,15 @@ public class Kafka_0_10_2_BinderTests extends KafkaBinderTests {
producerBinding.unbind();
consumerBinding.unbind();
}
@Override
public void testSendAndReceiveWithExplicitConsumerGroupWithRawMode() {
// raw mode no longer needed
}
@Override
public void testSendAndReceiveWithRawModeAndStringPayload() {
// raw mode no longer needed
}
}

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -1,74 +0,0 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.kafka.core.ConsumerFactory;
/**
* Health indicator for Kafka.
*
* @author Ilayaperumal Gopinathan
* @author Marius Bogoevici
* @author Henryk Konsek
*/
public class KafkaBinderHealthIndicator implements HealthIndicator {
private final KafkaMessageChannelBinder binder;
private final ConsumerFactory<?, ?> consumerFactory;
public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder,
ConsumerFactory<?, ?> consumerFactory) {
this.binder = binder;
this.consumerFactory = consumerFactory;
}
@Override
public Health health() {
try (Consumer<?, ?> metadataConsumer = consumerFactory.createConsumer()) {
Set<String> downMessages = new HashSet<>();
for (String topic : this.binder.getTopicsInUse().keySet()) {
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
if (this.binder.getTopicsInUse().get(topic).getPartitionInfos().contains(partitionInfo)
&& partitionInfo.leader()
.id() == -1) {
downMessages.add(partitionInfo.toString());
}
}
}
if (downMessages.isEmpty()) {
return Health.up().build();
}
return Health.down().withDetail("Following partitions in use have no leaders: ", downMessages.toString())
.build();
}
catch (Exception e) {
return Health.down(e).build();
}
}
}

View File

@@ -1,118 +0,0 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import org.apache.kafka.common.security.JaasUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.util.Assert;
/**
* @author Marius Bogoevici
*/
public class KafkaBinderJaasInitializerListener implements ApplicationListener<ContextRefreshedEvent>,
ApplicationContextAware, DisposableBean {
public static final String DEFAULT_ZK_LOGIN_CONTEXT_NAME = "Client";
private ApplicationContext applicationContext;
private final boolean ignoreJavaLoginConfigParamSystemProperty;
private final File placeholderJaasConfiguration;
public KafkaBinderJaasInitializerListener() throws IOException {
// we ignore the system property if it wasn't originally set at launch
this.ignoreJavaLoginConfigParamSystemProperty =
(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) == null);
this.placeholderJaasConfiguration = File.createTempFile("kafka-client-jaas-config-placeholder", "conf");
this.placeholderJaasConfiguration.deleteOnExit();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public void destroy() throws Exception {
if (this.ignoreJavaLoginConfigParamSystemProperty) {
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
}
}
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getSource() == this.applicationContext) {
KafkaBinderConfigurationProperties binderConfigurationProperties =
applicationContext.getBean(KafkaBinderConfigurationProperties.class);
// only use programmatic support if a file is not set via system property
if (ignoreJavaLoginConfigParamSystemProperty
&& binderConfigurationProperties.getJaas() != null) {
Map<String, AppConfigurationEntry[]> configurationEntries = new HashMap<>();
AppConfigurationEntry kafkaClientConfigurationEntry = new AppConfigurationEntry
(binderConfigurationProperties.getJaas().getLoginModule(),
binderConfigurationProperties.getJaas().getControlFlagValue(),
binderConfigurationProperties.getJaas().getOptions() != null ?
binderConfigurationProperties.getJaas().getOptions() :
Collections.<String, Object>emptyMap());
configurationEntries.put(JaasUtils.LOGIN_CONTEXT_CLIENT,
new AppConfigurationEntry[]{ kafkaClientConfigurationEntry });
Configuration.setConfiguration(new InternalConfiguration(configurationEntries));
// Workaround for a 0.9 client issue where even if the Configuration is set
// a system property check is performed.
// Since the Configuration already exists, this will be ignored.
if (this.placeholderJaasConfiguration != null) {
System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, this.placeholderJaasConfiguration.getAbsolutePath());
}
}
}
}
/**
* A {@link Configuration} set up programmatically by the Kafka binder
*/
public static class InternalConfiguration extends Configuration {
private final Map<String, AppConfigurationEntry[]> configurationEntries;
public InternalConfiguration(Map<String, AppConfigurationEntry[]> configurationEntries) {
Assert.notNull(configurationEntries, " cannot be null");
Assert.notEmpty(configurationEntries, " cannot be empty");
this.configurationEntries = configurationEntries;
}
@Override
public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
return configurationEntries.get(name);
}
}
}

View File

@@ -1,61 +0,0 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import javax.security.auth.login.AppConfigurationEntry;
import com.sun.security.auth.login.ConfigFile;
import org.apache.kafka.common.security.JaasUtils;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.io.ClassPathResource;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Marius Bogoevici
*/
public class KafkaBinderJaasInitializerListenerTest {
@Test
public void testConfigurationParsedCorrectlyWithKafkaClient() throws Exception {
ConfigFile configFile = new ConfigFile(new ClassPathResource("jaas-sample-kafka-only.conf").getURI());
final AppConfigurationEntry[] kafkaConfigurationArray = configFile.getAppConfigurationEntry(JaasUtils.LOGIN_CONTEXT_CLIENT);
final ConfigurableApplicationContext context =
SpringApplication.run(SimpleApplication.class,
"--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true",
"--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true",
"--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab",
"--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM");
javax.security.auth.login.Configuration configuration = javax.security.auth.login.Configuration.getConfiguration();
final AppConfigurationEntry[] kafkaConfiguration = configuration.getAppConfigurationEntry(JaasUtils.LOGIN_CONTEXT_CLIENT);
assertThat(kafkaConfiguration).hasSize(1);
assertThat(kafkaConfiguration[0].getOptions()).isEqualTo(kafkaConfigurationArray[0].getOptions());
context.close();
}
@SpringBootApplication
public static class SimpleApplication {
}
}

View File

@@ -0,0 +1,97 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.util.Collections;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.junit.ClassRule;
import org.junit.Test;
import org.mockito.InOrder;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.retry.support.RetryTemplate;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
/**
* @author Gary Russell
* @since 2.0
*
*/
public class KafkaTransactionTests {
@ClassRule
public static final KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testProducerRunsInTx() {
KafkaProperties kafkaProperties = new KafkaProperties();
kafkaProperties.setBootstrapServers(Collections.singletonList(embeddedKafka.getBrokersAsString()));
KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties();
configurationProperties.getTransaction().setTransactionIdPrefix("foo-");
KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(configurationProperties, kafkaProperties);
provisioningProvider.setMetadataRetryOperations(new RetryTemplate());
final Producer mockProducer = mock(Producer.class);
willReturn(Collections.singletonList(new TopicPartition("foo", 0))).given(mockProducer).partitionsFor(anyString());
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties, provisioningProvider) {
@Override
protected DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(String transactionIdPrefix,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory =
spy(super.getProducerFactory(transactionIdPrefix, producerProperties));
willReturn(mockProducer).given(producerFactory).createProducer();
return producerFactory;
}
};
GenericApplicationContext applicationContext = new GenericApplicationContext();
applicationContext.refresh();
binder.setApplicationContext(applicationContext);
DirectChannel channel = new DirectChannel();
KafkaProducerProperties extension = new KafkaProducerProperties();
ExtendedProducerProperties<KafkaProducerProperties> properties = new ExtendedProducerProperties<>(extension);
binder.bindProducer("foo", channel, properties);
channel.send(new GenericMessage<>("foo".getBytes()));
InOrder inOrder = inOrder(mockProducer);
inOrder.verify(mockProducer).beginTransaction();
inOrder.verify(mockProducer).send(any(ProducerRecord.class), any(Callback.class));
inOrder.verify(mockProducer).commitTransaction();
inOrder.verify(mockProducer).close();
inOrder.verifyNoMoreInteractions();
}
}

View File

@@ -1,17 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.M2</version>
<artifactId>spring-cloud-stream-binder-kafka11-parent</artifactId>
<version>1.3.1.BUILD-SNAPSHOT</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>
<url>http://projects.spring.io/spring-cloud</url>
<artifactId>spring-cloud-stream-binder-kafka11-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core for the 0.11.x.x client</description>
<url>https://projects.spring.io/spring-cloud</url>
<organization>
<name>Pivotal Software, Inc.</name>
<url>http://www.spring.io</url>
<url>https://www.spring.io</url>
</organization>
<properties>
@@ -33,13 +33,6 @@
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>${spring-integration-kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro-compiler</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -1,11 +1,11 @@
/*
* Copyright 2015-2016 the original author or authors.
* Copyright 2015-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
@@ -35,10 +36,13 @@ import org.springframework.util.StringUtils;
* @author Ilayaperumal Gopinathan
* @author Marius Bogoevici
* @author Soby Chacko
* @author Gary Russell
*/
@ConfigurationProperties(prefix = "spring.cloud.stream.kafka.binder")
public class KafkaBinderConfigurationProperties {
private final Transaction transaction = new Transaction();
@Autowired(required = false)
private KafkaProperties kafkaProperties;
@@ -88,8 +92,22 @@ public class KafkaBinderConfigurationProperties {
private int queueSize = 8192;
/**
* Time to wait to get partition information in seconds; default 60.
*/
private int healthTimeout = 60;
private JaasLoginModuleConfiguration jaas;
/**
* The bean name of a custom header mapper to use instead of a {@link DefaultKafkaHeaderMapper}.
*/
private String headerMapperBeanName;
public Transaction getTransaction() {
return this.transaction;
}
public String getZkConnectionString() {
return toConnectionString(this.zkNodes, this.defaultZkPort);
}
@@ -228,6 +246,14 @@ public class KafkaBinderConfigurationProperties {
this.minPartitionCount = minPartitionCount;
}
public int getHealthTimeout() {
return this.healthTimeout;
}
public void setHealthTimeout(int healthTimeout) {
this.healthTimeout = healthTimeout;
}
public int getQueueSize() {
return this.queueSize;
}
@@ -338,4 +364,32 @@ public class KafkaBinderConfigurationProperties {
this.jaas = jaas;
}
public String getHeaderMapperBeanName() {
return this.headerMapperBeanName;
}
public void setHeaderMapperBeanName(String headerMapperBeanName) {
this.headerMapperBeanName = headerMapperBeanName;
}
public static class Transaction {
private final KafkaProducerProperties producer = new KafkaProducerProperties();
private String transactionIdPrefix;
public String getTransactionIdPrefix() {
return this.transactionIdPrefix;
}
public void setTransactionIdPrefix(String transactionIdPrefix) {
this.transactionIdPrefix = transactionIdPrefix;
}
public KafkaProducerProperties getProducer() {
return this.producer;
}
}
}

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,6 +22,8 @@ import java.util.Map;
/**
* @author Marius Bogoevici
* @author Ilayaperumal Gopinathan
* @author Soby Chacko
* @author Gary Russell
*
* <p>
* Thanks to Laszlo Szabo for providing the initial patch for generic property support.
@@ -29,6 +31,28 @@ import java.util.Map;
*/
public class KafkaConsumerProperties {
public enum StartOffset {
earliest(-2L),
latest(-1L);
private final long referencePoint;
StartOffset(long referencePoint) {
this.referencePoint = referencePoint;
}
public long getReferencePoint() {
return this.referencePoint;
}
}
public enum StandardHeaders {
none,
id,
timestamp,
both
}
private boolean autoRebalanceEnabled = true;
private boolean autoCommitOffset = true;
@@ -43,6 +67,12 @@ public class KafkaConsumerProperties {
private int recoveryInterval = 5000;
private String[] trustedPackages;
private StandardHeaders standardHeaders = StandardHeaders.none;
private String converterBeanName;
private Map<String, String> configuration = new HashMap<>();
public boolean isAutoCommitOffset() {
@@ -93,21 +123,6 @@ public class KafkaConsumerProperties {
this.autoRebalanceEnabled = autoRebalanceEnabled;
}
public enum StartOffset {
earliest(-2L),
latest(-1L);
private final long referencePoint;
StartOffset(long referencePoint) {
this.referencePoint = referencePoint;
}
public long getReferencePoint() {
return this.referencePoint;
}
}
public Map<String, String> getConfiguration() {
return this.configuration;
}
@@ -123,4 +138,29 @@ public class KafkaConsumerProperties {
public void setDlqName(String dlqName) {
this.dlqName = dlqName;
}
public String[] getTrustedPackages() {
return trustedPackages;
}
public void setTrustedPackages(String[] trustedPackages) {
this.trustedPackages = trustedPackages;
}
public StandardHeaders getStandardHeaders() {
return this.standardHeaders;
}
public void setStandardHeaders(StandardHeaders standardHeaders) {
this.standardHeaders = standardHeaders;
}
public String getConverterBeanName() {
return this.converterBeanName;
}
public void setConverterBeanName(String converterBeanName) {
this.converterBeanName = converterBeanName;
}
}

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,16 +16,17 @@
package org.springframework.cloud.stream.binder.kafka.properties;
import org.springframework.expression.Expression;
import java.util.HashMap;
import java.util.Map;
import javax.validation.constraints.NotNull;
import org.springframework.expression.Expression;
/**
* @author Marius Bogoevici
* @author Henryk Konsek
* @author Gary Russell
*/
public class KafkaProducerProperties {
@@ -39,6 +40,8 @@ public class KafkaProducerProperties {
private Expression messageKeyExpression;
private String[] headerPatterns;
private Map<String, String> configuration = new HashMap<>();
public int getBufferSize() {
@@ -82,6 +85,14 @@ public class KafkaProducerProperties {
this.messageKeyExpression = messageKeyExpression;
}
public String[] getHeaderPatterns() {
return this.headerPatterns;
}
public void setHeaderPatterns(String[] headerPatterns) {
this.headerPatterns = headerPatterns;
}
public Map<String, String> getConfiguration() {
return this.configuration;
}

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -1,16 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.M2</version>
<artifactId>spring-cloud-stream-binder-kafka11-parent</artifactId>
<version>1.3.1.BUILD-SNAPSHOT</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>
<name>spring-cloud-stream-binder-kafka-docs</name>
<description>Spring Cloud Stream Kafka Binder Docs</description>
<artifactId>spring-cloud-stream-binder-kafka11-docs</artifactId>
<name>spring-cloud-stream-binder-kafka11-docs</name>
<description>Spring Cloud Stream Kafka Binder Docs for the 0.11.x.x client</description>
<properties>
<main.basedir>${basedir}/..</main.basedir>
</properties>
@@ -71,8 +71,8 @@
<quiet>true</quiet>
<stylesheetfile>${basedir}/src/main/javadoc/spring-javadoc.css</stylesheetfile>
<links>
<link>http://docs.spring.io/spring-framework/docs/${spring.version}/javadoc-api/</link>
<link>http://docs.spring.io/spring-shell/docs/current/api/</link>
<link>https://docs.spring.io/spring-framework/docs/${spring.version}/javadoc-api/</link>
<link>https://docs.spring.io/spring-shell/docs/current/api/</link>
</links>
</configuration>
</execution>

View File

@@ -34,7 +34,7 @@ source control.
The projects that require middleware generally include a
`docker-compose.yml`, so consider using
http://compose.docker.io/[Docker Compose] to run the middeware servers
https://compose.docker.io/[Docker Compose] to run the middeware servers
in Docker containers.
=== Documentation
@@ -43,13 +43,13 @@ There is a "full" profile that will generate documentation.
=== Working with the code
If you don't have an IDE preference we would recommend that you use
http://www.springsource.com/developer/sts[Spring Tools Suite] or
http://eclipse.org[Eclipse] when working with the code. We use the
http://eclipse.org/m2e/[m2eclipe] eclipse plugin for maven support. Other IDEs and tools
https://www.springsource.com/developer/sts[Spring Tools Suite] or
https://eclipse.org[Eclipse] when working with the code. We use the
https://eclipse.org/m2e/[m2eclipe] eclipse plugin for maven support. Other IDEs and tools
should also work without issue.
==== Importing into eclipse with m2eclipse
We recommend the http://eclipse.org/m2e/[m2eclipe] eclipse plugin when working with
We recommend the https://eclipse.org/m2e/[m2eclipe] eclipse plugin when working with
eclipse. If you don't already have m2eclipse installed it is available from the "eclipse
marketplace".

View File

@@ -24,7 +24,7 @@ added after the original pull request but before a merge.
`eclipse-code-formatter.xml` file from the
https://github.com/spring-cloud/build/tree/master/eclipse-coding-conventions.xml[Spring
Cloud Build] project. If using IntelliJ, you can use the
http://plugins.jetbrains.com/plugin/6546[Eclipse Code Formatter
https://plugins.jetbrains.com/plugin/6546[Eclipse Code Formatter
Plugin] to import the same file.
* Make sure all new `.java` files to have a simple Javadoc class comment with at least an
`@author` tag identifying you, and preferably at least a paragraph on what the class is
@@ -37,6 +37,6 @@ added after the original pull request but before a merge.
* A few unit tests would help a lot as well -- someone has to do it.
* If no-one else is using your branch, please rebase it against the current master (or
other target branch in the main project).
* When writing a commit message please follow http://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html[these conventions],
* When writing a commit message please follow https://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html[these conventions],
if you are fixing an existing issue please add `Fixes gh-XXXX` at the end of the commit
message (where XXXX is the issue number).

View File

@@ -11,24 +11,27 @@ Sabby Anandan, Marius Bogoevici, Eric Bottard, Mark Fisher, Ilayaperumal Gopinat
:spring-cloud-stream-binder-kafka-repo: snapshot
:github-tag: master
:spring-cloud-stream-binder-kafka-docs-version: current
:spring-cloud-stream-binder-kafka-docs: http://docs.spring.io/spring-cloud-stream-binder-kafka/docs/{spring-cloud-stream-binder-kafka-docs-version}/reference
:spring-cloud-stream-binder-kafka-docs-current: http://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current-SNAPSHOT/reference/html/
:spring-cloud-stream-binder-kafka-docs: https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/{spring-cloud-stream-binder-kafka-docs-version}/reference
:spring-cloud-stream-binder-kafka-docs-current: https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current-SNAPSHOT/reference/html/
:github-repo: spring-cloud/spring-cloud-stream-binder-kafka
:github-raw: http://raw.github.com/{github-repo}/{github-tag}
:github-code: http://github.com/{github-repo}/tree/{github-tag}
:github-wiki: http://github.com/{github-repo}/wiki
:github-master-code: http://github.com/{github-repo}/tree/master
:github-raw: https://raw.github.com/{github-repo}/{github-tag}
:github-code: https://github.com/{github-repo}/tree/{github-tag}
:github-wiki: https://github.com/{github-repo}/wiki
:github-master-code: https://github.com/{github-repo}/tree/master
:sc-ext: java
// ======================================================================================
= Reference Guide
include::overview.adoc[]
include::dlq.adoc[]
include::metrics.adoc[]
= Appendices
[appendix]
include::building.adoc[]
include::contributing.adoc[]
// ======================================================================================

View File

@@ -73,6 +73,11 @@ spring.cloud.stream.kafka.binder.headers::
The list of custom headers that will be transported by the binder.
+
Default: empty.
spring.cloud.stream.kafka.binder.healthTimeout::
The time to wait to get partition information in seconds; default 60.
Health will report as down if this timer expires.
+
Default: 10.
spring.cloud.stream.kafka.binder.offsetUpdateTimeWindow::
The frequency, in milliseconds, with which offsets are saved.
Ignored if `0`.
@@ -168,6 +173,16 @@ dlqName::
The name of the DLQ topic to receive the error messages.
+
Default: null (If not specified, messages that result in errors will be forwarded to a topic named `error.<destination>.<group>`).
standardHeaders::
Indicates which standard headers are populated by the inbound channel adapter.
`none`, `id`, `timestamp` or `both`.
Useful if using native deserialization and the first component to receive a message needs an `id` (such as an aggregator that is configured to use a JDBC message store).
+
Default: `none`
converterBeanName::
The name of a bean that implements `RecordMessageConverter`; used in the inbound channel adapter to replace the default `MessagingMessageConverter`.
+
Default: `null`
=== Kafka Producer Properties
@@ -241,7 +256,7 @@ public class ManuallyAcknowdledgingConsumer {
==== Example: security configuration
Apache Kafka 0.9 supports secure connections between client and brokers.
To take advantage of this feature, follow the guidelines in the http://kafka.apache.org/090/documentation.html#security_configclients[Apache Kafka Documentation] as well as the Kafka 0.9 http://docs.confluent.io/2.0.0/kafka/security.html[security guidelines from the Confluent documentation].
To take advantage of this feature, follow the guidelines in the https://kafka.apache.org/090/documentation.html#security_configclients[Apache Kafka Documentation] as well as the Kafka 0.9 https://docs.confluent.io/2.0.0/kafka/security.html[security guidelines from the Confluent documentation].
Use the `spring.cloud.stream.kafka.binder.configuration` option to set security properties for all clients created by the binder.
For example, for setting `security.protocol` to `SASL_SSL`, set:
@@ -253,7 +268,7 @@ spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
All the other security properties can be set in a similar manner.
When using Kerberos, follow the instructions in the http://kafka.apache.org/090/documentation.html#security_sasl_clientconfig[reference documentation] for creating and referencing the JAAS configuration.
When using Kerberos, follow the instructions in the https://kafka.apache.org/090/documentation.html#security_sasl_clientconfig[reference documentation] for creating and referencing the JAAS configuration.
Spring Cloud Stream supports passing JAAS configuration information to the application using a JAAS configuration file and using Spring Boot properties.
@@ -436,7 +451,7 @@ Spring Cloud Stream Kafka support also includes a binder specifically designed f
Using this binder, applications can be written that leverage the Kafka Streams API.
For more information on Kafka Streams, see https://kafka.apache.org/documentation/streams/developer-guide[Kafka Streams API Developer Manual]
Kafka Streams support in Spring Cloud Stream is based on the foundations provided by the Spring Kafka project. For details on that support, see http://docs.spring.io/spring-kafka/reference/html/_reference.html#kafka-streams[Kafaka Streams Support in Spring Kafka].
Kafka Streams support in Spring Cloud Stream is based on the foundations provided by the Spring Kafka project. For details on that support, see https://docs.spring.io/spring-kafka/reference/html/_reference.html#kafka-streams[Kafaka Streams Support in Spring Kafka].
Here are the maven coordinates for the Spring Cloud Stream KStream binder artifact.
@@ -523,5 +538,15 @@ spring.cloud.stream.kstream.bindings.output.producer.keySerde=org.apache.kafka.c
spring.cloud.stream.kstream.bindings.output.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$LongSerde
----
[[kafka-error-channels]]
== Error Channels
Starting with _version 1.3_, the binder unconditionally sends exceptions to an error channel for each consumer destination, and can be configured to send async producer send failures to an error channel too.
See <<binder-error-channels>> for more information.
The payload of the `ErrorMessage` for a send failure is a `KafkaSendFailureException` with properties:
* `failedMessage` - the spring-messaging `Message<?>` that failed to be sent.
* `record` - the raw `ProducerRecord` that was created from the `failedMessage`
There is no automatic handling of these exceptions (such as sending to a <<kafka-dlq-processing, Dead-Letter queue>>); you can consume these exceptions with your own Spring Integration flow.

View File

@@ -9,7 +9,7 @@
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
@@ -20,7 +20,7 @@
-->
<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
xmlns:xslthl="http://xslthl.sf.net"
xmlns:xslthl="http://xslthl.sourceforge.net/"
xmlns:d="http://docbook.org/ns/docbook"
exclude-result-prefixes="xslthl d"
version='1.0'>

View File

@@ -9,7 +9,7 @@ to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
@@ -20,7 +20,7 @@ under the License.
-->
<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
xmlns:xslthl="http://xslthl.sf.net"
xmlns:xslthl="http://xslthl.sourceforge.net/"
xmlns:d="http://docbook.org/ns/docbook"
exclude-result-prefixes="xslthl d"
version='1.0'>

View File

@@ -9,7 +9,7 @@ to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an

View File

@@ -9,7 +9,7 @@ to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an

View File

@@ -9,7 +9,7 @@ to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
@@ -20,7 +20,7 @@ under the License.
-->
<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
xmlns:xslthl="http://xslthl.sf.net"
xmlns:xslthl="http://xslthl.sourceforge.net/"
xmlns:d="http://docbook.org/ns/docbook"
exclude-result-prefixes="xslthl"
version='1.0'>

View File

@@ -9,7 +9,7 @@ to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
@@ -22,7 +22,7 @@ under the License.
<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
xmlns:d="http://docbook.org/ns/docbook"
xmlns:fo="http://www.w3.org/1999/XSL/Format"
xmlns:xslthl="http://xslthl.sf.net"
xmlns:xslthl="http://xslthl.sourceforge.net/"
xmlns:xlink='http://www.w3.org/1999/xlink'
xmlns:exsl="http://exslt.org/common"
exclude-result-prefixes="exsl xslthl d xlink"

View File

@@ -19,5 +19,5 @@
<highlighter id="properties" file="./xslthl/properties-hl.xml" />
<highlighter id="json" file="./xslthl/json-hl.xml" />
<highlighter id="yaml" file="./xslthl/yaml-hl.xml" />
<namespace prefix="xslthl" uri="http://xslthl.sf.net" />
<namespace prefix="xslthl" uri="http://xslthl.sourceforge.net/" />
</xslthl-config>

View File

@@ -4,7 +4,7 @@
Syntax highlighting definition for SH
xslthl - XSLT Syntax Highlighting
http://sourceforge.net/projects/xslthl/
https://sourceforge.net/projects/xslthl/
Copyright (C) 2010 Mathieu Malaterre
This software is provided 'as-is', without any express or implied

View File

@@ -3,7 +3,7 @@
Syntax highlighting definition for C
xslthl - XSLT Syntax Highlighting
http://sourceforge.net/projects/xslthl/
https://sourceforge.net/projects/xslthl/
Copyright (C) 2005-2008 Michal Molhanec, Jirka Kosek, Michiel Hendriks
This software is provided 'as-is', without any express or implied

View File

@@ -4,7 +4,7 @@
Syntax highlighting definition for C++
xslthl - XSLT Syntax Highlighting
http://sourceforge.net/projects/xslthl/
https://sourceforge.net/projects/xslthl/
Copyright (C) 2005-2008 Michal Molhanec, Jirka Kosek, Michiel Hendriks
This software is provided 'as-is', without any express or implied

View File

@@ -4,7 +4,7 @@
Syntax highlighting definition for C#
xslthl - XSLT Syntax Highlighting
http://sourceforge.net/projects/xslthl/
https://sourceforge.net/projects/xslthl/
Copyright (C) 2005-2008 Michal Molhanec, Jirka Kosek, Michiel Hendriks
This software is provided 'as-is', without any express or implied

View File

@@ -4,7 +4,7 @@
Syntax highlighting definition for CSS files
xslthl - XSLT Syntax Highlighting
http://sourceforge.net/projects/xslthl/
https://sourceforge.net/projects/xslthl/
Copyright (C) 2011-2012 Martin Hujer, Michiel Hendriks
This software is provided 'as-is', without any express or implied
@@ -26,7 +26,7 @@ freely, subject to the following restrictions:
Martin Hujer <mhujer at users.sourceforge.net>
Michiel Hendriks <elmuerte at users.sourceforge.net>
Reference: http://www.w3.org/TR/CSS21/propidx.html
Reference: https://www.w3.org/TR/CSS21/propidx.html
-->
<highlighters>

View File

@@ -7,7 +7,7 @@
myxml-hl.xml - konfigurace zvyraznovace XML, ktera zvlast zvyrazni
HTML elementy a XSL elementy
This file has been customized for the Asciidoctor project (http://asciidoctor.org).
This file has been customized for the Asciidoctor project (https://asciidoctor.org).
-->
<highlighters>
<highlighter type="xml">

View File

@@ -4,7 +4,7 @@
Syntax highlighting definition for ini files
xslthl - XSLT Syntax Highlighting
http://sourceforge.net/projects/xslthl/
https://sourceforge.net/projects/xslthl/
Copyright (C) 2005-2008 Michal Molhanec, Jirka Kosek, Michiel Hendriks
This software is provided 'as-is', without any express or implied

View File

@@ -4,7 +4,7 @@
Syntax highlighting definition for Java
xslthl - XSLT Syntax Highlighting
http://sourceforge.net/projects/xslthl/
https://sourceforge.net/projects/xslthl/
Copyright (C) 2005-2008 Michal Molhanec, Jirka Kosek, Michiel Hendriks
This software is provided 'as-is', without any express or implied

View File

@@ -4,7 +4,7 @@
Syntax highlighting definition for JavaScript
xslthl - XSLT Syntax Highlighting
http://sourceforge.net/projects/xslthl/
https://sourceforge.net/projects/xslthl/
Copyright (C) 2005-2008 Michal Molhanec, Jirka Kosek, Michiel Hendriks
This software is provided 'as-is', without any express or implied

View File

@@ -4,7 +4,7 @@
Syntax highlighting definition for Perl
xslthl - XSLT Syntax Highlighting
http://sourceforge.net/projects/xslthl/
https://sourceforge.net/projects/xslthl/
Copyright (C) 2005-2008 Michal Molhanec, Jirka Kosek, Michiel Hendriks
This software is provided 'as-is', without any express or implied

View File

@@ -4,7 +4,7 @@
Syntax highlighting definition for PHP
xslthl - XSLT Syntax Highlighting
http://sourceforge.net/projects/xslthl/
https://sourceforge.net/projects/xslthl/
Copyright (C) 2005-2008 Michal Molhanec, Jirka Kosek, Michiel Hendriks
This software is provided 'as-is', without any express or implied

View File

@@ -4,7 +4,7 @@
Syntax highlighting definition for Java
xslthl - XSLT Syntax Highlighting
http://sourceforge.net/projects/xslthl/
https://sourceforge.net/projects/xslthl/
Copyright (C) 2005-2008 Michal Molhanec, Jirka Kosek, Michiel Hendriks
This software is provided 'as-is', without any express or implied

View File

@@ -4,7 +4,7 @@
Syntax highlighting definition for Python
xslthl - XSLT Syntax Highlighting
http://sourceforge.net/projects/xslthl/
https://sourceforge.net/projects/xslthl/
Copyright (C) 2005-2008 Michal Molhanec, Jirka Kosek, Michiel Hendriks
This software is provided 'as-is', without any express or implied

View File

@@ -4,7 +4,7 @@
Syntax highlighting definition for Ruby
xslthl - XSLT Syntax Highlighting
http://sourceforge.net/projects/xslthl/
https://sourceforge.net/projects/xslthl/
Copyright (C) 2005-2008 Michal Molhanec, Jirka Kosek, Michiel Hendriks
This software is provided 'as-is', without any express or implied

View File

@@ -4,7 +4,7 @@
Syntax highlighting definition for SQL:1999
xslthl - XSLT Syntax Highlighting
http://sourceforge.net/projects/xslthl/
https://sourceforge.net/projects/xslthl/
Copyright (C) 2012 Michiel Hendriks, Martin Hujer, k42b3
This software is provided 'as-is', without any express or implied

View File

@@ -1,22 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<artifactId>spring-cloud-stream-binder-kafka11</artifactId>
<packaging>jar</packaging>
<name>spring-cloud-stream-binder-kafka</name>
<description>Kafka binder implementation</description>
<name>spring-cloud-stream-binder-kafka11</name>
<description>Kafka binder implementation for the 0.11.x.x client</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.3.0.M2</version>
<artifactId>spring-cloud-stream-binder-kafka11-parent</artifactId>
<version>1.3.1.BUILD-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<artifactId>spring-cloud-stream-binder-kafka11-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
@@ -54,10 +54,6 @@
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
@@ -66,6 +62,7 @@
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>${spring-kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -0,0 +1,125 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.kafka.core.ConsumerFactory;
/**
* Health indicator for Kafka.
*
* @author Ilayaperumal Gopinathan
* @author Marius Bogoevici
* @author Henryk Konsek
* @author Gary Russell
*/
public class KafkaBinderHealthIndicator implements HealthIndicator {
private static final int DEFAULT_TIMEOUT = 60;
private final KafkaMessageChannelBinder binder;
private final ConsumerFactory<?, ?> consumerFactory;
private int timeout = DEFAULT_TIMEOUT;
public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder,
ConsumerFactory<?, ?> consumerFactory) {
this.binder = binder;
this.consumerFactory = consumerFactory;
}
/**
* Set the timeout in seconds to retrieve health information.
* @param timeout the timeout - default 60.
*/
public void setTimeout(int timeout) {
this.timeout = timeout;
}
@Override
public Health health() {
ExecutorService exec = Executors.newSingleThreadExecutor();
Future<Health> future = exec.submit(new Callable<Health>() {
@Override
public Health call() {
try (Consumer<?, ?> metadataConsumer = consumerFactory.createConsumer()) {
Set<String> downMessages = new HashSet<>();
for (String topic : KafkaBinderHealthIndicator.this.binder.getTopicsInUse().keySet()) {
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
if (KafkaBinderHealthIndicator.this.binder.getTopicsInUse().get(topic).getPartitionInfos()
.contains(partitionInfo) && partitionInfo.leader().id() == -1) {
downMessages.add(partitionInfo.toString());
}
}
}
if (downMessages.isEmpty()) {
return Health.up().build();
}
else {
return Health.down()
.withDetail("Following partitions in use have no leaders: ", downMessages.toString())
.build();
}
}
catch (Exception e) {
return Health.down(e).build();
}
}
});
try {
return future.get(this.timeout, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return Health.down()
.withDetail("Interrupted while waiting for partition information in", this.timeout + " seconds")
.build();
}
catch (ExecutionException e) {
return Health.down(e).build();
}
catch (TimeoutException e) {
return Health.down()
.withDetail("Failed to retrieve partition information in", this.timeout + " seconds")
.build();
}
finally {
exec.shutdownNow();
}
}
}

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,23 +16,35 @@
package org.springframework.cloud.stream.binder.kafka;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Utils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderHeaders;
@@ -41,6 +53,7 @@ import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties.StandardHeaders;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
@@ -54,20 +67,31 @@ import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAd
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
@@ -87,40 +111,45 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
* @author Soby Chacko
* @author Henryk Konsek
* @author Doug Saus
* @author Aldo Sinanaj
*/
public class KafkaMessageChannelBinder extends
AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>,
ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {
public static final String X_ORIGINAL_TOPIC = "x-original-topic";
public static final String X_EXCEPTION_MESSAGE = "x-exception-message";
public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";
private final KafkaBinderConfigurationProperties configurationProperties;
private final Map<String, TopicInformation> topicsInUse = new HashMap<>();
private final KafkaTransactionManager<byte[], byte[]> transactionManager;
private final TransactionTemplate transactionTemplate;
private ProducerListener<byte[], byte[]> producerListener;
private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider) {
super(false, headersToMap(configurationProperties), provisioningProvider);
super(true, null, provisioningProvider);
this.configurationProperties = configurationProperties;
}
private static String[] headersToMap(KafkaBinderConfigurationProperties configurationProperties) {
String[] headersToMap;
if (ObjectUtils.isEmpty(configurationProperties.getHeaders())) {
headersToMap = BinderHeaders.STANDARD_HEADERS;
if (StringUtils.hasText(configurationProperties.getTransaction().getTransactionIdPrefix())) {
this.transactionManager = new KafkaTransactionManager<>(
getProducerFactory(configurationProperties.getTransaction().getTransactionIdPrefix(),
new ExtendedProducerProperties<>(configurationProperties.getTransaction().getProducer())));
this.transactionTemplate = new TransactionTemplate(this.transactionManager);
}
else {
String[] combinedHeadersToMap = Arrays.copyOfRange(BinderHeaders.STANDARD_HEADERS, 0,
BinderHeaders.STANDARD_HEADERS.length + configurationProperties.getHeaders().length);
System.arraycopy(configurationProperties.getHeaders(), 0, combinedHeadersToMap,
BinderHeaders.STANDARD_HEADERS.length,
configurationProperties.getHeaders().length);
headersToMap = combinedHeadersToMap;
this.transactionManager = null;
this.transactionTemplate = null;
}
return headersToMap;
}
public void setExtendedBindingProperties(KafkaExtendedBindingProperties extendedBindingProperties) {
@@ -147,16 +176,32 @@ public class KafkaMessageChannelBinder extends
@Override
protected MessageHandler createProducerMessageHandler(final ProducerDestination destination,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception {
final DefaultKafkaProducerFactory<byte[], byte[]> producerFB = getProducerFactory(producerProperties);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties, MessageChannel errorChannel)
throws Exception {
/*
* IMPORTANT: With a transactional binder, individual producer properties for
* Kafka are ignored; the global binder
* (spring.cloud.stream.kafka.binder.transaction.producer.*) properties are used
* instead, for all producers. A binder is transactional when
* 'spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix' has text.
*/
final ProducerFactory<byte[], byte[]> producerFB = this.transactionManager != null
? this.transactionManager.getProducerFactory()
: getProducerFactory(null, producerProperties);
Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(
producerProperties.getPartitionCount(),
false,
new Callable<Collection<PartitionInfo>>() {
@Override
public Collection<PartitionInfo> call() throws Exception {
return producerFB.createProducer().partitionsFor(destination.getName());
Producer<byte[], byte[]> producer = producerFB.createProducer();
List<PartitionInfo> partitionsFor = producer.partitionsFor(destination.getName());
producer.close();
((DisposableBean) producerFB).destroy();
return partitionsFor;
}
});
this.topicsInUse.put(destination.getName(), new TopicInformation(null, partitions));
if (producerProperties.getPartitionCount() < partitions.size()) {
@@ -165,17 +210,41 @@ public class KafkaMessageChannelBinder extends
+ producerProperties.getPartitionCount() + ", smaller than the actual partition count of "
+ partitions.size() + " of the topic. The larger number will be used instead.");
}
/*
* This is dirty; it relies on the fact that we, and the partition
* interceptor, share a hard reference to the producer properties instance.
* But I don't see another way to fix it since the interceptor has already
* been added to the channel, and we don't have access to the channel here; if
* we did, we could inject the proper partition count there.
* TODO: Consider this when doing the 2.0 binder restructuring.
*/
producerProperties.setPartitionCount(partitions.size());
}
KafkaTemplate<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFB);
if (this.producerListener != null) {
kafkaTemplate.setProducerListener(this.producerListener);
}
return new ProducerConfigurationMessageHandler(kafkaTemplate, destination.getName(), producerProperties,
producerFB);
ProducerConfigurationMessageHandler handler = new ProducerConfigurationMessageHandler(kafkaTemplate,
destination.getName(), producerProperties, producerFB);
if (errorChannel != null) {
handler.setSendFailureChannel(errorChannel);
}
String[] headerPatterns = producerProperties.getExtension().getHeaderPatterns();
if (headerPatterns != null && headerPatterns.length > 0) {
List<String> patterns = new LinkedList<>(Arrays.asList(headerPatterns));
if (!patterns.contains("!" + MessageHeaders.TIMESTAMP)) {
patterns.add(0, "!" + MessageHeaders.TIMESTAMP);
}
if (!patterns.contains("!" + MessageHeaders.ID)) {
patterns.add(0, "!" + MessageHeaders.ID);
}
handler.setHeaderMapper(new DefaultKafkaHeaderMapper(patterns.toArray(new String[patterns.size()])));
}
return handler;
}
private DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(
protected DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(String transactionIdPrefix,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.RETRIES_CONFIG, 0);
@@ -204,7 +273,11 @@ public class KafkaMessageChannelBinder extends
if (!ObjectUtils.isEmpty(producerProperties.getExtension().getConfiguration())) {
props.putAll(producerProperties.getExtension().getConfiguration());
}
return new DefaultKafkaProducerFactory<>(props);
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = new DefaultKafkaProducerFactory<>(props);
if (transactionIdPrefix != null) {
producerFactory.setTransactionIdPrefix(transactionIdPrefix);
}
return producerFactory;
}
@Override
@@ -224,10 +297,15 @@ public class KafkaMessageChannelBinder extends
Collection<PartitionInfo> allPartitions = provisioningProvider.getPartitionsForTopic(partitionCount,
extendedConsumerProperties.getExtension().isAutoRebalanceEnabled(),
new Callable<Collection<PartitionInfo>>() {
@Override
public Collection<PartitionInfo> call() throws Exception {
return consumerFactory.createConsumer().partitionsFor(destination.getName());
Consumer<?, ?> consumer = consumerFactory.createConsumer();
List<PartitionInfo> partitionsFor = consumer.partitionsFor(destination.getName());
consumer.close();
return partitionsFor;
}
});
Collection<PartitionInfo> listenedPartitions;
@@ -256,6 +334,9 @@ public class KafkaMessageChannelBinder extends
|| extendedConsumerProperties.getExtension().isAutoRebalanceEnabled()
? new ContainerProperties(destination.getName())
: new ContainerProperties(topicPartitionInitialOffsets);
if (this.transactionManager != null) {
containerProperties.setTransactionManager(this.transactionManager);
}
int concurrency = Math.min(extendedConsumerProperties.getConcurrency(), listenedPartitions.size());
@SuppressWarnings("rawtypes")
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer =
@@ -284,6 +365,50 @@ public class KafkaMessageChannelBinder extends
}
final KafkaMessageDrivenChannelAdapter<?, ?> kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter<>(
messageListenerContainer);
MessagingMessageConverter messageConverter;
if (extendedConsumerProperties.getExtension().getConverterBeanName() == null) {
messageConverter = new MessagingMessageConverter();
StandardHeaders standardHeaders = extendedConsumerProperties.getExtension().getStandardHeaders();
messageConverter.setGenerateMessageId(StandardHeaders.id.equals(standardHeaders)
|| StandardHeaders.both.equals(standardHeaders));
messageConverter.setGenerateTimestamp(StandardHeaders.timestamp.equals(standardHeaders)
|| StandardHeaders.both.equals(standardHeaders));
}
else {
try {
messageConverter = getApplicationContext().getBean(
extendedConsumerProperties.getExtension().getConverterBeanName(),
MessagingMessageConverter.class);
}
catch (NoSuchBeanDefinitionException e) {
throw new IllegalStateException("Converter bean not present in application context", e);
}
}
KafkaHeaderMapper mapper = null;
if (this.configurationProperties.getHeaderMapperBeanName() != null) {
mapper = getApplicationContext().getBean(this.configurationProperties.getHeaderMapperBeanName(),
KafkaHeaderMapper.class);
}
if (mapper == null) {
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper() {
@Override
public void toHeaders(Headers source, Map<String, Object> headers) {
super.toHeaders(source, headers);
if (headers.size() > 0) {
headers.put("scst_nativeHeadersPresent", Boolean.TRUE);
}
}
};
String[] trustedPackages = extendedConsumerProperties.getExtension().getTrustedPackages();
if (!StringUtils.isEmpty(trustedPackages)) {
headerMapper.addTrustedPackages(trustedPackages);
}
mapper = headerMapper;
}
messageConverter.setHeaderMapper(mapper);
kafkaMessageDrivenChannelAdapter.setMessageConverter(messageConverter);
kafkaMessageDrivenChannelAdapter.setBeanFactory(this.getBeanFactory());
ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, consumerGroup,
extendedConsumerProperties);
@@ -297,6 +422,7 @@ public class KafkaMessageChannelBinder extends
return kafkaMessageDrivenChannelAdapter;
}
@Override
protected ErrorMessageStrategy getErrorMessageStrategy() {
return new RawRecordHeaderErrorMessageStrategy();
@@ -306,8 +432,9 @@ public class KafkaMessageChannelBinder extends
protected MessageHandler getErrorMessageHandler(final ConsumerDestination destination, final String group,
final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
if (extendedConsumerProperties.getExtension().isEnableDlq()) {
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = getProducerFactory(
new ExtendedProducerProperties<>(new KafkaProducerProperties()));
ProducerFactory<byte[], byte[]> producerFactory = this.transactionManager != null
? this.transactionManager.getProducerFactory()
: getProducerFactory(null, new ExtendedProducerProperties<>(new KafkaProducerProperties()));
final KafkaTemplate<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFactory);
return new MessageHandler() {
@@ -315,15 +442,28 @@ public class KafkaMessageChannelBinder extends
public void handleMessage(Message<?> message) throws MessagingException {
final ConsumerRecord<?, ?> record = message.getHeaders()
.get(KafkaMessageDrivenChannelAdapter.KAFKA_RAW_DATA, ConsumerRecord.class);
final byte[] key = record.key() != null ? Utils.toArray(ByteBuffer.wrap((byte[]) record.key()))
final byte[] key = record.key() != null
? Utils.toArray(ByteBuffer.wrap((byte[]) record.key()))
: null;
final byte[] payload = record.value() != null
? Utils.toArray(ByteBuffer.wrap((byte[]) record.value())) : null;
if (message.getPayload() instanceof Throwable) {
final Throwable throwable = (Throwable) message.getPayload();
final String failureMessage = throwable.getMessage();
record.headers()
.add(new RecordHeader(X_ORIGINAL_TOPIC, record.topic().getBytes(StandardCharsets.UTF_8)));
record.headers()
.add(new RecordHeader(X_EXCEPTION_MESSAGE, failureMessage.getBytes(StandardCharsets.UTF_8)));
record.headers()
.add(new RecordHeader(X_EXCEPTION_STACKTRACE, getStackTraceAsString(throwable).getBytes()));
}
String dlqName = StringUtils.hasText(extendedConsumerProperties.getExtension().getDlqName())
? extendedConsumerProperties.getExtension().getDlqName()
: "error." + destination.getName() + "." + group;
ListenableFuture<SendResult<byte[], byte[]>> sentDlq = kafkaTemplate.send(dlqName,
record.partition(), key, payload);
ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(dlqName, record.partition(),
key, payload, record.headers());
ListenableFuture<SendResult<byte[], byte[]>> sentDlq = kafkaTemplate.send(producerRecord);
sentDlq.addCallback(new ListenableFutureCallback<SendResult<byte[], byte[]>>() {
StringBuilder sb = new StringBuilder().append(" a message with key='")
.append(toDisplayString(ObjectUtils.nullSafeToString(key), 50)).append("'")
@@ -406,23 +546,30 @@ public class KafkaMessageChannelBinder extends
return original.substring(0, maxCharacters) + "...";
}
private String getStackTraceAsString(Throwable cause) {
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter, true);
cause.printStackTrace(printWriter);
return stringWriter.getBuffer().toString();
}
private final class ProducerConfigurationMessageHandler extends KafkaProducerMessageHandler<byte[], byte[]>
implements Lifecycle {
private boolean running = true;
private final DefaultKafkaProducerFactory<byte[], byte[]> producerFactory;
private final ProducerFactory<byte[], byte[]> producerFactory;
private ProducerConfigurationMessageHandler(KafkaTemplate<byte[], byte[]> kafkaTemplate, String topic,
ProducerConfigurationMessageHandler(KafkaTemplate<byte[], byte[]> kafkaTemplate, String topic,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory) {
ProducerFactory<byte[], byte[]> producerFactory) {
super(kafkaTemplate);
setTopicExpression(new LiteralExpression(topic));
setMessageKeyExpression(producerProperties.getExtension().getMessageKeyExpression());
setBeanFactory(KafkaMessageChannelBinder.this.getBeanFactory());
if (producerProperties.isPartitioned()) {
SpelExpressionParser parser = new SpelExpressionParser();
setPartitionIdExpression(parser.parseExpression("headers." + BinderHeaders.PARTITION_HEADER));
setPartitionIdExpression(parser.parseExpression("headers['" + BinderHeaders.PARTITION_HEADER + "']"));
}
if (producerProperties.getExtension().isSync()) {
setSync(true);
@@ -443,7 +590,9 @@ public class KafkaMessageChannelBinder extends
@Override
public void stop() {
producerFactory.stop();
if (this.producerFactory instanceof Lifecycle) {
((Lifecycle) producerFactory).stop();
}
this.running = false;
}
@@ -451,6 +600,30 @@ public class KafkaMessageChannelBinder extends
public boolean isRunning() {
return this.running;
}
@Override
protected void handleMessageInternal(final Message<?> message) throws Exception {
if (KafkaMessageChannelBinder.this.transactionTemplate != null
&& !TransactionSynchronizationManager.isActualTransactionActive()) {
KafkaMessageChannelBinder.this.transactionTemplate.execute(
new TransactionCallback<Object>() {
@Override
public Object doInTransaction(TransactionStatus transactionStatus) {
try {
ProducerConfigurationMessageHandler.super.handleMessageInternal(message);
}
catch (Exception e) {
throw new KafkaException("Exception on transactional send", e);
}
return null;
}
});
}
else {
super.handleMessageInternal(message);
}
}
}
public static class TopicInformation {

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -34,7 +34,6 @@ import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoCon
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderJaasInitializerListener;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation;
@@ -46,7 +45,6 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBin
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.config.codec.kryo.KryoCodecAutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
@@ -57,6 +55,7 @@ import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.integration.codec.Codec;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.util.ObjectUtils;
@@ -68,11 +67,12 @@ import org.springframework.util.ObjectUtils;
* @author Mark Fisher
* @author Ilayaperumal Gopinathan
* @author Henryk Konsek
* @author Gary Russell
*/
@Configuration
@ConditionalOnMissingBean(Binder.class)
@Import({ KryoCodecAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class})
@EnableConfigurationProperties({ KafkaBinderConfigurationProperties.class, KafkaExtendedBindingProperties.class })
@EnableConfigurationProperties({ KafkaExtendedBindingProperties.class })
public class KafkaBinderConfiguration {
protected static final Log logger = LogFactory.getLog(KafkaBinderConfiguration.class);
@@ -96,14 +96,20 @@ public class KafkaBinderConfiguration {
private AdminUtilsOperation adminUtilsOperation;
@Bean
KafkaTopicProvisioner provisioningProvider() {
return new KafkaTopicProvisioner(this.configurationProperties, this.adminUtilsOperation);
KafkaBinderConfigurationProperties configurationProperties() {
return new KafkaBinderConfigurationProperties();
}
@Bean
KafkaMessageChannelBinder kafkaMessageChannelBinder() {
KafkaTopicProvisioner provisioningProvider(KafkaBinderConfigurationProperties configurationProperties) {
return new KafkaTopicProvisioner(configurationProperties, this.adminUtilsOperation);
}
@Bean
KafkaMessageChannelBinder kafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider) {
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
this.configurationProperties, provisioningProvider());
configurationProperties, provisioningProvider);
kafkaMessageChannelBinder.setCodec(this.codec);
kafkaMessageChannelBinder.setProducerListener(producerListener);
kafkaMessageChannelBinder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
@@ -117,7 +123,8 @@ public class KafkaBinderConfiguration {
}
@Bean
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder) {
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder,
KafkaBinderConfigurationProperties configurationProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
@@ -125,14 +132,18 @@ public class KafkaBinderConfiguration {
props.putAll(configurationProperties.getConsumerConfiguration());
}
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
}
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
return new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, consumerFactory);
KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(kafkaMessageChannelBinder,
consumerFactory);
indicator.setTimeout(configurationProperties.getHealthTimeout());
return indicator;
}
@Bean
public PublicMetrics kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder) {
public PublicMetrics kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder,
KafkaBinderConfigurationProperties configurationProperties) {
return new KafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties);
}
@@ -145,7 +156,7 @@ public class KafkaBinderConfiguration {
}
@Bean(name = "adminUtilsOperation")
@Conditional(Kafka10Present.class)
@Conditional(Kafka1xPresent.class)
@ConditionalOnClass(name = "kafka.admin.AdminUtils")
public AdminUtilsOperation kafka10AdminUtilsOperation() {
logger.info("AdminUtils selected: Kafka 0.10 AdminUtils");
@@ -153,15 +164,15 @@ public class KafkaBinderConfiguration {
}
@Bean
public ApplicationListener<?> jaasInitializer() throws IOException {
return new KafkaBinderJaasInitializerListener();
public KafkaJaasLoginModuleInitializer jaasInitializer() throws IOException {
return new KafkaJaasLoginModuleInitializer();
}
static class Kafka10Present implements Condition {
static class Kafka1xPresent implements Condition {
@Override
public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata annotatedTypeMetadata) {
return AppInfoParser.getVersion().startsWith("0.10");
return AppInfoParser.getVersion().startsWith("0.1");
}
}

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,24 +15,16 @@
*/
package org.springframework.cloud.stream.binder.kafka;
import java.util.List;
import org.springframework.cloud.stream.binder.AbstractTestBinder;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.context.ApplicationContext;
import org.springframework.integration.codec.Codec;
import org.springframework.integration.codec.kryo.KryoRegistrar;
import org.springframework.integration.codec.kryo.PojoCodec;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.tuple.TupleKryoRegistrar;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Registration;
/**
* @author Soby Chacko
* @author Gary Russell
@@ -40,33 +32,23 @@ import com.esotericsoftware.kryo.Registration;
public abstract class AbstractKafkaTestBinder extends
AbstractTestBinder<KafkaMessageChannelBinder, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>> {
private ApplicationContext applicationContext;
@Override
public void cleanup() {
// do nothing - the rule will take care of that
}
protected void addErrorChannel(GenericApplicationContext context) {
PublishSubscribeChannel errorChannel = new PublishSubscribeChannel();
context.getBeanFactory().initializeBean(errorChannel, IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME);
context.getBeanFactory().registerSingleton(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, errorChannel);
protected final void setApplicationContext(ApplicationContext context) {
this.applicationContext = context;
}
public ApplicationContext getApplicationContext() {
return this.applicationContext;
}
protected static Codec getCodec() {
return new PojoCodec(new TupleRegistrar());
}
private static class TupleRegistrar implements KryoRegistrar {
private final TupleKryoRegistrar delegate = new TupleKryoRegistrar();
@Override
public void registerTypes(Kryo kryo) {
this.delegate.registerTypes(kryo);
}
@Override
public List<Registration> getRegistrations() {
return this.delegate.getRegistrations();
}
return new PojoCodec(new TupleKryoRegistrar());
}
}

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -66,10 +66,10 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(
new KafkaProducerProperties());
Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory",
ExtendedProducerProperties.class);
String.class, ExtendedProducerProperties.class);
getProducerFactoryMethod.setAccessible(true);
DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod
.invoke(this.kafkaMessageChannelBinder, producerProperties);
.invoke(this.kafkaMessageChannelBinder, "foo", producerProperties);
Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs",
Map.class);
ReflectionUtils.makeAccessible(producerFactoryConfigField);

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -63,10 +63,10 @@ public class KafkaBinderConfigurationPropertiesTest {
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(
kafkaProducerProperties);
Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory",
ExtendedProducerProperties.class);
String.class, ExtendedProducerProperties.class);
getProducerFactoryMethod.setAccessible(true);
DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod
.invoke(this.kafkaMessageChannelBinder, producerProperties);
.invoke(this.kafkaMessageChannelBinder, "bar", producerProperties);
Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs",
Map.class);
ReflectionUtils.makeAccessible(producerFactoryConfigField);

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,6 +15,10 @@
*/
package org.springframework.cloud.stream.binder.kafka;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.verify;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -27,16 +31,16 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
/**
* @author Barry Commins
* @author Gary Russell
*/
public class KafkaBinderHealthIndicatorTest {
@@ -53,14 +57,15 @@ public class KafkaBinderHealthIndicatorTest {
@Mock
private KafkaMessageChannelBinder binder;
private Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse = new HashMap<>();
private final Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse = new HashMap<>();
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
given(consumerFactory.createConsumer()).willReturn(consumer);
given(binder.getTopicsInUse()).willReturn(topicsInUse);
indicator = new KafkaBinderHealthIndicator(binder, consumerFactory);
this.indicator = new KafkaBinderHealthIndicator(binder, consumerFactory);
this.indicator.setTimeout(10);
}
@Test
@@ -70,6 +75,7 @@ public class KafkaBinderHealthIndicatorTest {
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
Health health = indicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);
verify(this.consumer).close();
}
@Test
@@ -81,6 +87,25 @@ public class KafkaBinderHealthIndicatorTest {
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
}
@Test(timeout = 5000)
public void kafkaBinderDoesNotAnswer() {
final List<PartitionInfo> partitions = partitions(new Node(-1, null, 0));
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group", partitions));
given(consumer.partitionsFor(TEST_TOPIC)).willAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
final int fiveMinutes = 1000 * 60 * 5;
Thread.sleep(fiveMinutes);
return partitions;
}
});
this.indicator.setTimeout(1);
Health health = indicator.health();
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
}
private List<PartitionInfo> partitions(Node leader) {
List<PartitionInfo> partitions = new ArrayList<>();
partitions.add(new PartitionInfo(TEST_TOPIC, 0, leader, null, null));

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -1,11 +1,11 @@
/*
* Copyright 2016-2017 the original author or authors.
* Copyright 2016-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -27,9 +27,15 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -49,44 +55,56 @@ import org.springframework.cloud.stream.binder.PartitionTestSupport;
import org.springframework.cloud.stream.binder.TestUtils;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties.StandardHeaders;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.utils.KafkaTopicUtils;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.integration.kafka.support.KafkaSendFailureException;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.Assert.assertTrue;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import static org.mockito.Mockito.mock;
/**
* @author Soby Chacko
* @author Ilayaperumal Gopinathan
* @author Henryk Konsek
* @author Gary Russell
* @author Aldo Sinanaj
*/
public abstract class KafkaBinderTests extends
PartitionCapableBinderTests<AbstractKafkaTestBinder, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>> {
@@ -138,8 +156,7 @@ public abstract class KafkaBinderTests extends
testDlqGuts(false);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testDlqGuts(boolean withRetry) throws Exception {
private void testDlqGuts(boolean withRetry) throws Exception {
AbstractKafkaTestBinder binder = getBinder();
DirectChannel moduleOutputChannel = new DirectChannel();
DirectChannel moduleInputChannel = new DirectChannel();
@@ -148,6 +165,8 @@ public abstract class KafkaBinderTests extends
moduleInputChannel.subscribe(handler);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionCount(2);
producerProperties.getExtension().setHeaderPatterns(new String[] { MessageHeaders.CONTENT_TYPE,
"dlqTestHeader" });
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setMaxAttempts(withRetry ? 2 : 1);
consumerProperties.setBackOffInitialInterval(100);
@@ -167,7 +186,6 @@ public abstract class KafkaBinderTests extends
ApplicationContext context = TestUtils.getPropertyValue(binder.getBinder(), "applicationContext",
ApplicationContext.class);
Map<String, MessageChannel> beansOfType = context.getBeansOfType(MessageChannel.class);
SubscribableChannel boundErrorChannel = context
.getBean(producerName + ".testGroup.errors-0", SubscribableChannel.class);
SubscribableChannel globalErrorChannel = context.getBean("errorChannel", SubscribableChannel.class);
@@ -197,12 +215,23 @@ public abstract class KafkaBinderTests extends
"error.dlqTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel, dlqConsumerProperties);
binderBindUnbindLatency();
String testMessagePayload = "test." + UUID.randomUUID().toString();
Message<String> testMessage = MessageBuilder.withPayload(testMessagePayload).build();
Message<String> testMessage = MessageBuilder.withPayload(testMessagePayload)
.setHeader("dlqTestHeader", "propagatedToDlq")
.build();
moduleOutputChannel.send(testMessage);
Message<?> receivedMessage = receive(dlqChannel, 3);
assertThat(receivedMessage).isNotNull();
assertThat(receivedMessage.getPayload()).isEqualTo(testMessagePayload);
final MessageHeaders headers = receivedMessage.getHeaders();
assertThat(headers.get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC)).isEqualTo(producerName.getBytes());
assertThat(headers.get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE))
.isEqualTo(
"failed to send Message to channel 'null'; nested exception is java.lang.RuntimeException: fail"
.getBytes());
assertThat(headers.get(KafkaMessageChannelBinder.X_EXCEPTION_STACKTRACE)).isNotNull();
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isNotNull();
assertThat(headers.get("dlqTestHeader")).isEqualTo("propagatedToDlq");
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
binderBindUnbindLatency();
@@ -967,13 +996,23 @@ public abstract class KafkaBinderTests extends
@Test
@Override
@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testPartitionedModuleJava() throws Exception {
Binder binder = getBinder();
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
final ZkClient zkClient;
zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
invokeCreateTopic(zkUtils, "partJ.0", 8, 1, new Properties());
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setConcurrency(2);
consumerProperties.setInstanceCount(3);
consumerProperties.setInstanceCount(4);
consumerProperties.setInstanceIndex(0);
consumerProperties.setPartitioned(true);
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
@@ -988,11 +1027,15 @@ public abstract class KafkaBinderTests extends
QueueChannel input2 = new QueueChannel();
input2.setBeanName("test.input2J");
Binding<MessageChannel> input2Binding = binder.bindConsumer("partJ.0", "test", input2, consumerProperties);
consumerProperties.setInstanceIndex(3);
QueueChannel input3 = new QueueChannel();
input3.setBeanName("test.input3J");
Binding<MessageChannel> input3Binding = binder.bindConsumer("partJ.0", "test", input3, consumerProperties);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionKeyExtractorClass(PartitionTestSupport.class);
producerProperties.setPartitionSelectorClass(PartitionTestSupport.class);
producerProperties.setPartitionCount(3);
producerProperties.setPartitionCount(3); // overridden to 8 on the actual topic
DirectChannel output = createBindableChannel("output", createProducerBindingProperties(producerProperties));
output.setBeanName("test.output");
Binding<MessageChannel> outputBinding = binder.bindProducer("partJ.0", output, producerProperties);
@@ -1005,6 +1048,7 @@ public abstract class KafkaBinderTests extends
output.send(new GenericMessage<>(2));
output.send(new GenericMessage<>(1));
output.send(new GenericMessage<>(0));
output.send(new GenericMessage<>(3));
Message<?> receive0 = receive(input0);
assertThat(receive0).isNotNull();
@@ -1012,20 +1056,18 @@ public abstract class KafkaBinderTests extends
assertThat(receive1).isNotNull();
Message<?> receive2 = receive(input2);
assertThat(receive2).isNotNull();
Message<?> receive3 = receive(input3);
assertThat(receive3).isNotNull();
if (usesExplicitRouting()) {
assertThat(receive0.getPayload()).isEqualTo(0);
assertThat(receive1.getPayload()).isEqualTo(1);
assertThat(receive2.getPayload()).isEqualTo(2);
}
else {
List<Message<?>> receivedMessages = Arrays.asList(receive0, receive1, receive2);
assertThat(receivedMessages).extracting("payload").containsExactlyInAnyOrder(0, 1, 2);
}
assertThat(receive0.getPayload()).isEqualTo(0);
assertThat(receive1.getPayload()).isEqualTo(1);
assertThat(receive2.getPayload()).isEqualTo(2);
assertThat(receive3.getPayload()).isEqualTo(3);
input0Binding.unbind();
input1Binding.unbind();
input2Binding.unbind();
input3Binding.unbind();
outputBinding.unbind();
}
@@ -1385,7 +1427,7 @@ public abstract class KafkaBinderTests extends
}
@Test
@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testNativeSerializationWithCustomSerializerDeserializer() throws Exception {
Binding<?> producerBinding = null;
Binding<?> consumerBinding = null;
@@ -1413,6 +1455,7 @@ public abstract class KafkaBinderTests extends
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
consumerProperties.getExtension().getConfiguration().put("value.deserializer",
"org.apache.kafka.common.serialization.IntegerDeserializer");
consumerProperties.getExtension().setStandardHeaders(StandardHeaders.both);
consumerBinding = binder.bindConsumer(testTopicName, "test", moduleInputChannel, consumerProperties);
// Let the consumer actually bind to the producer before sending a msg
binderBindUnbindLatency();
@@ -1421,6 +1464,70 @@ public abstract class KafkaBinderTests extends
assertThat(inbound).isNotNull();
assertThat(inbound.getPayload()).isEqualTo(10);
assertThat(inbound.getHeaders()).doesNotContainKey("contentType");
assertThat(inbound.getHeaders().getId()).isNotNull();
assertThat(inbound.getHeaders().getTimestamp()).isNotNull();
}
finally {
if (producerBinding != null) {
producerBinding.unbind();
}
if (consumerBinding != null) {
consumerBinding.unbind();
}
}
}
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testNativeSerializationWithCustomSerializerDeserializerBytesPayload() throws Exception {
Binding<?> producerBinding = null;
Binding<?> consumerBinding = null;
try {
byte[] testPayload = new byte[1];
Message<?> message = MessageBuilder.withPayload(testPayload)
.setHeader(MessageHeaders.CONTENT_TYPE, "something/funky")
.build();
SubscribableChannel moduleOutputChannel = new DirectChannel();
String testTopicName = "existing" + System.currentTimeMillis();
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
final ZkClient zkClient;
zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
final ZkUtils zkUtils = new ZkUtils(zkClient, null, false);
invokeCreateTopic(zkUtils, testTopicName, 1, 1, new Properties());
configurationProperties.setAutoAddPartitions(true);
Binder binder = getBinder(configurationProperties);
ConfigurableApplicationContext context = TestUtils.getPropertyValue(binder, "binder.applicationContext",
ConfigurableApplicationContext.class);
MessagingMessageConverter converter = new MessagingMessageConverter();
converter.setGenerateMessageId(true);
converter.setGenerateTimestamp(true);
context.getBeanFactory().registerSingleton("testConverter", converter);
QueueChannel moduleInputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setUseNativeEncoding(true);
producerProperties.getExtension()
.getConfiguration()
.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel, producerProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
consumerProperties.getExtension()
.getConfiguration()
.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
consumerProperties.getExtension().setConverterBeanName("testConverter");
consumerBinding = binder.bindConsumer(testTopicName, "test", moduleInputChannel, consumerProperties);
// Let the consumer actually bind to the producer before sending a msg
binderBindUnbindLatency();
moduleOutputChannel.send(message);
Message<?> inbound = receive(moduleInputChannel, 500);
assertThat(inbound).isNotNull();
assertThat(inbound.getPayload()).isEqualTo(new byte[1]);
assertThat(inbound.getHeaders()).containsKey("contentType");
assertThat(inbound.getHeaders().get(MessageHeaders.CONTENT_TYPE).toString()).isEqualTo("something/funky");
assertThat(inbound.getHeaders().getId()).isNotNull();
assertThat(inbound.getHeaders().getTimestamp()).isNotNull();
}
finally {
if (producerBinding != null) {
@@ -1590,7 +1697,7 @@ public abstract class KafkaBinderTests extends
}
@Test
@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testSendAndReceiveWithRawMode() throws Exception {
Binder binder = getBinder();
DirectChannel moduleOutputChannel = new DirectChannel();
@@ -1722,6 +1829,76 @@ public abstract class KafkaBinderTests extends
assertThat(extractEndpoint(producerBinding).isRunning()).isFalse();
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testProducerErrorChannel() throws Exception {
AbstractKafkaTestBinder binder = getBinder();
DirectChannel moduleOutputChannel = createBindableChannel("output", new BindingProperties());
ExtendedProducerProperties<KafkaProducerProperties> producerProps = new ExtendedProducerProperties<>(
new KafkaProducerProperties());
producerProps.setHeaderMode(HeaderMode.raw);
producerProps.setErrorChannelEnabled(true);
Binding<MessageChannel> producerBinding = binder.bindProducer("ec.0", moduleOutputChannel, producerProps);
final Message<?> message = MessageBuilder.withPayload("bad").setHeader(MessageHeaders.CONTENT_TYPE, "foo/bar")
.build();
SubscribableChannel ec = binder.getApplicationContext().getBean("ec.0.errors", SubscribableChannel.class);
final AtomicReference<Message<?>> errorMessage = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(2);
ec.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
errorMessage.set(message);
latch.countDown();
}
});
SubscribableChannel globalEc = binder.getApplicationContext()
.getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, SubscribableChannel.class);
globalEc.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
latch.countDown();
}
});
KafkaProducerMessageHandler endpoint = TestUtils.getPropertyValue(producerBinding, "lifecycle",
KafkaProducerMessageHandler.class);
final RuntimeException fooException = new RuntimeException("foo");
final AtomicReference<Object> sent = new AtomicReference<>();
new DirectFieldAccessor(endpoint).setPropertyValue("kafkaTemplate",
new KafkaTemplate(mock(ProducerFactory.class)) {
@Override // SIK < 2.3
public ListenableFuture<SendResult> send(String topic, Object payload) {
sent.set(payload);
SettableListenableFuture<SendResult> future = new SettableListenableFuture<>();
future.setException(fooException);
return future;
}
@Override // SIK 2.3+
public ListenableFuture<SendResult> send(ProducerRecord record) {
sent.set(record.value());
SettableListenableFuture<SendResult> future = new SettableListenableFuture<>();
future.setException(fooException);
return future;
}
});
moduleOutputChannel.send(message);
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(errorMessage.get()).isInstanceOf(ErrorMessage.class);
assertThat(errorMessage.get().getPayload()).isInstanceOf(KafkaSendFailureException.class);
KafkaSendFailureException exception = (KafkaSendFailureException) errorMessage.get().getPayload();
assertThat(exception.getCause()).isSameAs(fooException);
assertThat(new String(((byte[] )exception.getFailedMessage().getPayload()))).isEqualTo(message.getPayload());
assertThat(exception.getRecord().value()).isSameAs(sent.get());
producerBinding.unbind();
}
@Override
protected void binderBindUnbindLatency() throws InterruptedException {
Thread.sleep(500);

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,

Some files were not shown because too many files have changed in this diff Show More