Compare commits

...

19 Commits

Author SHA1 Message Date
Soby Chacko
de4b5a9443 2.1.0.M2 Release 2018-08-28 10:04:25 -04:00
Soby Chacko
a090614709 Kafka Streams binder configuration fix
(to address core changes made in regard to Boot 2.1)

Ensure that KafkaStreamsBinderSupportAutoConfiguration runs
after BindingServiceConfiguration from core.
2018-08-14 10:33:51 -04:00
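The ordering fix boils down to an auto-configuration ordering annotation; a minimal sketch of the declaration, matching the change visible in the diff below:

[source,java]
----
@Configuration
@ConditionalOnBean(BindingService.class)
// ensure this configuration is processed only after the core binding configuration
@AutoConfigureAfter(BindingServiceConfiguration.class)
public class KafkaStreamsBinderSupportAutoConfiguration {
    // ...
}
----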
Gary Russell
a3f7ca9756 Fix mock to use poll(Duration) 2018-08-07 14:45:33 -04:00
Soby Chacko
82b07a0120 Fixing checkstyle issues 2018-08-07 14:23:47 -04:00
Soby Chacko
0d0cf8dcb7 Update to spring-cloud-build 2.1.0 snapshot
Spring Boot 2.1.0 snapshot
Spring Kafka 2.2.0 / Spring Integration Kafka 3.1.0 snapshots
Apache Kafka client 2.0.0
Fixing tests
Removing deprecations and removals
Polishing

Resolves #424
2018-08-07 14:02:52 -04:00
Soby Chacko
cbfe03be2f Kafka Streams binder test disabling
Temporarily disabling KafkaBinderBootstrapTest in the Kafka Streams binder
because it was making builds take much longer.
2018-08-03 08:11:52 -04:00
Soby Chacko
1f0c6cabc6 Revert "Kafka Streams binder test disabling"
This reverts commit 0c61cedc85.
2018-08-03 08:10:59 -04:00
Soby Chacko
0c61cedc85 Kafka Streams binder test disabling
Temporarily disabling KafkaBinderBootstrapTest in the Kafka Streams binder
because it was making builds take much longer.
2018-08-02 12:59:02 -04:00
Soby Chacko
44210f1b72 Update Kafka binder metrics docs
Fix the wrong metric name used in the Kafka binder metrics for consumer offset lag.
Update the description.

Resolves #422
2018-08-02 09:57:27 -04:00
Soby Chacko
7007f9494a JAAS initializer regression
Fix the JAAS initializer by setting the missing properties.

Resolves #419

Polishing
2018-07-27 14:34:17 -04:00
Gary Russell
da268bb6dd GH-309: Use actual partition count
Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/309

If more partitions exist than the number configured, use the actual count.

Resolves #416
2018-07-25 20:31:27 +02:00
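In other words, the binder should prefer the broker's real partition count over the configured `partitionCount` when the topic already has more partitions. A hedged sketch of that logic (field and accessor names here are illustrative, not the binder's actual code):

[source,java]
----
// Illustrative only: take whichever is larger, the configured count or
// the number of partitions the topic actually has on the broker.
int configuredCount = producerProperties.getPartitionCount(); // assumed accessor
int actualCount = partitionMetadata.size();                   // partitions reported by the broker
int partitionCount = Math.max(configuredCount, actualCount);
----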
Soby Chacko
e03baefbaf Kafka streams binder issues in custom environment
When the Kafka Streams binder is used in the context of a custom environment, possibly
for a multi-binder use case, it is unable to query the outer context because the normal
parent context is absent. This change ensures that the binder context has access
to the outer context so that it can use any beans it needs from it in the KStream or KTable
binder configuration.

Resolves #411
2018-07-25 13:14:47 -04:00
Gary Russell
01396b6573 GH-413: Configure Kafka Streams Cleanup Behavior
Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/413
2018-07-24 16:53:57 -04:00
Soby Chacko
3f009a8267 Autoconfigure optimization
Add spring-boot-autoconfigure-processor to the kafka-streams binder for
auto configuration optimization.

Resolves #406
2018-07-24 15:56:48 -04:00
Gary Russell
31bb86b002 GH-404: Synchronize shared consumer
Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/404

The fix for issue https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/231
added a shared consumer but the consumer is not thread safe. Add synchronization.

Also, a timeout was added to the `KafkaBinderHealthIndicator` but not to the
`KafkaBinderMetrics` which has a similar shared consumer; add a timeout there.
2018-07-11 13:30:24 -04:00
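Because `KafkaConsumer` is not thread safe, the fix amounts to guarding every use of the shared consumer with a common lock. A simplified sketch (field and method names are hypothetical, not the binder's actual code):

[source,java]
----
private final Object consumerLock = new Object();

private long computeLag(TopicPartition topicPartition) {
    synchronized (this.consumerLock) { // KafkaConsumer is not thread safe
        Long endOffset = this.sharedConsumer
                .endOffsets(Collections.singleton(topicPartition))
                .get(topicPartition);
        OffsetAndMetadata committed = this.sharedConsumer.committed(topicPartition);
        // lag = latest offset on the topic minus the group's committed offset
        return endOffset - (committed == null ? 0 : committed.offset());
    }
}
----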
Soby Chacko
cc2dfd1d08 Upgrade kafka client to 1.1.0 (#405)
* Upgrade kafka client to 1.1.0

Upgrade kafka client to 1.1.0 for both kafka and kafka-streams binders

Resolves #370

* Address review comments
2018-07-09 17:37:58 -04:00
jmaxwell
fc768ba695 GH-402 Add additional data to DLQ message headers 2018-07-02 11:34:46 -04:00
Soby Chacko
38d6deb4d5 Provide programmatic access to KafkaStreams object
Provide access to the underlying StreamsBuilderFactoryBean by making the bean name
deterministic. Earlier, the binder used a UUID to make the stream builder factory
bean names unique in the event of multiple StreamListeners. Switching to the
method name instead keeps the StreamsBuilderFactoryBean names unique while providing
a deterministic way of granting programmatic access.

Polishing docs

Fixes #396
2018-06-29 16:45:55 -04:00
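With the method-name-based registration, the factory bean for a `StreamListener` method named `process` can now be looked up deterministically (the `&` prefix is required because it is a factory bean); the docs diff below adds the same example:

[source,java]
----
StreamsBuilderFactoryBean streamsBuilderFactoryBean =
        context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
----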
Soby Chacko
2a0b9015de Next update version: 2.1.0.BUILD-SNAPSHOT 2018-06-27 14:11:40 -04:00
38 changed files with 665 additions and 290 deletions

pom.xml
View File

@@ -2,20 +2,20 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.1.0.M1</version>
<version>2.1.0.M2</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>2.0.2.RELEASE</version>
<version>2.1.0.M1</version>
<relativePath />
</parent>
<properties>
<java.version>1.8</java.version>
<spring-kafka.version>2.1.5.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>3.0.3.RELEASE</spring-integration-kafka.version>
<kafka.version>1.0.1</kafka.version>
<spring-cloud-stream.version>2.1.0.M1</spring-cloud-stream.version>
<spring-kafka.version>2.2.0.M2</spring-kafka.version>
<spring-integration-kafka.version>3.1.0.M1</spring-integration-kafka.version>
<kafka.version>2.0.0</kafka.version>
<spring-cloud-stream.version>2.1.0.M2</spring-cloud-stream.version>
</properties>
<modules>
<module>spring-cloud-stream-binder-kafka</module>
@@ -47,6 +47,13 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
@@ -101,6 +108,27 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<classifier>test</classifier>
<scope>test</scope>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.1.0.M1</version>
<version>2.1.0.M2</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.1.0.M1</version>
<version>2.1.0.M2</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.1.0.M1</version>
<version>2.1.0.M2</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>

View File

@@ -1,6 +1,6 @@
== Usage
For using the Kafka Streams binder, you just need to add it to your Spring Cloud Stream application, using the following
Maven coordinates:
[source,xml]
@@ -13,26 +13,26 @@ Maven coordinates:
== Kafka Streams Binder Overview
Spring Cloud Stream's Apache Kafka support also includes a binder implementation designed explicitly for Apache Kafka
Streams binding. With this native integration, a Spring Cloud Stream "processor" application can directly use the
https://kafka.apache.org/documentation/streams/developer-guide[Apache Kafka Streams] APIs in the core business logic.
Kafka Streams binder implementation builds on the foundation provided by the http://docs.spring.io/spring-kafka/reference/html/_reference.html#kafka-streams[Kafka Streams in Spring Kafka]
project.
As part of this native integration, the high-level https://docs.confluent.io/current/streams/developer-guide/dsl-api.html[Streams DSL]
provided by the Kafka Streams API is available for use in the business logic, too.
An early version of the https://docs.confluent.io/current/streams/developer-guide/processor-api.html[Processor API]
support is available as well.
As noted early on, Kafka Streams support in Spring Cloud Stream is strictly only available for use in the Processor model:
a model in which messages are read from an inbound topic, business processing is applied, and the transformed messages
are written to an outbound topic. It can also be used in Processor applications with no outbound destination.
=== Streams DSL
This application consumes data from a Kafka topic (e.g., `words`), computes the word count for each unique word in a 5-second
time window, and sends the computed results to a downstream topic (e.g., `counts`) for further processing.
[source]
@@ -65,12 +65,12 @@ Once built as a uber-jar (e.g., `wordcount-processor.jar`), you can run the abov
java -jar wordcount-processor.jar --spring.cloud.stream.bindings.input.destination=words --spring.cloud.stream.bindings.output.destination=counts
----
This application will consume messages from the Kafka topic `words` and publish the computed results to the output
topic `counts`.
Spring Cloud Stream will ensure that the messages from both the incoming and outgoing topics are automatically bound as
KStream objects. As a developer, you can exclusively focus on the business aspects of the code, i.e. writing the logic
required in the processor. Setting up the Streams DSL specific configuration required by the Kafka Streams infrastructure
is automatically handled by the framework.
== Configuration Options
@@ -81,7 +81,7 @@ For common configuration options and properties pertaining to binder, refer to t
=== Kafka Streams Properties
The following properties are available at the binder level and must be prefixed with `spring.cloud.stream.kafka.streams.binder.`
literal.
configuration::
@@ -96,7 +96,7 @@ spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.a
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
----
For more information about all the properties that may go into streams configuration, see StreamsConfig JavaDocs in
Apache Kafka Streams docs.
brokers::
@@ -119,7 +119,7 @@ applicationId::
+
Default: `default`
The following properties are _only_ available for Kafka Streams producers and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.`
literal.
keySerde::
@@ -135,7 +135,7 @@ useNativeEncoding::
+
Default: `false`.
The following properties are _only_ available for Kafka Streams consumers and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding name>.consumer.`
literal.
keySerde::
@@ -176,8 +176,8 @@ Default: `none`.
== Multiple Input Bindings
For use cases that require multiple incoming KStream objects or a combination of KStream and KTable objects, the Kafka
Streams binder provides multiple bindings support.
Let's see it in action.
@@ -206,11 +206,11 @@ interface KStreamKTableBinding {
----
In the above example, the application is written as a sink, i.e. there are no output bindings, and the application has to
make decisions concerning downstream processing. When you write applications in this style, you might want to send the information
downstream or store it in a state store (see below for Queryable State Stores).
In the case of incoming KTable, if you want to materialize the computations to a state store, you have to express it
through the following property.
[source]
@@ -244,13 +244,13 @@ interface KStreamKTableBinding extends KafkaStreamsProcessor {
== Multiple Output Bindings (aka Branching)
Kafka Streams allows outbound data to be split into multiple topics based on some predicates. The Kafka Streams binder provides
support for this feature without compromising the programming model exposed through `StreamListener` in the end user application.
You can write the application in the usual way as demonstrated above in the word count example. However, when using the
branching feature, you are required to do a few things. First, you need to make sure that your return type is `KStream[]`
instead of a regular `KStream`. Second, you need to use the `SendTo` annotation containing the output bindings in the order
(see example below). For each of these output bindings, you need to configure destination, content-type etc., complying with
the standard Spring Cloud Stream expectations.
Here is an example:
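A minimal sketch of the pattern just described (the predicates and binding names here are illustrative, not taken from the docs):

[source,java]
----
@StreamListener
@SendTo({"output1", "output2", "output3"}) // bindings listed in the same order as the branches
public KStream<Object, String>[] process(@Input("input") KStream<Object, String> input) {
    // branch() returns one KStream per predicate, evaluated in order
    return input.branch(
            (key, value) -> value.startsWith("a"),   // routed to output1
            (key, value) -> value.startsWith("b"),   // routed to output2
            (key, value) -> true);                   // everything else to output3
}
----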
@@ -330,21 +330,21 @@ spring.cloud.stream.bindings.input:
== Message Conversion
Similar to message-channel based binder applications, the Kafka Streams binder adapts to the out-of-the-box content-type
conversions without any compromise.
It is typical for Kafka Streams operations to know the type of SerDes used to transform the key and value correctly.
Therefore, it may be more natural to rely on the SerDe facilities provided by the Apache Kafka Streams library itself at
the inbound and outbound conversions rather than using the content-type conversions offered by the framework.
On the other hand, you might already be familiar with the content-type conversion patterns provided by the framework
and would like to continue using them for inbound and outbound conversions.
Both options are supported in the Kafka Streams binder implementation.
==== Outbound serialization
If native encoding is disabled (which is the default), then the framework will convert the message using the contentType
set by the user (otherwise, the default `application/json` will be applied). It will ignore any SerDe set on the outbound
in this case for outbound serialization.
Here is the property to set the contentType on the outbound.
@@ -361,7 +361,7 @@ Here is the property to enable native encoding.
spring.cloud.stream.bindings.output.nativeEncoding: true
----
If native encoding is enabled on the output binding (user has to enable it as above explicitly), then the framework will
skip any form of automatic message conversion on the outbound. In that case, it will switch to the Serde set by the user.
The `valueSerde` property set on the actual output binding will be used. Here is an example.
@@ -372,7 +372,7 @@ spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde: org.apach
If this property is not set, then it will use the "default" SerDe: `spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde`.
It is worth mentioning that the Kafka Streams binder does not serialize the keys on the outbound - it simply relies on Kafka itself.
Therefore, you either have to specify the `keySerde` property on the binding or it will default to the application-wide common
`keySerde`.
Binding level key serde:
@@ -418,9 +418,9 @@ spring.cloud.stream.kafka.streams.bindings.output2.producer.valueSerde=StringSer
spring.cloud.stream.kafka.streams.bindings.output3.producer.valueSerde=JsonSerde
----
Then if you have `SendTo` like this, @SendTo({"output1", "output2", "output3"}), the `KStream[]` from the branches are
applied with the proper SerDe objects as defined above. If you do not enable `nativeEncoding`, you can then set different
contentType values on the output bindings as below. In that case, the framework will use the appropriate message converter
to convert the messages before sending to Kafka.
[source]
@@ -434,8 +434,8 @@ spring.cloud.stream.bindings.output3.contentType: application/octet-stream
Similar rules apply to data deserialization on the inbound.
If native decoding is disabled (which is the default), then the framework will convert the message using the contentType
set by the user (otherwise, the default `application/json` will be applied). It will ignore any SerDe set on the inbound
in this case for inbound deserialization.
Here is the property to set the contentType on the inbound.
@@ -452,8 +452,8 @@ Here is the property to enable native decoding.
spring.cloud.stream.bindings.input.nativeDecoding: true
----
If native decoding is enabled on the input binding (the user has to enable it explicitly, as above), then the framework will
skip doing any message conversion on the inbound. In that case, it will switch to the SerDe set by the user. The `valueSerde`
property set on the actual input binding will be used. Here is an example.
[source]
@@ -464,7 +464,7 @@ spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde: org.apache
If this property is not set, it will use the default SerDe: `spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde`.
It is worth mentioning that the Kafka Streams binder does not deserialize the keys on the inbound - it simply relies on Kafka itself.
Therefore, you either have to specify the `keySerde` property on the binding or it will default to the application-wide common
`keySerde`.
Binding level key serde:
@@ -481,8 +481,8 @@ Common Key serde:
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
----
As in the case of KStream branching on the outbound, the benefit of setting value SerDe per binding is that if you have
multiple input bindings (multiple KStream objects) and they all require separate value SerDes, then you can configure
them individually. If you use the common configuration approach, then this feature won't be applicable.
== Error Handling
@@ -490,7 +490,7 @@ them individually. If you use the common configuration approach, then this featu
Apache Kafka Streams provides the capability for natively handling exceptions from deserialization errors.
For details on this support, please see https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers[this].
Out of the box, Apache Kafka Streams provide two kinds of deserialization exception handlers - `logAndContinue` and `logAndFail`.
As the name indicates, the former will log the error and continue processing the next records, and the latter will log the
error and fail. `LogAndFail` is the default deserialization exception handler.
=== Handling Deserialization Exceptions
@@ -502,7 +502,7 @@ Kafka Streams binder supports a selection of exception handlers through the foll
spring.cloud.stream.kafka.streams.binder.serdeError: logAndContinue
----
In addition to the above two deserialization exception handlers, the binder also provides a third one for sending the erroneous
records (poison pills) to a DLQ topic. Here is how you enable this DLQ exception handler.
[source]
@@ -516,27 +516,27 @@ When the above property is set, all the deserialization error records are automa
spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqName: foo-dlq
----
If this is set, then the error records are sent to the topic `foo-dlq`. If this is not set, then it will create a DLQ
topic with the name `error.<input-topic-name>.<group-name>`.
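For example, with an input topic named `words` and a consumer group named `wordcount`, the error records would go to a topic named `error.words.wordcount`.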
There are a couple of things to keep in mind when using the exception handling feature in the Kafka Streams binder.
* The property `spring.cloud.stream.kafka.streams.binder.serdeError` is applicable for the entire application. This implies
that if there are multiple `StreamListener` methods in the same application, this property is applied to all of them.
* The exception handling for deserialization works consistently with native deserialization and framework provided message
conversion.
=== Handling Non-Deserialization Exceptions
For general error handling in the Kafka Streams binder, it is up to the end-user application to handle application-level errors.
As a side effect of providing a DLQ for deserialization exception handlers, the Kafka Streams binder provides a way to get
access to the DLQ sending bean directly from your application.
Once you get access to that bean, you can programmatically send any exception records from your application to the DLQ.
Robust error handling using the high-level DSL remains hard; Kafka Streams doesn't natively support error
handling yet.
However, when you use the low-level Processor API in your application, there are options to control this behavior. See
below.
[source]
@@ -652,3 +652,23 @@ else {
//query from the remote host
}
----
== Accessing the underlying KafkaStreams object
The `StreamsBuilderFactoryBean` from spring-kafka that is responsible for constructing the `KafkaStreams` object can be accessed programmatically.
Each `StreamsBuilderFactoryBean` is registered as `stream-builder` appended with the `StreamListener` method name.
If your `StreamListener` method is named `process`, for example, the stream builder bean is named `stream-builder-process`.
Since this is a factory bean, it should be accessed by prepending an ampersand (`&`) when accessing it programmatically.
The following example assumes the `StreamListener` method is named `process`:
[source]
----
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
----
== State Cleanup
By default, the `KafkaStreams.cleanup()` method is called when the binding is stopped.
See https://docs.spring.io/spring-kafka/reference/html/_reference.html#_configuration[the Spring Kafka documentation].
To modify this behavior, simply add a single `CleanupConfig` `@Bean` (configured to clean up on start, stop, or neither) to the application context; the bean will be detected and wired into the factory bean.
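A sketch of such a bean, matching the configuration exercised in the tests below (clean up local state on start but not on stop):

[source,java]
----
@Bean
public CleanupConfig cleanupConfig() {
    // cleanupOnStart = true, cleanupOnStop = false
    return new CleanupConfig(true, false);
}
----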

View File

@@ -488,6 +488,6 @@ You can consume these exceptions with your own Spring Integration flow.
Kafka binder module exposes the following metrics:
`spring.cloud.stream.binder.kafka.someGroup.someTopic.lag`: This metric indicates how many messages have not yet been consumed from a given binder's topic by a given consumer group.
For example, if the value of the metric `spring.cloud.stream.binder.kafka.myGroup.myTopic.lag` is `1000`, the consumer group named `myGroup` has `1000` messages waiting to be consumed from the topic called `myTopic`.
`spring.cloud.stream.binder.kafka.offset`: This metric indicates how many messages have not yet been consumed from a given binder's topic by a given consumer group.
The metrics provided are based on the Micrometer metrics library. The metric contains the consumer group information, the topic, and the actual lag in committed offset from the latest offset on the topic.
This metric is particularly useful for providing auto-scaling feedback to a PaaS platform.

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.1.0.M1</version>
<version>2.1.0.M2</version>
</parent>
<dependencies>
@@ -45,11 +45,6 @@
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<classifier>test</classifier>
</dependency>
<!-- Added back since Kafka still depends on it, but it has been removed by Boot due to EOL -->
<dependency>
<groupId>log4j</groupId>
@@ -62,5 +57,24 @@
<artifactId>spring-cloud-stream-binder-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- Following dependencies are needed to support Kafka 1.1.0 client-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -16,17 +16,18 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
/**
* @author Marius Bogoevici
@@ -34,18 +35,30 @@ import org.springframework.context.annotation.Configuration;
* @author Soby Chacko
*/
@Configuration
@Import({KafkaAutoConfiguration.class})
public class KStreamBinderConfiguration {
private static final Log logger = LogFactory.getLog(KStreamBinderConfiguration.class);
@Autowired
private KafkaProperties kafkaProperties;
@Autowired
private KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties;
@Bean
@ConditionalOnBean(name = "outerContext")
public BeanFactoryPostProcessor outerContextBeanFactoryPostProcessor() {
return beanFactory -> {
ApplicationContext outerContext = (ApplicationContext) beanFactory.getBean("outerContext");
beanFactory.registerSingleton(KafkaStreamsBinderConfigurationProperties.class.getSimpleName(), outerContext
.getBean(KafkaStreamsBinderConfigurationProperties.class));
beanFactory.registerSingleton(KafkaStreamsMessageConversionDelegate.class.getSimpleName(), outerContext
.getBean(KafkaStreamsMessageConversionDelegate.class));
beanFactory.registerSingleton(KafkaStreamsBindingInformationCatalogue.class.getSimpleName(), outerContext
.getBean(KafkaStreamsBindingInformationCatalogue.class));
beanFactory.registerSingleton(KeyValueSerdeResolver.class.getSimpleName(), outerContext
.getBean(KeyValueSerdeResolver.class));
beanFactory.registerSingleton(KafkaStreamsExtendedBindingProperties.class.getSimpleName(), outerContext
.getBean(KafkaStreamsExtendedBindingProperties.class));
};
}
@Bean
public KafkaTopicProvisioner provisioningProvider(KafkaBinderConfigurationProperties binderConfigurationProperties) {
public KafkaTopicProvisioner provisioningProvider(KafkaBinderConfigurationProperties binderConfigurationProperties,
KafkaProperties kafkaProperties) {
return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties);
}
@@ -54,7 +67,8 @@ public class KStreamBinderConfiguration {
KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
KeyValueSerdeResolver keyValueSerdeResolver) {
KeyValueSerdeResolver keyValueSerdeResolver,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties) {
KStreamBinder kStreamBinder = new KStreamBinder(binderConfigurationProperties, kafkaTopicProvisioner,
KafkaStreamsMessageConversionDelegate, KafkaStreamsBindingInformationCatalogue,
keyValueSerdeResolver);

View File

@@ -16,27 +16,37 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Soby Chacko
*/
@Configuration
public class KTableBinderConfiguration {
@Autowired
private KafkaProperties kafkaProperties;
@Autowired
private KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties;
@Bean
@ConditionalOnBean(name = "outerContext")
public BeanFactoryPostProcessor outerContextBeanFactoryPostProcessor() {
return beanFactory -> {
ApplicationContext outerContext = (ApplicationContext) beanFactory.getBean("outerContext");
beanFactory.registerSingleton(KafkaStreamsBinderConfigurationProperties.class.getSimpleName(), outerContext
.getBean(KafkaStreamsBinderConfigurationProperties.class));
beanFactory.registerSingleton(KafkaStreamsBindingInformationCatalogue.class.getSimpleName(), outerContext
.getBean(KafkaStreamsBindingInformationCatalogue.class));
};
}
@Bean
public KafkaTopicProvisioner provisioningProvider(KafkaBinderConfigurationProperties binderConfigurationProperties) {
public KafkaTopicProvisioner provisioningProvider(KafkaBinderConfigurationProperties binderConfigurationProperties,
KafkaProperties kafkaProperties) {
return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties);
}

View File

@@ -25,7 +25,9 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -34,9 +36,11 @@ import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStr
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
import org.springframework.cloud.stream.config.BindingServiceConfiguration;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.util.ObjectUtils;
/**
@@ -46,6 +50,7 @@ import org.springframework.util.ObjectUtils;
*/
@EnableConfigurationProperties(KafkaStreamsExtendedBindingProperties.class)
@ConditionalOnBean(BindingService.class)
@AutoConfigureAfter(BindingServiceConfiguration.class)
public class KafkaStreamsBinderSupportAutoConfiguration {
@Bean
@@ -99,10 +104,12 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
KStreamStreamListenerParameterAdapter kafkaStreamListenerParameterAdapter,
Collection<StreamListenerResultAdapter> streamListenerResultAdapters,
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties) {
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
ObjectProvider<CleanupConfig> cleanupConfig) {
return new KafkaStreamsStreamListenerSetupMethodOrchestrator(bindingServiceProperties,
kafkaStreamsExtendedBindingProperties, keyValueSerdeResolver, kafkaStreamsBindingInformationCatalogue,
kafkaStreamListenerParameterAdapter, streamListenerResultAdapters, binderConfigurationProperties);
kafkaStreamListenerParameterAdapter, streamListenerResultAdapters, binderConfigurationProperties,
cleanupConfig.getIfUnique());
}
@Bean

View File

@@ -41,7 +41,7 @@ import org.springframework.util.StringUtils;
*
* @author Soby Chacko
*/
class KafkaStreamsMessageConversionDelegate {
public class KafkaStreamsMessageConversionDelegate {
private static final ThreadLocal<KeyValue<Object, Object>> keyValueThreadLocal = new ThreadLocal<>();
@@ -182,12 +182,6 @@ class KafkaStreamsMessageConversionDelegate {
}
}
@SuppressWarnings("deprecation")
@Override
public void punctuate(long timestamp) {
}
@Override
public void close() {

View File

@@ -21,21 +21,19 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
@@ -64,6 +62,7 @@ import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.MethodParameter;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
@@ -86,6 +85,7 @@ import org.springframework.util.StringUtils;
*
* @author Soby Chacko
* @author Lei Chen
* @author Gary Russell
*/
class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListenerSetupMethodOrchestrator, ApplicationContextAware {
@@ -107,6 +107,8 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
private final KafkaStreamsBinderConfigurationProperties binderConfigurationProperties;
private final CleanupConfig cleanupConfig;
private ConfigurableApplicationContext applicationContext;
KafkaStreamsStreamListenerSetupMethodOrchestrator(BindingServiceProperties bindingServiceProperties,
@@ -115,7 +117,8 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
StreamListenerParameterAdapter streamListenerParameterAdapter,
Collection<StreamListenerResultAdapter> streamListenerResultAdapters,
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties) {
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
CleanupConfig cleanupConfig) {
this.bindingServiceProperties = bindingServiceProperties;
this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
this.keyValueSerdeResolver = keyValueSerdeResolver;
@@ -123,6 +126,7 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
this.streamListenerParameterAdapter = streamListenerParameterAdapter;
this.streamListenerResultAdapters = streamListenerResultAdapters;
this.binderConfigurationProperties = binderConfigurationProperties;
this.cleanupConfig = cleanupConfig;
}
@Override
@@ -291,6 +295,7 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
}
private <K,V> KTable<K,V> materializedAs(StreamsBuilder streamsBuilder, String destination, String storeName, Serde<K> k, Serde<V> v) {
return streamsBuilder.table(bindingServiceProperties.getBindingDestination(destination),
Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(storeName)
.withKeySerde(k)
@@ -384,14 +389,6 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
private StreamsConfig buildStreamsBuilderAndRetrieveConfig(Method method, ApplicationContext applicationContext,
BindingProperties bindingProperties) {
ConfigurableListableBeanFactory beanFactory = this.applicationContext.getBeanFactory();
StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean();
streamsBuilder.setAutoStartup(false);
String uuid = UUID.randomUUID().toString();
BeanDefinition streamsBuilderBeanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<StreamsBuilderFactoryBean>) streamsBuilder.getClass(), () -> streamsBuilder)
.getRawBeanDefinition();
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition("stream-builder-" + uuid, streamsBuilderBeanDefinition);
StreamsBuilderFactoryBean streamsBuilderX = applicationContext.getBean("&stream-builder-" + uuid, StreamsBuilderFactoryBean.class);
String group = bindingProperties.getGroup();
if (!StringUtils.hasText(group)) {
group = binderConfigurationProperties.getApplicationId();
@@ -418,12 +415,20 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
return super.getConfiguredInstance(key, clazz);
}
};
StreamsBuilderFactoryBean streamsBuilder = this.cleanupConfig == null
? new StreamsBuilderFactoryBean(streamsConfig)
: new StreamsBuilderFactoryBean(streamsConfig, this.cleanupConfig);
streamsBuilder.setAutoStartup(false);
BeanDefinition streamsBuilderBeanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<StreamsBuilderFactoryBean>) streamsBuilder.getClass(), () -> streamsBuilder)
.getRawBeanDefinition();
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition("stream-builder-" + method.getName(), streamsBuilderBeanDefinition);
StreamsBuilderFactoryBean streamsBuilderX = applicationContext.getBean("&stream-builder-" + method.getName(), StreamsBuilderFactoryBean.class);
BeanDefinition streamsConfigBeanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<StreamsConfig>) streamsConfig.getClass(), () -> streamsConfig)
.getRawBeanDefinition();
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition("streamsConfig-" + uuid, streamsConfigBeanDefinition);
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition("streamsConfig-" + method.getName(), streamsConfigBeanDefinition);
streamsBuilder.setStreamsConfig(streamsConfig);
methodStreamsBuilderFactoryBeanMap.put(method, streamsBuilderX);
return streamsConfig;
}

View File

@@ -0,0 +1,80 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams.bootstrap;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.test.rule.KafkaEmbedded;
/**
* @author Soby Chacko
*/
@Ignore("Temporarily disabling the test as builds are getting slower due to this.")
public class KafkaStreamsBinderBootstrapTest {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
@Test
public void testKafkaStreamsBinderWithCustomEnvironmentCanStart() {
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(SimpleApplication.class)
.web(WebApplicationType.NONE)
.run("--spring.cloud.stream.bindings.input.destination=foo",
"--spring.cloud.stream.bindings.input.binder=kBind1",
"--spring.cloud.stream.binders.kBind1.type=kstream",
"--spring.cloud.stream.binders.kBind1.environment.spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.binders.kBind1.environment.spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
applicationContext.close();
}
@Test
public void testKafkaStreamsBinderWithStandardConfigurationCanStart() {
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(SimpleApplication.class)
.web(WebApplicationType.NONE)
.run("--spring.cloud.stream.bindings.input.destination=foo",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
applicationContext.close();
}
@SpringBootApplication
@EnableBinding(StreamSourceProcessor.class)
static class SimpleApplication {
@StreamListener
public void handle(@Input("input") KStream<Object, String> stream) {
}
}
interface StreamSourceProcessor {
@Input("input")
KStream<?, ?> inputStream();
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
package org.springframework.cloud.stream.binder.kafka.streams.integration;
import java.util.Arrays;
import java.util.Map;
@@ -72,7 +72,7 @@ public abstract class DeserializationErrorHandlerByKafkaTests {
"error.word1.groupx", "error.word2.groupx");
@SpyBean
KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate;
org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate;
private static Consumer<String, String> consumer;

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
package org.springframework.cloud.stream.binder.kafka.streams.integration;
import java.util.Map;
@@ -66,7 +66,7 @@ public abstract class DeserializtionErrorHandlerByBinderTests {
"error.foos1.fooz-group", "error.foos2.fooz-group");
@SpyBean
KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate;
org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate;
private static Consumer<Integer, String> consumer;

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
package org.springframework.cloud.stream.binder.kafka.streams.integration;
import java.util.ArrayList;
import java.util.Arrays;

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2017 the original author or authors.
* Copyright 2017-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
package org.springframework.cloud.stream.binder.kafka.streams.integration;
import java.util.Arrays;
import java.util.Date;
@@ -24,11 +24,14 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -45,9 +48,13 @@ import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsApplicationSupportProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
@@ -101,7 +108,17 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
try {
receiveAndValidate(context);
} finally {
//Assertions on StreamBuilderFactoryBean
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
ReadOnlyWindowStore<Object, Object> store = kafkaStreams.store("foo-WordCounts", QueryableStoreTypes.windowStore());
assertThat(store).isNotNull();
CleanupConfig cleanup = TestUtils.getPropertyValue(streamsBuilderFactoryBean, "cleanupConfig",
CleanupConfig.class);
assertThat(cleanup.cleanupOnStart()).isTrue();
assertThat(cleanup.cleanupOnStop()).isFalse();
}
finally {
context.close();
}
}
@@ -129,8 +146,6 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
public KStream<?, WordCount> process(@Input("input") KStream<Object, String> input) {
input.map((k,v) -> {
System.out.println(k);
System.out.println(v);
return new KeyValue<>(k,v);
});
return input
@@ -143,6 +158,11 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
}
@Bean
public CleanupConfig cleanupConfig() {
return new CleanupConfig(true, false);
}
}
static class WordCount {

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
package org.springframework.cloud.stream.binder.kafka.streams.integration;
import java.util.Map;
@@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
@@ -39,6 +40,7 @@ import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
@@ -137,7 +139,7 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
.filter((key, product) -> product.getId() == 123)
.map((key, value) -> new KeyValue<>(value.id, value))
.groupByKey(Serialized.with(new Serdes.IntegerSerde(), new JsonSerde<>(Product.class)))
.count("prod-id-count-store")
.count(Materialized.as("prod-id-count-store"))
.toStream()
.map((key, value) -> new KeyValue<>(null, "Count for product with ID 123: " + value));
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
package org.springframework.cloud.stream.binder.kafka.streams.integration;
import java.util.Arrays;
import java.util.Map;
@@ -71,7 +71,7 @@ public abstract class KafkaStreamsNativeEncodingDecodingTests {
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "counts");
@SpyBean
KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate;
org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate;
private static Consumer<String, String> consumer;

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
package org.springframework.cloud.stream.binder.kafka.streams.integration;
import java.util.Map;
@@ -116,11 +116,6 @@ public class KafkaStreamsStateStoreIntegrationTests {
processed = true;
}
@Override
public void punctuate(long l) {
}
@Override
public void close() {
if (state != null) {

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2017 the original author or authors.
* Copyright 2017-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package org.springframework.cloud.stream.binder.kafka.streams;
+package org.springframework.cloud.stream.binder.kafka.streams.integration;
import java.util.Map;
@@ -38,9 +38,12 @@ import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.integration.test.util.TestUtils;
+import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.StreamsBuilderFactoryBean;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -92,7 +95,15 @@ public class KafkastreamsBinderPojoInputStringOutputIntegrationTests {
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
try {
receiveAndValidateFoo(context);
-} finally {
+//Assertions on StreamBuilderFactoryBean
+StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process",
+StreamsBuilderFactoryBean.class);
+CleanupConfig cleanup = TestUtils.getPropertyValue(streamsBuilderFactoryBean, "cleanupConfig",
+CleanupConfig.class);
+assertThat(cleanup.cleanupOnStart()).isFalse();
+assertThat(cleanup.cleanupOnStop()).isTrue();
+}
+finally {
context.close();
}
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package org.springframework.cloud.stream.binder.kafka.streams;
+package org.springframework.cloud.stream.binder.kafka.streams.integration;
import java.util.ArrayList;
import java.util.Arrays;

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package org.springframework.cloud.stream.binder.kafka.streams;
+package org.springframework.cloud.stream.binder.kafka.streams.integration;
import java.util.Arrays;
import java.util.Date;

View File

@@ -1,5 +1,5 @@
eclipse.preferences.version=1
org.eclipse.jdt.ui.ignorelowercasenames=true
-org.eclipse.jdt.ui.importorder=java;javax;com;org;org.springframework;ch.qos;\#;
+org.eclipse.jdt.ui.importorder=java;javax;com;io.micrometer;org;org.springframework;ch.qos;\#;
org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-<version>2.1.0.M1</version>
+<version>2.1.0.M2</version>
</parent>
<dependencies>
@@ -60,6 +60,26 @@
<artifactId>spring-cloud-stream-binder-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Following dependencies are needed to support Kafka 1.1.0 client-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -1,5 +1,5 @@
/*
-* Copyright 2016-2017 the original author or authors.
+* Copyright 2016-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -78,25 +78,31 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
public Health call() {
try {
-if (metadataConsumer == null) {
-metadataConsumer = consumerFactory.createConsumer();
-}
-Set<String> downMessages = new HashSet<>();
-for (String topic : KafkaBinderHealthIndicator.this.binder.getTopicsInUse().keySet()) {
-List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
-for (PartitionInfo partitionInfo : partitionInfos) {
-if (KafkaBinderHealthIndicator.this.binder.getTopicsInUse().get(topic).getPartitionInfos()
-.contains(partitionInfo) && partitionInfo.leader().id() == -1) {
-downMessages.add(partitionInfo.toString());
-}
-}
-}
-if (downMessages.isEmpty()) {
-return Health.up().build();
-}
-else {
-return Health.down()
-.withDetail("Following partitions in use have no leaders: ", downMessages.toString())
-.build();
-}
+synchronized(KafkaBinderHealthIndicator.this) {
+if (metadataConsumer == null) {
+metadataConsumer = consumerFactory.createConsumer();
+}
+}
+synchronized (metadataConsumer) {
+Set<String> downMessages = new HashSet<>();
+for (String topic : KafkaBinderHealthIndicator.this.binder.getTopicsInUse().keySet()) {
+List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
+for (PartitionInfo partitionInfo : partitionInfos) {
+if (KafkaBinderHealthIndicator.this.binder.getTopicsInUse().get(topic).getPartitionInfos()
+.contains(partitionInfo) && partitionInfo.leader().id() == -1) {
+downMessages.add(partitionInfo.toString());
+}
+}
+}
+if (downMessages.isEmpty()) {
+return Health.up().build();
+}
+else {
+return Health.down()
+.withDetail("Following partitions in use have no leaders: ", downMessages.toString())
+.build();
+}
+}
catch (Exception e) {

View File
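The rewrite above fixes a race on the lazily created metadata consumer: KafkaConsumer is not thread safe, so creation is now guarded by a lock on the indicator and every use is serialized on the consumer itself. A stripped-down sketch of the same two-lock pattern, in a simplified class of its own:

import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;

import org.springframework.kafka.core.ConsumerFactory;

public class SharedConsumerSketch {

    private final ConsumerFactory<byte[], byte[]> consumerFactory;

    private Consumer<byte[], byte[]> metadataConsumer;

    public SharedConsumerSketch(ConsumerFactory<byte[], byte[]> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    public List<PartitionInfo> partitionsFor(String topic) {
        // Create the shared consumer at most once; creation is guarded
        // by the owning object, as in the health indicator above.
        synchronized (this) {
            if (this.metadataConsumer == null) {
                this.metadataConsumer = this.consumerFactory.createConsumer();
            }
        }
        // Every use is serialized on the consumer, since KafkaConsumer
        // rejects concurrent access from multiple threads.
        synchronized (this.metadataConsumer) {
            return this.metadataConsumer.partitionsFor(topic);
        }
    }
}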

@@ -20,11 +20,17 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.TimeGauge;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
@@ -51,9 +57,12 @@ import org.springframework.util.ObjectUtils;
* @author Oleg Zhurakousky
* @author Jon Schneider
* @author Thomas Cheyney
* @author Gary Russell
*/
public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<BindingCreatedEvent> {
private static final int DEFAULT_TIMEOUT = 60;
private final static Log LOG = LogFactory.getLog(KafkaBinderMetrics.class);
static final String METRIC_NAME = "spring.cloud.stream.binder.kafka.offset";
@@ -68,6 +77,8 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
private Consumer<?, ?> metadataConsumer;
private int timeout = DEFAULT_TIMEOUT;
public KafkaBinderMetrics(KafkaMessageChannelBinder binder,
KafkaBinderConfigurationProperties binderConfigurationProperties,
ConsumerFactory<?, ?> defaultConsumerFactory, @Nullable MeterRegistry meterRegistry) {
@@ -84,6 +95,10 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
this(binder, binderConfigurationProperties, null, null);
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
@Override
public void bindTo(MeterRegistry registry) {
for (Map.Entry<String, KafkaMessageChannelBinder.TopicInformation> topicInfo : this.binder.getTopicsInUse()
@@ -106,33 +121,56 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
}
private double calculateConsumerLagOnTopic(String topic, String group) {
-long lag = 0;
-try {
-if (metadataConsumer == null) {
-metadataConsumer = createConsumerFactory(group).createConsumer();
-}
-List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
-List<TopicPartition> topicPartitions = new LinkedList<>();
-for (PartitionInfo partitionInfo : partitionInfos) {
-topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
-}
-Map<TopicPartition, Long> endOffsets = metadataConsumer.endOffsets(topicPartitions);
-for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {
-OffsetAndMetadata current = metadataConsumer.committed(endOffset.getKey());
-if (current != null) {
-lag += endOffset.getValue() - current.offset();
-}
-else {
-lag += endOffset.getValue();
-}
-}
-}
-catch (Exception e) {
-LOG.debug("Cannot generate metric for topic: " + topic, e);
-}
-return lag;
+ExecutorService exec = Executors.newSingleThreadExecutor();
+Future<Long> future = exec.submit(() -> {
+long lag = 0;
+try {
+if (metadataConsumer == null) {
+synchronized(KafkaBinderMetrics.this) {
+if (metadataConsumer == null) {
+metadataConsumer = createConsumerFactory(group).createConsumer();
+}
+}
+}
+synchronized (metadataConsumer) {
+List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
+List<TopicPartition> topicPartitions = new LinkedList<>();
+for (PartitionInfo partitionInfo : partitionInfos) {
+topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+}
+Map<TopicPartition, Long> endOffsets = metadataConsumer.endOffsets(topicPartitions);
+for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {
+OffsetAndMetadata current = metadataConsumer.committed(endOffset.getKey());
+if (current != null) {
+lag += endOffset.getValue() - current.offset();
+}
+else {
+lag += endOffset.getValue();
+}
+}
+}
+}
+catch (Exception e) {
+LOG.debug("Cannot generate metric for topic: " + topic, e);
+}
+return lag;
+});
+try {
+return future.get(this.timeout, TimeUnit.SECONDS);
+}
+catch (InterruptedException e) {
+Thread.currentThread().interrupt();
+return 0L;
+}
+catch (ExecutionException | TimeoutException e) {
+return 0L;
+}
+finally {
+exec.shutdownNow();
+}
}
private ConsumerFactory<?, ?> createConsumerFactory(String group) {

View File
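As in the health indicator, the lag calculation above now runs on a single-thread executor and gives up after a configurable timeout (60 seconds by default), so a hung broker connection can no longer stall the metrics endpoint; timeouts and failures simply report a lag of 0. A generic sketch of the pattern, with a hypothetical callWithTimeout helper:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class TimeoutGuard {

    private TimeoutGuard() {
    }

    // Run a potentially blocking task, abandoning it and returning the
    // fallback value once the timeout elapses.
    public static long callWithTimeout(Callable<Long> task, int timeoutSeconds, long fallback) {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        Future<Long> future = exec.submit(task);
        try {
            return future.get(timeoutSeconds, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return fallback;
        }
        catch (ExecutionException | TimeoutException e) {
            return fallback;
        }
        finally {
            exec.shutdownNow(); // interrupts the task if it is still running
        }
    }
}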

@@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -67,32 +68,32 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerPro
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
+import org.springframework.cloud.stream.binding.MessageConverterConfigurer.PartitioningInterceptor;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.context.Lifecycle;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
+import org.springframework.integration.StaticMessageHeaderAccessor;
+import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.channel.ChannelInterceptorAware;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
-import org.springframework.integration.support.AcknowledgmentCallback;
-import org.springframework.integration.support.AcknowledgmentCallback.Status;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.MessageBuilder;
-import org.springframework.integration.support.StaticMessageHeaderAccessor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
-import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
-import org.springframework.kafka.listener.config.ContainerProperties;
+import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaders;
@@ -105,6 +106,7 @@ import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
+import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
@@ -130,12 +132,21 @@ public class KafkaMessageChannelBinder extends
AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {
public static final String X_EXCEPTION_FQCN = "x-exception-fqcn";
public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";
public static final String X_EXCEPTION_MESSAGE = "x-exception-message";
public static final String X_ORIGINAL_TOPIC = "x-original-topic";
public static final String X_ORIGINAL_PARTITION = "x-original-partition";
public static final String X_ORIGINAL_OFFSET = "x-original-offset";
public static final String X_ORIGINAL_TIMESTAMP = "x-original-timestamp";
public static final String X_ORIGINAL_TIMESTAMP_TYPE = "x-original-timestamp-type";
private final KafkaBinderConfigurationProperties configurationProperties;
@@ -207,6 +218,14 @@ public class KafkaMessageChannelBinder extends
protected MessageHandler createProducerMessageHandler(final ProducerDestination destination,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties, MessageChannel errorChannel)
throws Exception {
throw new IllegalStateException("The abstract binder should not call this method");
}
@Override
protected MessageHandler createProducerMessageHandler(final ProducerDestination destination,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
MessageChannel channel, MessageChannel errorChannel)
throws Exception {
/*
* IMPORTANT: With a transactional binder, individual producer properties for Kafka are
* ignored; the global binder (spring.cloud.stream.kafka.binder.transaction.producer.*)
@@ -226,20 +245,18 @@ public class KafkaMessageChannelBinder extends
return partitionsFor;
});
this.topicsInUse.put(destination.getName(), new TopicInformation(null, partitions));
-if (producerProperties.getPartitionCount() < partitions.size()) {
+if (producerProperties.isPartitioned() && producerProperties.getPartitionCount() < partitions.size()) {
if (this.logger.isInfoEnabled()) {
this.logger.info("The `partitionCount` of the producer for topic " + destination.getName() + " is "
+ producerProperties.getPartitionCount() + ", smaller than the actual partition count of "
+ partitions.size() + " of the topic. The larger number will be used instead.");
+ partitions.size() + " for the topic. The larger number will be used instead.");
}
-/*
-* This is dirty; it relies on the fact that we, and the partition interceptor, share a
-* hard reference to the producer properties instance. But I don't see another way to fix
-* it since the interceptor has already been added to the channel, and we don't have
-* access to the channel here; if we did, we could inject the proper partition count
-* there. TODO: Consider this when doing the 2.0 binder restructuring.
-*/
producerProperties.setPartitionCount(partitions.size());
List<ChannelInterceptor> interceptors = ((ChannelInterceptorAware) channel).getChannelInterceptors();
interceptors.forEach(interceptor -> {
if (interceptor instanceof PartitioningInterceptor) {
((PartitioningInterceptor) interceptor).setPartitionCount(partitions.size());
}
});
}
KafkaTemplate<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFB);
@@ -398,14 +415,14 @@ public class KafkaMessageChannelBinder extends
// end of these won't be needed...
if (!extendedConsumerProperties.getExtension().isAutoCommitOffset()) {
messageListenerContainer.getContainerProperties()
.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
.setAckMode(ContainerProperties.AckMode.MANUAL);
messageListenerContainer.getContainerProperties().setAckOnError(false);
}
else {
messageListenerContainer.getContainerProperties()
.setAckOnError(isAutoCommitOnError(extendedConsumerProperties));
if (extendedConsumerProperties.getExtension().isAckEachRecord()) {
messageListenerContainer.getContainerProperties().setAckMode(AckMode.RECORD);
messageListenerContainer.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
}
}
if (this.logger.isDebugEnabled()) {
@@ -656,7 +673,7 @@ public class KafkaMessageChannelBinder extends
DlqSender<?,?> dlqSender = new DlqSender(kafkaTemplate);
return message -> {
@SuppressWarnings("unchecked")
final ConsumerRecord<Object, Object> record = message.getHeaders()
.get(KafkaHeaders.RAW_DATA, ConsumerRecord.class);
@@ -683,11 +700,25 @@ public class KafkaMessageChannelBinder extends
Headers kafkaHeaders = new RecordHeaders(record.headers().toArray());
AtomicReference<ConsumerRecord<?, ?>> recordToSend = new AtomicReference<>(record);
if (message.getPayload() instanceof Throwable) {
Throwable throwable = (Throwable) message.getPayload();
HeaderMode headerMode = properties.getHeaderMode();
if (headerMode == null || HeaderMode.headers.equals(headerMode)) {
-kafkaHeaders.add(new RecordHeader(X_ORIGINAL_TOPIC,
-record.topic().getBytes(StandardCharsets.UTF_8)));
+kafkaHeaders.add(
+new RecordHeader(X_ORIGINAL_TOPIC, record.topic().getBytes(StandardCharsets.UTF_8)));
+kafkaHeaders.add(new RecordHeader(X_ORIGINAL_PARTITION,
+ByteBuffer.allocate(Integer.BYTES).putInt(record.partition()).array()));
+kafkaHeaders.add(new RecordHeader(X_ORIGINAL_OFFSET,
+ByteBuffer.allocate(Long.BYTES).putLong(record.offset()).array()));
+kafkaHeaders.add(new RecordHeader(X_ORIGINAL_TIMESTAMP,
+ByteBuffer.allocate(Long.BYTES).putLong(record.timestamp()).array()));
+kafkaHeaders.add(new RecordHeader(X_ORIGINAL_TIMESTAMP_TYPE,
+record.timestampType().toString().getBytes(StandardCharsets.UTF_8)));
+kafkaHeaders.add(new RecordHeader(X_EXCEPTION_FQCN,
+throwable.getClass().getName().getBytes(StandardCharsets.UTF_8)));
kafkaHeaders.add(new RecordHeader(X_EXCEPTION_MESSAGE,
throwable.getMessage().getBytes(StandardCharsets.UTF_8)));
kafkaHeaders.add(new RecordHeader(X_EXCEPTION_STACKTRACE,
@@ -696,14 +727,18 @@ public class KafkaMessageChannelBinder extends
else if (HeaderMode.embeddedHeaders.equals(headerMode)) {
try {
MessageValues messageValues = EmbeddedHeaderUtils
-.extractHeaders(MessageBuilder.withPayload((byte[]) record.value()).build(),
-false);
+.extractHeaders(MessageBuilder.withPayload((byte[]) record.value()).build(), false);
messageValues.put(X_ORIGINAL_TOPIC, record.topic());
+messageValues.put(X_ORIGINAL_PARTITION, record.partition());
+messageValues.put(X_ORIGINAL_OFFSET, record.offset());
+messageValues.put(X_ORIGINAL_TIMESTAMP, record.timestamp());
+messageValues.put(X_ORIGINAL_TIMESTAMP_TYPE, record.timestampType().toString());
+messageValues.put(X_EXCEPTION_FQCN, throwable.getClass().getName());
messageValues.put(X_EXCEPTION_MESSAGE, throwable.getMessage());
messageValues.put(X_EXCEPTION_STACKTRACE, getStackTraceAsString(throwable));
-final String[] headersToEmbed = new ArrayList<>(messageValues.keySet()).toArray(
-new String[messageValues.keySet().size()]);
+final String[] headersToEmbed = new ArrayList<>(messageValues.keySet())
+.toArray(new String[messageValues.keySet().size()]);
byte[] payload = EmbeddedHeaderUtils.embedHeaders(messageValues,
EmbeddedHeaderUtils.headersToEmbed(headersToEmbed));
recordToSend.set(new ConsumerRecord<Object, Object>(record.topic(), record.partition(),
@@ -715,8 +750,7 @@ public class KafkaMessageChannelBinder extends
}
}
String dlqName = StringUtils.hasText(kafkaConsumerProperties.getDlqName())
-? kafkaConsumerProperties.getDlqName()
-: "error." + record.topic() + "." + group;
+? kafkaConsumerProperties.getDlqName() : "error." + record.topic() + "." + group;
dlqSender.sendToDlq(recordToSend.get(), kafkaHeaders, dlqName);
};
}
@@ -747,10 +781,10 @@ public class KafkaMessageChannelBinder extends
((MessagingException) message.getPayload()).getFailedMessage());
if (ack != null) {
if (isAutoCommitOnError(properties)) {
-ack.acknowledge(Status.REJECT);
+ack.acknowledge(AcknowledgmentCallback.Status.REJECT);
}
else {
-ack.acknowledge(Status.REQUEUE);
+ack.acknowledge(AcknowledgmentCallback.Status.REQUEUE);
}
}
}

View File
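A record published to the DLQ now carries its provenance and failure details in Kafka headers: x-original-topic, x-original-timestamp-type, x-exception-fqcn and x-exception-message are UTF-8 strings, x-original-partition is a 4-byte int, and x-original-offset and x-original-timestamp are 8-byte longs. A sketch of decoding them on a DLQ consumer, assuming the headers are present (header mode "headers"):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

public final class DlqHeaderSketch {

    private DlqHeaderSketch() {
    }

    // Decode the provenance headers the binder adds to dead-lettered records.
    public static void dump(ConsumerRecord<?, ?> record) {
        Headers headers = record.headers();
        System.out.println("original topic:  " + utf8(headers.lastHeader("x-original-topic")));
        System.out.println("partition:       " + ByteBuffer.wrap(headers.lastHeader("x-original-partition").value()).getInt());
        System.out.println("offset:          " + ByteBuffer.wrap(headers.lastHeader("x-original-offset").value()).getLong());
        System.out.println("timestamp:       " + ByteBuffer.wrap(headers.lastHeader("x-original-timestamp").value()).getLong());
        System.out.println("timestamp type:  " + utf8(headers.lastHeader("x-original-timestamp-type")));
        System.out.println("exception class: " + utf8(headers.lastHeader("x-exception-fqcn")));
        System.out.println("exception msg:   " + utf8(headers.lastHeader("x-exception-message")));
    }

    private static String utf8(Header header) {
        return new String(header.value(), StandardCharsets.UTF_8);
    }
}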

@@ -18,6 +18,8 @@ package org.springframework.cloud.stream.binder.kafka.config;
import java.io.IOException;
+import javax.security.auth.login.AppConfigurationEntry;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
@@ -59,7 +61,8 @@ import org.springframework.lang.Nullable;
*/
@Configuration
@ConditionalOnMissingBean(Binder.class)
-@Import({KafkaAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class, KafkaBinderHealthIndicatorConfiguration.class })
+@Import({ KafkaAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class,
+KafkaBinderHealthIndicatorConfiguration.class })
@EnableConfigurationProperties({ KafkaExtendedBindingProperties.class })
public class KafkaBinderConfiguration {
@@ -84,7 +87,7 @@ public class KafkaBinderConfiguration {
@Bean
KafkaMessageChannelBinder kafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
-KafkaTopicProvisioner provisioningProvider, @Nullable ListenerContainerCustomizer<AbstractMessageListenerContainer<?,?>> listenerContainerCustomizer) {
+KafkaTopicProvisioner provisioningProvider, @Nullable ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> listenerContainerCustomizer) {
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
configurationProperties, provisioningProvider, listenerContainerCustomizer);
@@ -100,8 +103,34 @@ public class KafkaBinderConfiguration {
}
@Bean
-public KafkaJaasLoginModuleInitializer jaasInitializer() throws IOException {
-return new KafkaJaasLoginModuleInitializer();
+@ConditionalOnMissingBean(KafkaJaasLoginModuleInitializer.class)
+public KafkaJaasLoginModuleInitializer jaasInitializer(KafkaBinderConfigurationProperties configurationProperties) throws IOException {
KafkaJaasLoginModuleInitializer kafkaJaasLoginModuleInitializer = new KafkaJaasLoginModuleInitializer();
JaasLoginModuleConfiguration jaas = configurationProperties.getJaas();
if (jaas != null) {
kafkaJaasLoginModuleInitializer.setLoginModule(jaas.getLoginModule());
KafkaJaasLoginModuleInitializer.ControlFlag controlFlag = null;
AppConfigurationEntry.LoginModuleControlFlag controlFlagValue = jaas.getControlFlagValue();
if (AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL.equals(controlFlagValue)) {
controlFlag = KafkaJaasLoginModuleInitializer.ControlFlag.OPTIONAL;
}
else if (AppConfigurationEntry.LoginModuleControlFlag.REQUIRED.equals(controlFlagValue)) {
controlFlag = KafkaJaasLoginModuleInitializer.ControlFlag.REQUIRED;
}
else if (AppConfigurationEntry.LoginModuleControlFlag.REQUISITE.equals(controlFlagValue)) {
controlFlag = KafkaJaasLoginModuleInitializer.ControlFlag.REQUISITE;
}
else if (AppConfigurationEntry.LoginModuleControlFlag.SUFFICIENT.equals(controlFlagValue)) {
controlFlag = KafkaJaasLoginModuleInitializer.ControlFlag.SUFFICIENT;
}
if (controlFlag != null) {
kafkaJaasLoginModuleInitializer.setControlFlag(controlFlag);
}
kafkaJaasLoginModuleInitializer.setOptions(jaas.getOptions());
}
return kafkaJaasLoginModuleInitializer;
}
/**

View File
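The reworked jaasInitializer() above propagates the binder's JAAS properties (login module, control flag, options) instead of dropping them, and its @ConditionalOnMissingBean annotation lets an application define its own initializer. A sketch of such an override; the module name and options are placeholders only:

import java.io.IOException;
import java.util.Collections;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;

@Configuration
public class CustomJaasSketch {

    // This user-supplied bean makes the binder's default initializer back off.
    @Bean
    public KafkaJaasLoginModuleInitializer jaasInitializer() throws IOException {
        KafkaJaasLoginModuleInitializer initializer = new KafkaJaasLoginModuleInitializer();
        // Placeholder values; real ones depend on the broker's security setup.
        initializer.setLoginModule("com.sun.security.auth.module.Krb5LoginModule");
        initializer.setControlFlag(KafkaJaasLoginModuleInitializer.ControlFlag.REQUIRED);
        initializer.setOptions(Collections.singletonMap("useKeyTab", "true"));
        return initializer;
    }
}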

@@ -46,7 +46,8 @@ import static org.assertj.core.api.Assertions.assertThat;
@TestPropertySource(properties = {
"spring.cloud.stream.kafka.bindings.input.consumer.admin.replication-factor=2",
"spring.cloud.stream.kafka.bindings.input.consumer.admin.replicas-assignments.0=0,1",
"spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0" })
"spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0",
"spring.main.allow-bean-definition-overriding=true"})
@EnableIntegration
public class AdminConfigTests {

View File

@@ -73,7 +73,7 @@ public class KafkaBinderHealthIndicatorTest {
@Test
public void kafkaBinderIsUp() {
final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group", partitions));
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group1-healthIndicator", partitions));
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
Health health = indicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);
@@ -82,7 +82,7 @@ public class KafkaBinderHealthIndicatorTest {
@Test
public void kafkaBinderIsDown() {
final List<PartitionInfo> partitions = partitions(new Node(-1, null, 0));
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group", partitions));
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group2-healthIndicator", partitions));
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
Health health = indicator.health();
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
@@ -91,7 +91,7 @@ public class KafkaBinderHealthIndicatorTest {
@Test(timeout = 5000)
public void kafkaBinderDoesNotAnswer() {
final List<PartitionInfo> partitions = partitions(new Node(-1, null, 0));
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group", partitions));
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group3-healthIndicator", partitions));
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willAnswer(new Answer<Object>() {
@Override
@@ -110,7 +110,7 @@ public class KafkaBinderHealthIndicatorTest {
@Test
public void createsConsumerOnceWhenInvokedMultipleTimes() {
final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group", partitions));
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group4-healthIndicator", partitions));
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
indicator.health();

View File

@@ -84,11 +84,11 @@ public class KafkaBinderMetricsTest {
public void shouldIndicateLag() {
org.mockito.BDDMockito.given(consumer.committed(ArgumentMatchers.any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group1-metrics", partitions));
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
metrics.bindTo(meterRegistry);
assertThat(meterRegistry.getMeters()).hasSize(1);
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge()
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group1-metrics").tag("topic", TEST_TOPIC).timeGauge()
.value(TimeUnit.MILLISECONDS)).isEqualTo(500.0);
}
@@ -100,22 +100,22 @@ public class KafkaBinderMetricsTest {
org.mockito.BDDMockito.given(consumer.endOffsets(ArgumentMatchers.anyCollection())).willReturn(endOffsets);
org.mockito.BDDMockito.given(consumer.committed(ArgumentMatchers.any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
List<PartitionInfo> partitions = partitions(new Node(0, null, 0), new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group2-metrics", partitions));
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
metrics.bindTo(meterRegistry);
assertThat(meterRegistry.getMeters()).hasSize(1);
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge()
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group2-metrics").tag("topic", TEST_TOPIC).timeGauge()
.value(TimeUnit.MILLISECONDS)).isEqualTo(1000.0);
}
@Test
public void shouldIndicateFullLagForNotCommittedGroups() {
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group3-metrics", partitions));
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
metrics.bindTo(meterRegistry);
assertThat(meterRegistry.getMeters()).hasSize(1);
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge()
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group3-metrics").tag("topic", TEST_TOPIC).timeGauge()
.value(TimeUnit.MILLISECONDS)).isEqualTo(1000.0);
}
@@ -130,11 +130,11 @@ public class KafkaBinderMetricsTest {
@Test
public void createsConsumerOnceWhenInvokedMultipleTimes() {
final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group4-metrics", partitions));
metrics.bindTo(meterRegistry);
TimeGauge gauge = meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge();
TimeGauge gauge = meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group4-metrics").tag("topic", TEST_TOPIC).timeGauge();
gauge.value(TimeUnit.MILLISECONDS);
assertThat(gauge.value(TimeUnit.MILLISECONDS)).isEqualTo(1000.0);
@@ -147,11 +147,11 @@ public class KafkaBinderMetricsTest {
.willReturn(consumer);
final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group5-metrics", partitions));
metrics.bindTo(meterRegistry);
TimeGauge gauge = meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge();
TimeGauge gauge = meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group5-metrics").tag("topic", TEST_TOPIC).timeGauge();
assertThat(gauge.value(TimeUnit.MILLISECONDS)).isEqualTo(0);
assertThat(gauge.value(TimeUnit.MILLISECONDS)).isEqualTo(1000.0);

View File

@@ -17,6 +17,7 @@
package org.springframework.cloud.stream.binder.kafka;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -30,10 +31,10 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
@@ -48,6 +49,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
@@ -80,6 +82,7 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerPro
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.utils.KafkaTopicUtils;
+import org.springframework.cloud.stream.binding.MessageConverterConfigurer.PartitioningInterceptor;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.context.ApplicationContext;
@@ -100,8 +103,8 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
-import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
+import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
@@ -109,7 +112,7 @@ import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.test.core.BrokerAddress;
-import org.springframework.kafka.test.rule.KafkaEmbedded;
+import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
@@ -118,11 +121,11 @@ import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
+import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;
@@ -149,7 +152,7 @@ public class KafkaBinderTests extends
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10, "error.pollableDlq.group");
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 10, "error.pollableDlq.group-pcWithDlq");
private KafkaTestBinder binder;
@@ -209,7 +212,7 @@ public class KafkaBinderTests extends
private KafkaBinderConfigurationProperties createConfigurationProperties() {
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties(
new TestKafkaProperties());
-BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
+BrokerAddress[] brokerAddresses = embeddedKafka.getEmbeddedKafka().getBrokerAddresses();
List<String> bAddresses = new ArrayList<>();
for (BrokerAddress bAddress : brokerAddresses) {
bAddresses.add(bAddress.toString());
@@ -223,7 +226,7 @@ public class KafkaBinderTests extends
return consumerFactory().createConsumer().partitionsFor(topic).size();
}
-private void invokeCreateTopic(String topic, int partitions, int replicationFactor) throws Throwable {
+private void invokeCreateTopic(String topic, int partitions, int replicationFactor) throws Exception {
NewTopic newTopic = new NewTopic(topic, partitions,
(short) replicationFactor);
@@ -242,7 +245,7 @@ public class KafkaBinderTests extends
timeoutMultiplier = Double.parseDouble(multiplier);
}
-BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
+BrokerAddress[] brokerAddresses = embeddedKafka.getEmbeddedKafka().getBrokerAddresses();
List<String> bAddresses = new ArrayList<>();
for (BrokerAddress bAddress : brokerAddresses) {
bAddresses.add(bAddress.toString());
@@ -334,9 +337,9 @@ public class KafkaBinderTests extends
Assertions.assertThat(inboundMessageRef.get().getHeaders().get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE)).isNull();
Assertions.assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE))
.isEqualTo(MimeTypeUtils.TEXT_PLAIN);
Assertions.assertThat(inboundMessageRef.get().getHeaders().get("foo")).isInstanceOf(MimeType.class);
MimeType actual = (MimeType) inboundMessageRef.get().getHeaders().get("foo");
Assertions.assertThat(actual).isEqualTo(MimeTypeUtils.TEXT_PLAIN);
Assertions.assertThat(inboundMessageRef.get().getHeaders().get("foo")).isInstanceOf(String.class);
String actual = (String) inboundMessageRef.get().getHeaders().get("foo");
Assertions.assertThat(actual).isEqualTo(MimeTypeUtils.TEXT_PLAIN.toString());
producerBinding.unbind();
consumerBinding.unbind();
}
@@ -493,7 +496,7 @@ public class KafkaBinderTests extends
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC))
.isEqualTo("foo.bar".getBytes(StandardCharsets.UTF_8));
assertThat(new String((byte[]) receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE)))
.startsWith("failed to send Message to channel 'input'");
.startsWith("Dispatcher failed to deliver Message; nested exception is java.lang.RuntimeException: fail");
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_EXCEPTION_STACKTRACE))
.isNotNull();
binderBindUnbindLatency();
@@ -660,21 +663,51 @@ public class KafkaBinderTests extends
assertThat(receivedMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());
if (HeaderMode.embeddedHeaders.equals(headerMode)) {
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC))
.isEqualTo(producerName);
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_PARTITION)).isEqualTo(0);
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_OFFSET))
.isEqualTo(0);
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_TIMESTAMP)).isNotNull();
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_TIMESTAMP_TYPE))
.isEqualTo(TimestampType.CREATE_TIME.toString());
assertThat(((String) receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE)))
.startsWith("failed to send Message to channel 'input'");
.startsWith("Dispatcher failed to deliver Message; nested exception is java.lang.RuntimeException: fail");
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_EXCEPTION_STACKTRACE))
.isNotNull();
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_EXCEPTION_FQCN)).isNotNull();
}
else if (!HeaderMode.none.equals(headerMode)) {
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC))
.isEqualTo(producerName.getBytes(StandardCharsets.UTF_8));
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_PARTITION))
.isEqualTo(ByteBuffer.allocate(Integer.BYTES).putInt(0).array());
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_OFFSET))
.isEqualTo(ByteBuffer.allocate(Long.BYTES).putLong(0).array());
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_TIMESTAMP)).isNotNull();
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_TIMESTAMP_TYPE))
.isEqualTo(TimestampType.CREATE_TIME.toString().getBytes());
assertThat(new String((byte[]) receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE)))
.startsWith("failed to send Message to channel 'input'");
.startsWith("Dispatcher failed to deliver Message; nested exception is java.lang.RuntimeException: fail");
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_EXCEPTION_STACKTRACE))
.isNotNull();
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_EXCEPTION_FQCN)).isNotNull();
}
else {
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC)).isNull();
@@ -1132,7 +1165,7 @@ public class KafkaBinderTests extends
AbstractMessageListenerContainer<?, ?> container = TestUtils.getPropertyValue(consumerBinding,
"lifecycle.messageListenerContainer", AbstractMessageListenerContainer.class);
-assertThat(container.getContainerProperties().getAckMode()).isEqualTo(AckMode.BATCH);
+assertThat(container.getContainerProperties().getAckMode()).isEqualTo(ContainerProperties.AckMode.BATCH);
String testPayload1 = "foo" + UUID.randomUUID().toString();
Message<?> message1 = org.springframework.integration.support.MessageBuilder.withPayload(
@@ -1179,7 +1212,7 @@ public class KafkaBinderTests extends
AbstractMessageListenerContainer<?, ?> container = TestUtils.getPropertyValue(consumerBinding2,
"lifecycle.messageListenerContainer", AbstractMessageListenerContainer.class);
-assertThat(container.getContainerProperties().getAckMode()).isEqualTo(AckMode.RECORD);
+assertThat(container.getContainerProperties().getAckMode()).isEqualTo(ContainerProperties.AckMode.RECORD);
Message<?> receivedMessage1 = receive(inbound1);
assertThat(receivedMessage1).isNotNull();
@@ -1221,6 +1254,7 @@ public class KafkaBinderTests extends
producerProperties.setPartitionKeyExpression(spelExpressionParser.parseExpression("payload"));
producerProperties.setPartitionSelectorExpression(spelExpressionParser.parseExpression("hashCode()"));
producerProperties.setPartitionCount(3);
invokeCreateTopic("output", 6, 1);
DirectChannel output = createBindableChannel("output", createProducerBindingProperties(producerProperties));
output.setBeanName("test.output");
@@ -1232,7 +1266,14 @@ public class KafkaBinderTests extends
}
catch (UnsupportedOperationException ignored) {
}
+List<ChannelInterceptor> interceptors = output.getChannelInterceptors();
+AtomicInteger count = new AtomicInteger();
+interceptors.forEach(interceptor -> {
+if (interceptor instanceof PartitioningInterceptor) {
+count.set(TestUtils.getPropertyValue(interceptor, "partitionHandler.partitionCount", Integer.class));
+}
+});
+assertThat(count.get()).isEqualTo(6);
Message<Integer> message2 = org.springframework.integration.support.MessageBuilder.withPayload(2)
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "foo")
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 42)
@@ -2423,7 +2464,7 @@ public class KafkaBinderTests extends
assertThat(inbound.getHeaders().get(BinderHeaders.NATIVE_HEADERS_PRESENT)).isNull();
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testSendAndReceiveWithMixedMode", "false",
-embeddedKafka);
+embeddedKafka.getEmbeddedKafka());
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
@@ -2460,9 +2501,9 @@ public class KafkaBinderTests extends
PollableSource<MessageHandler> inboundBindTarget = new DefaultPollableMessageSource(this.messageConverter);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = createConsumerProperties();
consumerProps.setMultiplex(true);
Binding<PollableSource<MessageHandler>> binding = binder.bindPollableConsumer("pollable,anotherOne", "group",
Binding<PollableSource<MessageHandler>> binding = binder.bindPollableConsumer("pollable,anotherOne", "group-polledConsumer",
inboundBindTarget, consumerProps);
-Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
+Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
KafkaTemplate template = new KafkaTemplate(new DefaultKafkaProducerFactory<>(producerProps));
template.send("pollable", "testPollable");
boolean polled = inboundBindTarget.poll(m -> {
@@ -2501,8 +2542,8 @@ public class KafkaBinderTests extends
properties.setMaxAttempts(2);
properties.setBackOffInitialInterval(0);
properties.getExtension().setEnableDlq(true);
-Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
-Binding<PollableSource<MessageHandler>> binding = binder.bindPollableConsumer("pollableDlq", "group",
+Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
+Binding<PollableSource<MessageHandler>> binding = binder.bindPollableConsumer("pollableDlq", "group-pcWithDlq",
inboundBindTarget, properties);
KafkaTemplate template = new KafkaTemplate(new DefaultKafkaProducerFactory<>(producerProps));
template.send("pollableDlq", "testPollableDLQ");
@@ -2518,12 +2559,12 @@ public class KafkaBinderTests extends
catch (MessageHandlingException e) {
assertThat(e.getCause().getMessage()).isEqualTo("test DLQ");
}
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("dlq", "false", embeddedKafka);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("dlq", "false", embeddedKafka.getEmbeddedKafka());
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer consumer = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "error.pollableDlq.group");
ConsumerRecord deadLetter = KafkaTestUtils.getSingleRecord(consumer, "error.pollableDlq.group");
embeddedKafka.getEmbeddedKafka().consumeFromAnEmbeddedTopic(consumer, "error.pollableDlq.group-pcWithDlq");
ConsumerRecord deadLetter = KafkaTestUtils.getSingleRecord(consumer, "error.pollableDlq.group-pcWithDlq");
assertThat(deadLetter).isNotNull();
assertThat(deadLetter.value()).isEqualTo("testPollableDLQ");
binding.unbind();
@@ -2534,7 +2575,7 @@ public class KafkaBinderTests extends
@Test
public void testTopicPatterns() throws Exception {
try (AdminClient admin = AdminClient.create(Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
-embeddedKafka.getBrokersAsString()))) {
+embeddedKafka.getEmbeddedKafka().getBrokersAsString()))) {
admin.createTopics(Collections.singletonList(new NewTopic("topicPatterns.1", 1, (short) 1))).all().get();
Binder binder = getBinder();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
@@ -2549,7 +2590,7 @@ public class KafkaBinderTests extends
Binding<MessageChannel> consumerBinding = binder.bindConsumer("topicPatterns\\..*",
"testTopicPatterns", moduleInputChannel, consumerProperties);
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(
-KafkaTestUtils.producerProps(embeddedKafka));
+KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka()));
KafkaTemplate template = new KafkaTemplate(pf);
template.send("topicPatterns.1", "foo");
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();

View File
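The repeated getEmbeddedKafka() edits in this file follow the Spring Kafka 2.2 test support split: the old KafkaEmbedded rule becomes EmbeddedKafkaRule, a thin JUnit rule around the embedded broker, and broker details are reached through its getEmbeddedKafka() accessor. A minimal sketch with a placeholder topic name:

import org.junit.ClassRule;

import org.springframework.kafka.test.rule.EmbeddedKafkaRule;

public class EmbeddedKafkaUsageSketchTest {

    // One broker, controlled shutdown, pre-created placeholder topic.
    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, "sketch-topic");

    // The rule wraps the broker; addresses and admin operations go
    // through getEmbeddedKafka() rather than the rule itself.
    static String brokers() {
        return embeddedKafka.getEmbeddedKafka().getBrokersAsString();
    }
}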

@@ -17,6 +17,7 @@
package org.springframework.cloud.stream.binder.kafka;
import java.lang.reflect.Method;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -36,6 +37,7 @@ import org.apache.kafka.common.TopicPartition;
import org.junit.Test;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
@@ -82,25 +84,25 @@ public class KafkaBinderUnitTests {
method.setAccessible(true);
// test default for anon
Object factory = method.invoke(binder, true, "foo", ecp);
Object factory = method.invoke(binder, true, "foo-1", ecp);
Map<?, ?> configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("latest");
// test default for named
factory = method.invoke(binder, false, "foo", ecp);
factory = method.invoke(binder, false, "foo-2", ecp);
configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");
// binder level setting
binderConfigurationProperties.setConfiguration(
Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"));
factory = method.invoke(binder, false, "foo", ecp);
factory = method.invoke(binder, false, "foo-3", ecp);
configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("latest");
// consumer level setting
consumerProps.setConfiguration(Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
factory = method.invoke(binder, false, "foo", ecp);
factory = method.invoke(binder, false, "foo-4", ecp);
configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");
}
@@ -131,38 +133,38 @@ public class KafkaBinderUnitTests {
@Test
public void testOffsetResetWithGroupManagementEarliest() throws Exception {
-testOffsetResetWithGroupManagement(true, true);
+testOffsetResetWithGroupManagement(true, true, "foo-100", "testOffsetResetWithGroupManagementEarliest");
}
@Test
public void testOffsetResetWithGroupManagementLatest() throws Throwable {
-testOffsetResetWithGroupManagement(false, true);
+testOffsetResetWithGroupManagement(false, true, "foo-101", "testOffsetResetWithGroupManagementLatest");
}
@Test
public void testOffsetResetWithManualAssignmentEarliest() throws Exception {
-testOffsetResetWithGroupManagement(true, false);
+testOffsetResetWithGroupManagement(true, false, "foo-102", "testOffsetResetWithManualAssignmentEarliest");
}
@Test
public void testOffsetResetWithGroupManualAssignmentLatest() throws Throwable {
-testOffsetResetWithGroupManagement(false, false);
+testOffsetResetWithGroupManagement(false, false, "foo-103", "testOffsetResetWithGroupManualAssignmentLatest");
}
-private void testOffsetResetWithGroupManagement(final boolean earliest, boolean groupManage) throws Exception {
+private void testOffsetResetWithGroupManagement(final boolean earliest, boolean groupManage, String topic, String group) throws Exception {
final List<TopicPartition> partitions = new ArrayList<>();
partitions.add(new TopicPartition("foo", 0));
partitions.add(new TopicPartition("foo", 1));
partitions.add(new TopicPartition(topic, 0));
partitions.add(new TopicPartition(topic, 1));
KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties(
new TestKafkaProperties());
KafkaTopicProvisioner provisioningProvider = mock(KafkaTopicProvisioner.class);
ConsumerDestination dest = mock(ConsumerDestination.class);
given(dest.getName()).willReturn("foo");
given(dest.getName()).willReturn(topic);
given(provisioningProvider.provisionConsumerDestination(anyString(), anyString(), any())).willReturn(dest);
final AtomicInteger part = new AtomicInteger();
willAnswer(i -> {
return partitions.stream()
.map(p -> new PartitionInfo("foo", part.getAndIncrement(), null, null, null))
.map(p -> new PartitionInfo(topic, part.getAndIncrement(), null, null, null))
.collect(Collectors.toList());
}).given(provisioningProvider).getPartitionsForTopic(anyInt(), anyBoolean(), any());
@SuppressWarnings("unchecked")
@@ -176,14 +178,14 @@ public class KafkaBinderUnitTests {
Thread.currentThread().interrupt();
}
return new ConsumerRecords<>(Collections.emptyMap());
-}).given(consumer).poll(anyLong());
+}).given(consumer).poll(any(Duration.class));
willAnswer(i -> {
((org.apache.kafka.clients.consumer.ConsumerRebalanceListener) i.getArgument(1))
.onPartitionsAssigned(partitions);
latch.countDown();
latch.countDown();
return null;
}).given(consumer).subscribe(eq(Collections.singletonList("foo")),
}).given(consumer).subscribe(eq(Collections.singletonList(topic)),
any(org.apache.kafka.clients.consumer.ConsumerRebalanceListener.class));
willAnswer(i -> {
latch.countDown();
@@ -193,7 +195,7 @@ public class KafkaBinderUnitTests {
@Override
protected ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup,
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
return new ConsumerFactory<byte[], byte[]>() {
@@ -212,6 +214,11 @@ public class KafkaBinderUnitTests {
return consumer;
}
+@Override
+public Consumer<byte[], byte[]> createConsumer(String groupId, String clientIdPrefix, String clientIdSuffix) {
+return consumer;
+}
@Override
public boolean isAutoCommit() {
return false;
@@ -222,7 +229,7 @@ public class KafkaBinderUnitTests {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
earliest ? "earliest" : "latest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "bar");
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
return props;
}
@@ -240,7 +247,7 @@ public class KafkaBinderUnitTests {
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<KafkaConsumerProperties>(
extension);
consumerProperties.setInstanceCount(1);
binder.bindConsumer("foo", "bar", channel, consumerProperties);
Binding<MessageChannel> messageChannelBinding = binder.bindConsumer(topic, group, channel, consumerProperties);
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
if (groupManage) {
if (earliest) {
@@ -260,7 +267,7 @@ public class KafkaBinderUnitTests {
verify(consumer).seek(partitions.get(1), Long.MAX_VALUE);
}
}
+messageChannelBinding.unbind();
}
}

View File
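The mock change above (poll(anyLong()) becoming poll(any(Duration.class))) tracks the kafka-clients 2.0 consumer, where the listener container calls poll(Duration) rather than the deprecated poll(long); a stub for the old overload would simply never match. A sketch of stubbing the new overload with Mockito:

import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.mock;

public class PollStubSketch {

    // Stub the Duration overload the container actually invokes;
    // an empty ConsumerRecords keeps the poll loop spinning harmlessly.
    @SuppressWarnings("unchecked")
    static Consumer<byte[], byte[]> stubbedConsumer() {
        Consumer<byte[], byte[]> consumer = mock(Consumer.class);
        willAnswer(i -> new ConsumerRecords<>(Collections.emptyMap()))
                .given(consumer).poll(any(Duration.class));
        return consumer;
    }
}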

@@ -34,7 +34,7 @@ import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProv
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.test.rule.KafkaEmbedded;
+import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.retry.support.RetryTemplate;
@@ -53,13 +53,13 @@ import static org.mockito.Mockito.spy;
public class KafkaTransactionTests {
@ClassRule
-public static final KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);
+public static final EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1);
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testProducerRunsInTx() {
KafkaProperties kafkaProperties = new TestKafkaProperties();
-kafkaProperties.setBootstrapServers(Collections.singletonList(embeddedKafka.getBrokersAsString()));
+kafkaProperties.setBootstrapServers(Collections.singletonList(embeddedKafka.getEmbeddedKafka().getBrokersAsString()));
KafkaBinderConfigurationProperties configurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
configurationProperties.getTransaction().setTransactionIdPrefix("foo-");

View File

@@ -23,7 +23,7 @@ import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.kafka.test.rule.KafkaEmbedded;
+import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
/**
* @author Marius Bogoevici
@@ -31,14 +31,14 @@ import org.springframework.kafka.test.rule.KafkaEmbedded;
public class KafkaBinderBootstrapTest {
@ClassRule
-public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
+public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 10);
@Test
public void testKafkaBinderConfiguration() throws Exception {
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(SimpleApplication.class)
.web(WebApplicationType.NONE)
.run("--spring.cloud.stream.kafka.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
.run("--spring.cloud.stream.kafka.binder.brokers=" + embeddedKafka.getEmbeddedKafka().getBrokersAsString(),
"--spring.cloud.stream.kafka.binder.zkNodes=" + embeddedKafka.getEmbeddedKafka().getZookeeperConnectionString());
applicationContext.close();
}

View File

@@ -21,7 +21,6 @@ import java.util.Map;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -43,7 +42,7 @@ import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
-import org.springframework.kafka.test.rule.KafkaEmbedded;
+import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.messaging.MessageChannel;
import org.springframework.test.context.junit4.SpringRunner;
@@ -61,16 +60,16 @@ import static org.assertj.core.api.Assertions.assertThat;
properties = "spring.cloud.stream.bindings.input.group=" + KafkaBinderActuatorTests.TEST_CONSUMER_GROUP)
public class KafkaBinderActuatorTests {
static final String TEST_CONSUMER_GROUP = "testGroup";
static final String TEST_CONSUMER_GROUP = "testGroup-actuatorTests";
private static final String KAFKA_BROKERS_PROPERTY = "spring.kafka.bootstrap-servers";
@ClassRule
-public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true);
+public static EmbeddedKafkaRule kafkaEmbedded = new EmbeddedKafkaRule(1, true);
@BeforeClass
public static void setup() {
-System.setProperty(KAFKA_BROKERS_PROPERTY, kafkaEmbedded.getBrokersAsString());
+System.setProperty(KAFKA_BROKERS_PROPERTY, kafkaEmbedded.getEmbeddedKafka().getBrokersAsString());
}
@AfterClass