Compare commits: main...v3.0.4.REL (10 commits)

Commits (SHA1):
- f6aab385ce
- c9a618e1a4
- e6a82861ba
- b329ad133a
- a5b1d30c1c
- 15e3dd447d
- 0551d20d3e
- 9d7b523c16
- b8967ff0fa
- 999b2caf35

README.adoc
@@ -39,7 +39,7 @@ To use Apache Kafka binder, you need to add `spring-cloud-stream-binder-kafka` a
 </dependency>
 ----
 
-Alternatively, you can also use the Spring Cloud Stream Kafka Starter, as shown inn the following example for Maven:
+Alternatively, you can also use the Spring Cloud Stream Kafka Starter, as shown in the following example for Maven:
 
 [source,xml]
 ----
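For reference, the starter mentioned in the hunk above is the `spring-cloud-starter-stream-kafka` artifact that appears later in this diff; its declaration would look roughly as follows (the version is typically managed by a Spring Cloud BOM, so it is omitted here):

```xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
```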
@@ -60,7 +60,7 @@ The Apache Kafka Binder implementation maps each destination to an Apache Kafka
 The consumer group maps directly to the same Apache Kafka concept.
 Partitioning also maps directly to Apache Kafka partitions as well.
 
-The binder currently uses the Apache Kafka `kafka-clients` 1.0.0 jar and is designed to be used with a broker of at least that version.
+The binder currently uses the Apache Kafka `kafka-clients` version `2.3.1`.
 This client can communicate with older brokers (see the Kafka documentation), but certain features may not be available.
 For example, with versions earlier than 0.11.x.x, native headers are not supported.
 Also, 0.11.x.x does not support the `autoAddPartitions` property.
@@ -155,21 +155,15 @@ Default: See individual producer properties.
 
 spring.cloud.stream.kafka.binder.headerMapperBeanName::
 The bean name of a `KafkaHeaderMapper` used for mapping `spring-messaging` headers to and from Kafka headers.
-Use this, for example, if you wish to customize the trusted packages in a `DefaultKafkaHeaderMapper` that uses JSON deserialization for the headers.
+Use this, for example, if you wish to customize the trusted packages in a `BinderHeaderMapper` bean that uses JSON deserialization for the headers.
+If this custom `BinderHeaderMapper` bean is not made available to the binder using this property, then the binder will look for a header mapper bean with the name `kafkaBinderHeaderMapper` that is of type `BinderHeaderMapper` before falling back to a default `BinderHeaderMapper` created by the binder.
 +
 Default: none.
 
 spring.cloud.stream.kafka.binder.authorizationExceptionRetryInterval::
 Enables retrying in case of authorization exceptions.
 Defines interval between each retry.
 Accepts `Duration`, e.g. `30s`, `2m`, etc.
 +
 Default: `null` (retries disabled, fail fast)
 
 [[kafka-consumer-properties]]
 ==== Kafka Consumer Properties
 
-NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.
+NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.kafka.default.consumer.<property>=<value>`.
 
 The following properties are available for Kafka consumers only and
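Taken together, the renamed default-property prefix and the new retry interval from the hunk above might appear in `application.properties` as follows (the `ackEachRecord` default and the `30s` value are illustrative, not from this diff):

```properties
# Binder-scoped default applied to every Kafka consumer binding
spring.cloud.stream.kafka.default.consumer.ackEachRecord=true

# Retry on authorization exceptions every 30 seconds instead of failing fast
spring.cloud.stream.kafka.binder.authorizationExceptionRetryInterval=30s
```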
@@ -234,9 +228,20 @@ The DLQ topic name can be configurable by setting the `dlqName` property.
 This provides an alternative option to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome.
 See <<kafka-dlq-processing>> processing for more information.
 Starting with version 2.0, messages sent to the DLQ topic are enhanced with the following headers: `x-original-topic`, `x-exception-message`, and `x-exception-stacktrace` as `byte[]`.
+By default, a failed record is sent to the same partition number in the DLQ topic as the original record.
+See <<dlq-partition-selection>> for how to change that behavior.
 **Not allowed when `destinationIsPattern` is `true`.**
 +
 Default: `false`.
+dlqPartitions::
+When `enableDlq` is true, and this property is not set, a dead letter topic with the same number of partitions as the primary topic(s) is created.
+Usually, dead-letter records are sent to the same partition in the dead-letter topic as the original record.
+This behavior can be changed; see <<dlq-partition-selection>>.
+If this property is set to `1` and there is no `DlqPartitionFunction` bean, all dead-letter records will be written to partition `0`.
+If this property is greater than `1`, you **MUST** provide a `DlqPartitionFunction` bean.
+Note that the actual partition count is affected by the binder's `minPartitionCount` property.
++
+Default: `none`
 configuration::
 Map with a key/value pair containing generic Kafka consumer properties.
 In addition to having Kafka consumer properties, other configuration properties can be passed here.
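The DLQ-related consumer properties described in the hunk above could be combined like this (the binding name `input` and the topic name are hypothetical):

```properties
spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true
# Optional: override the default error.<destination>.<group> DLQ topic name
spring.cloud.stream.kafka.bindings.input.consumer.dlqName=my-dlq-topic
# With a single DLQ partition, no DlqPartitionFunction bean is required;
# all dead-letter records go to partition 0
spring.cloud.stream.kafka.bindings.input.consumer.dlqPartitions=1
```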
@@ -311,7 +316,7 @@ Refer to the https://docs.spring.io/spring-kafka/docs/2.3.0.BUILD-SNAPSHOT/refer
 [[kafka-producer-properties]]
 ==== Kafka Producer Properties
 
-NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.
+NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.kafka.default.producer.<property>=<value>`.
 
 The following properties are available for Kafka producers only and
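Analogous to the consumer-side note, the producer default format moves to a binder-scoped prefix; a sketch (`compressionType` is one of the Kafka producer binding properties, and the value shown is illustrative):

```properties
# Binder-scoped default applied to every Kafka producer binding
spring.cloud.stream.kafka.default.producer.compressionType=snappy
```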
@@ -576,6 +581,10 @@ When used in a processor application, the consumer starts the transaction; any r
 When the listener exits normally, the listener container will send the offset to the transaction and commit it.
 A common producer factory is used for all producer bindings configured using `spring.cloud.stream.kafka.binder.transaction.producer.*` properties; individual binding Kafka producer properties are ignored.
 
 IMPORTANT: Normal binder retries (and dead lettering) are not supported with transactions because the retries will run in the original transaction, which may be rolled back and any published records will be rolled back too.
 When retries are enabled (the common property `maxAttempts` is greater than zero) the retry properties are used to configure a `DefaultAfterRollbackProcessor` to enable retries at the container level.
 Similarly, instead of publishing dead-letter records within the transaction, this functionality is moved to the listener container, again via the `DefaultAfterRollbackProcessor` which runs after the main transaction has rolled back.
 
 If you wish to use transactions in a source application, or from some arbitrary thread for producer-only transactions (e.g. a `@Scheduled` method), you must get a reference to the transactional producer factory and define a `KafkaTransactionManager` bean using it.
 
 ====
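The `spring.cloud.stream.kafka.binder.transaction.producer.*` prefix mentioned above configures the shared transactional producer factory; a hedged configuration sketch (the `transactionIdPrefix` property name and the values are assumptions, not taken from this diff):

```properties
# Enables the common transactional producer factory for all producer bindings
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix=tx-
# Generic Kafka producer settings for the shared transactional factory
spring.cloud.stream.kafka.binder.transaction.producer.configuration.acks=all
```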
@@ -7,7 +7,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>3.0.3.BUILD-SNAPSHOT</version>
+		<version>3.0.4.RELEASE</version>
 	</parent>
 	<packaging>pom</packaging>
 	<name>spring-cloud-stream-binder-kafka-docs</name>
@@ -589,11 +589,11 @@ public KStream<String, String> anotherProcess(@Input("anotherInput") <KStream<Ob
 
 Then you must set the application id for this using the following binding property.
 
-`spring.cloud.stream.kafka.streams.bindings.input.applicationId`
+`spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId`
 
 and
 
-`spring.cloud.stream.kafka.streams.bindings.anotherInput.applicationId`
+`spring.cloud.stream.kafka.streams.bindings.anotherInput.consumer.applicationId`
 
 For the function based model also, this approach of setting the application id at the binding level will work.
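With the property rename above, per-binding application ids now live under the `consumer` segment; in `application.properties` (the application id values are hypothetical):

```properties
spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId=my-first-processor
spring.cloud.stream.kafka.streams.bindings.anotherInput.consumer.applicationId=my-second-processor
```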
@@ -1448,6 +1448,19 @@ By default, the `Kafkastreams.cleanup()` method is called when the binding is st
 See https://docs.spring.io/spring-kafka/reference/html/_reference.html#_configuration[the Spring Kafka documentation].
 To modify this behavior simply add a single `CleanupConfig` `@Bean` (configured to clean up on start, stop, or neither) to the application context; the bean will be detected and wired into the factory bean.
 
+=== Kafka Streams topology visualization
+
+The Kafka Streams binder provides the following actuator endpoints for retrieving the topology description, which you can use to visualize the topology with external tools.
+
+`/actuator/topology`
+
+`/actuator/topology/<application-id of the processor>`
+
+You need to include the actuator and web dependencies from Spring Boot to access these endpoints.
+Further, you also need to add `topology` to the `management.endpoints.web.exposure.include` property.
+By default, the `topology` endpoint is disabled.
 
 === Configuration Options
 
 This section contains the configuration options used by the Kafka Streams binder.
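As the added section states, the `topology` endpoint is disabled by default; exposing it takes one Spring Boot property:

```properties
# Expose the topology actuator endpoint added in this release
management.endpoints.web.exposure.include=topology
```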
pom.xml
@@ -2,21 +2,21 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-	<version>3.0.3.BUILD-SNAPSHOT</version>
+	<version>3.0.4.RELEASE</version>
 	<packaging>pom</packaging>
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-build</artifactId>
-		<version>2.2.2.RELEASE</version>
+		<version>2.2.3.RELEASE</version>
 		<relativePath />
 	</parent>
 	<properties>
 		<java.version>1.8</java.version>
-		<spring-kafka.version>2.3.5.RELEASE</spring-kafka.version>
+		<spring-kafka.version>2.3.7.RELEASE</spring-kafka.version>
 		<spring-integration-kafka.version>3.2.1.RELEASE</spring-integration-kafka.version>
 		<kafka.version>2.3.1</kafka.version>
-		<spring-cloud-schema-registry.version>1.0.3.BUILD-SNAPSHOT</spring-cloud-schema-registry.version>
-		<spring-cloud-stream.version>3.0.3.BUILD-SNAPSHOT</spring-cloud-stream.version>
+		<spring-cloud-schema-registry.version>1.0.4.RELEASE</spring-cloud-schema-registry.version>
+		<spring-cloud-stream.version>3.0.4.RELEASE</spring-cloud-stream.version>
 		<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
 		<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
 		<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>

@@ -4,7 +4,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>3.0.3.BUILD-SNAPSHOT</version>
+		<version>3.0.4.RELEASE</version>
 	</parent>
 	<artifactId>spring-cloud-starter-stream-kafka</artifactId>
 	<description>Spring Cloud Starter Stream Kafka</description>

@@ -5,7 +5,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>3.0.3.BUILD-SNAPSHOT</version>
+		<version>3.0.4.RELEASE</version>
 	</parent>
 	<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
 	<description>Spring Cloud Stream Kafka Binder Core</description>

@@ -10,7 +10,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>3.0.3.BUILD-SNAPSHOT</version>
+		<version>3.0.4.RELEASE</version>
 	</parent>
 
 	<properties>
@@ -17,9 +17,12 @@
 package org.springframework.cloud.stream.binder.kafka.streams;
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,6 +48,7 @@ import org.springframework.util.StringUtils;
  *
  * @author Soby Chacko
  * @author Renwei Han
+ * @author Serhii Siryi
  * @since 2.1.0
  */
 public class InteractiveQueryService {
@@ -153,4 +157,25 @@ public class InteractiveQueryService {
 		return streamsMetadata != null ? streamsMetadata.hostInfo() : null;
 	}
 
+	/**
+	 * Gets the list of {@link HostInfo} where the provided store is hosted on.
+	 * It also can include current host info.
+	 * Kafka Streams will look through all the consumer instances under the same application id
+	 * and retrieves all hosts info.
+	 *
+	 * Note that the end-user applications must provide `application.server` as a configuration property
+	 * for all the application instances when calling this method. If this is not available, then an empty list will be returned.
+	 *
+	 * @param store store name
+	 * @return the list of {@link HostInfo} where provided store is hosted on
+	 */
+	public List<HostInfo> getAllHostsInfo(String store) {
+		return kafkaStreamsRegistry.getKafkaStreams()
+				.stream()
+				.flatMap(k -> k.allMetadataForStore(store).stream())
+				.filter(Objects::nonNull)
+				.map(StreamsMetadata::hostInfo)
+				.collect(Collectors.toList());
+	}
 
 }
@@ -16,12 +16,16 @@
 
 package org.springframework.cloud.stream.binder.kafka.streams;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
 
 import org.springframework.kafka.config.StreamsBuilderFactoryBean;
@@ -31,7 +35,7 @@ import org.springframework.kafka.config.StreamsBuilderFactoryBean;
  *
  * @author Soby Chacko
  */
-class KafkaStreamsRegistry {
+public class KafkaStreamsRegistry {
 
 	private Map<KafkaStreams, StreamsBuilderFactoryBean> streamsBuilderFactoryBeanMap = new HashMap<>();
 
@@ -60,4 +64,18 @@ class KafkaStreamsRegistry {
 		return this.streamsBuilderFactoryBeanMap.get(kafkaStreams);
 	}
 
+	public StreamsBuilderFactoryBean streamsBuilderFactoryBean(String applicationId) {
+		final Optional<StreamsBuilderFactoryBean> first = this.streamsBuilderFactoryBeanMap.values()
+				.stream()
+				.filter(streamsBuilderFactoryBean -> streamsBuilderFactoryBean
+						.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
+						.equals(applicationId))
+				.findFirst();
+		return first.orElse(null);
+	}
+
+	public List<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans() {
+		return new ArrayList<>(this.streamsBuilderFactoryBeanMap.values());
+	}
 
 }
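The lookup added to `KafkaStreamsRegistry` above is a filter-then-`findFirst` over the registered factory beans, returning `null` when no `application.id` matches. The same pattern can be sketched standalone, with plain strings standing in for the Spring and Kafka types (all names here are illustrative, not from the diff):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Standalone sketch of the registry lookup pattern: strings stand in for
// KafkaStreams / StreamsBuilderFactoryBean.
public class RegistrySketch {

	// application.id -> factory bean (stand-in)
	private final Map<String, String> factoryBeansByAppId = new HashMap<>();

	public void register(String applicationId, String factoryBean) {
		this.factoryBeansByAppId.put(applicationId, factoryBean);
	}

	// Mirrors streamsBuilderFactoryBean(String): first match by application id, else null
	public String streamsBuilderFactoryBean(String applicationId) {
		final Optional<String> first = this.factoryBeansByAppId.entrySet()
				.stream()
				.filter(entry -> entry.getKey().equals(applicationId))
				.map(Map.Entry::getValue)
				.findFirst();
		return first.orElse(null);
	}
}
```

Returning `null` rather than throwing keeps the caller (the actuator endpoint added later in this diff) free to fall back to a "not found" message.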
@@ -0,0 +1,71 @@
/*
 * Copyright 2020-2020 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.streams.endpoint;

import java.util.List;

import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.boot.actuate.endpoint.annotation.Selector;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.util.StringUtils;

/**
 * Actuator endpoint for topology description.
 *
 * @author Soby Chacko
 * @since 3.0.4
 */
@Endpoint(id = "topology")
public class TopologyEndpoint {

	/**
	 * Topology not found message.
	 */
	public static final String NO_TOPOLOGY_FOUND_MSG = "No topology found for the given application ID";

	private final KafkaStreamsRegistry kafkaStreamsRegistry;

	public TopologyEndpoint(KafkaStreamsRegistry kafkaStreamsRegistry) {
		this.kafkaStreamsRegistry = kafkaStreamsRegistry;
	}

	@ReadOperation
	public String topology() {
		final List<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans = this.kafkaStreamsRegistry.streamsBuilderFactoryBeans();
		final StringBuilder topologyDescription = new StringBuilder();
		streamsBuilderFactoryBeans.stream()
				.forEach(streamsBuilderFactoryBean ->
						topologyDescription.append(streamsBuilderFactoryBean.getTopology().describe().toString()));
		return topologyDescription.toString();
	}

	@ReadOperation
	public String topology(@Selector String applicationId) {
		if (!StringUtils.isEmpty(applicationId)) {
			final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsRegistry.streamsBuilderFactoryBean(applicationId);
			if (streamsBuilderFactoryBean != null) {
				return streamsBuilderFactoryBean.getTopology().describe().toString();
			}
			else {
				return NO_TOPOLOGY_FOUND_MSG;
			}
		}
		return NO_TOPOLOGY_FOUND_MSG;
	}
}
@@ -0,0 +1,43 @@
/*
 * Copyright 2020-2020 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.streams.endpoint;

import org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration;
import org.springframework.boot.actuate.autoconfigure.endpoint.condition.ConditionalOnAvailableEndpoint;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderSupportAutoConfiguration;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Soby Chacko
 * @since 3.0.4
 */
@Configuration
@ConditionalOnClass(name = {
		"org.springframework.boot.actuate.endpoint.annotation.Endpoint" })
@AutoConfigureAfter({EndpointAutoConfiguration.class, KafkaStreamsBinderSupportAutoConfiguration.class})
public class TopologyEndpointAutoConfiguration {

	@Bean
	@ConditionalOnAvailableEndpoint
	public TopologyEndpoint topologyEndpoint(KafkaStreamsRegistry kafkaStreamsRegistry) {
		return new TopologyEndpoint(kafkaStreamsRegistry);
	}
}
@@ -1,4 +1,4 @@
 org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
 org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderSupportAutoConfiguration,\
-org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsFunctionAutoConfiguration
+org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsFunctionAutoConfiguration,\
+org.springframework.cloud.stream.binder.kafka.streams.endpoint.TopologyEndpointAutoConfiguration
@@ -16,6 +16,7 @@
 
 package org.springframework.cloud.stream.binder.kafka.streams;
 
+import java.util.List;
 import java.util.Map;
 
 import org.apache.kafka.clients.consumer.Consumer;
@@ -162,6 +163,7 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
 		InteractiveQueryService interactiveQueryService = context
 				.getBean(InteractiveQueryService.class);
 		HostInfo currentHostInfo = interactiveQueryService.getCurrentHostInfo();
 
 		assertThat(currentHostInfo.host() + ":" + currentHostInfo.port())
 				.isEqualTo(embeddedKafka.getBrokersAsString());
@@ -173,6 +175,13 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
 		HostInfo hostInfoFoo = interactiveQueryService
 				.getHostInfo("prod-id-count-store-foo", 123, new IntegerSerializer());
 		assertThat(hostInfoFoo).isNull();
 
+		final List<HostInfo> hostInfos = interactiveQueryService.getAllHostsInfo("prod-id-count-store");
+		assertThat(hostInfos.size()).isEqualTo(1);
+		final HostInfo hostInfo1 = hostInfos.get(0);
+		assertThat(hostInfo1.host() + ":" + hostInfo1.port())
+				.isEqualTo(embeddedKafka.getBrokersAsString());
 	}
 
 	@EnableBinding(KafkaStreamsProcessor.class)
@@ -214,7 +223,6 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
 			}
 		}
 	}
 
 	static class Product {
@@ -44,6 +44,8 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.WebApplicationType;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
+import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
+import org.springframework.cloud.stream.binder.kafka.streams.endpoint.TopologyEndpoint;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
@@ -108,6 +110,13 @@ public class KafkaStreamsBinderWordCountFunctionTests {
 		assertThat(meterRegistry.get("stream.metrics.commit.total").gauge().value()).isEqualTo(1.0);
 		assertThat(meterRegistry.get("app.info.start.time.ms").gauge().value()).isNotNaN();
 		Assert.isTrue(LATCH.await(5, TimeUnit.SECONDS), "Failed to call customizers");
+		// Testing topology endpoint
+		final KafkaStreamsRegistry kafkaStreamsRegistry = context.getBean(KafkaStreamsRegistry.class);
+		final TopologyEndpoint topologyEndpoint = new TopologyEndpoint(kafkaStreamsRegistry);
+		final String topology1 = topologyEndpoint.topology();
+		final String topology2 = topologyEndpoint.topology("testKstreamWordCountFunction");
+		assertThat(topology1).isNotEmpty();
+		assertThat(topology1).isEqualTo(topology2);
 	}
 }
@@ -10,7 +10,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>3.0.3.BUILD-SNAPSHOT</version>
+		<version>3.0.4.RELEASE</version>
 	</parent>
 
 	<dependencies>
@@ -103,6 +103,7 @@ import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
 import org.springframework.kafka.listener.ConsumerProperties;
 import org.springframework.kafka.listener.ContainerProperties;
 import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
+import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.kafka.support.KafkaHeaderMapper;
 import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.kafka.support.ProducerListener;
@@ -112,6 +113,7 @@ import org.springframework.kafka.support.TopicPartitionOffset.SeekPosition;
 import org.springframework.kafka.support.converter.MessagingMessageConverter;
 import org.springframework.kafka.transaction.KafkaTransactionManager;
 import org.springframework.lang.Nullable;
+import org.springframework.messaging.Message;
 import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.MessageHandler;
 import org.springframework.messaging.MessageHeaders;
@@ -214,6 +216,8 @@ public class KafkaMessageChannelBinder extends
 
 	private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
 
+	private Map<ConsumerDestination, ContainerProperties.AckMode> ackModeInfo = new ConcurrentHashMap<>();
+
 	public KafkaMessageChannelBinder(
 			KafkaBinderConfigurationProperties configurationProperties,
 			KafkaTopicProvisioner provisioningProvider) {
@@ -681,6 +685,7 @@ public class KafkaMessageChannelBinder extends
 			kafkaMessageDrivenChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
 		}
 		this.getContainerCustomizer().configure(messageListenerContainer, destination.getName(), group);
+		this.ackModeInfo.put(destination, messageListenerContainer.getContainerProperties().getAckMode());
 		return kafkaMessageDrivenChannelAdapter;
 	}
 
@@ -1156,16 +1161,31 @@ public class KafkaMessageChannelBinder extends
 			String dlqName = StringUtils.hasText(kafkaConsumerProperties.getDlqName())
 					? kafkaConsumerProperties.getDlqName()
 					: "error." + record.topic() + "." + group;
+			MessageHeaders headers;
+			if (message instanceof ErrorMessage) {
+				final ErrorMessage errorMessage = (ErrorMessage) message;
+				final Message<?> originalMessage = errorMessage.getOriginalMessage();
+				if (originalMessage != null) {
+					headers = originalMessage.getHeaders();
+				}
+				else {
+					headers = message.getHeaders();
+				}
+			}
+			else {
+				headers = message.getHeaders();
+			}
 			if (this.transactionTemplate != null) {
 				Throwable throwable2 = throwable;
 				this.transactionTemplate.executeWithoutResult(status -> {
 					dlqSender.sendToDlq(recordToSend.get(), kafkaHeaders, dlqName, group, throwable2,
-							determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()));
+							determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()),
+							headers, this.ackModeInfo.get(destination));
 				});
 			}
 			else {
 				dlqSender.sendToDlq(recordToSend.get(), kafkaHeaders, dlqName, group, throwable,
-						determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()));
+						determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()), headers, this.ackModeInfo.get(destination));
 			}
 		};
 	}
@@ -1428,7 +1448,8 @@ public class KafkaMessageChannelBinder extends
 
 	@SuppressWarnings("unchecked")
 	void sendToDlq(ConsumerRecord<?, ?> consumerRecord, Headers headers,
-			String dlqName, String group, Throwable throwable, DlqPartitionFunction partitionFunction) {
+			String dlqName, String group, Throwable throwable, DlqPartitionFunction partitionFunction,
+			MessageHeaders messageHeaders, ContainerProperties.AckMode ackMode) {
 		K key = (K) consumerRecord.key();
 		V value = (V) consumerRecord.value();
 		ProducerRecord<K, V> producerRecord = new ProducerRecord<>(dlqName,
@@ -1458,6 +1479,9 @@ public class KafkaMessageChannelBinder extends
 					KafkaMessageChannelBinder.this.logger
 							.debug("Sent to DLQ " + sb.toString());
 				}
+				if (ackMode == ContainerProperties.AckMode.MANUAL || ackMode == ContainerProperties.AckMode.MANUAL_IMMEDIATE) {
+					messageHeaders.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class).acknowledge();
+				}
 			}
 		});
 	}
@@ -1190,7 +1190,6 @@ public class KafkaBinderTests extends
 		consumerProperties.setBackOffInitialInterval(100);
 		consumerProperties.setBackOffMaxInterval(150);
 		consumerProperties.getExtension().setEnableDlq(true);
-		consumerProperties.getExtension().setAutoRebalanceEnabled(false);
 
 		DirectChannel moduleInputChannel = createBindableChannel("input",
 				createConsumerBindingProperties(consumerProperties));
@@ -1252,6 +1251,87 @@ public class KafkaBinderTests extends
 		producerBinding.unbind();
 	}
 
+	// See https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/870 for motivation for this test.
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testAutoCommitOnErrorWhenManualAcknowledgement() throws Exception {
+		Binder binder = getBinder();
+		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
+		BindingProperties producerBindingProperties = createProducerBindingProperties(
+				producerProperties);
+
+		DirectChannel moduleOutputChannel = createBindableChannel("output",
+				producerBindingProperties);
+
+		ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
+		consumerProperties.setMaxAttempts(3);
+		consumerProperties.setBackOffInitialInterval(100);
+		consumerProperties.setBackOffMaxInterval(150);
+		// When auto commit is disabled, the record is committed after publishing to the DLQ
+		// using the manual acknowledgement (if DLQ is enabled, which it is in this case).
+		consumerProperties.getExtension().setAutoCommitOffset(false);
+		consumerProperties.getExtension().setEnableDlq(true);
+
+		DirectChannel moduleInputChannel = createBindableChannel("input",
+				createConsumerBindingProperties(consumerProperties));
+
+		FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
+		moduleInputChannel.subscribe(handler);
+		long uniqueBindingId = System.currentTimeMillis();
+		Binding<MessageChannel> producerBinding = binder.bindProducer(
+				"retryTest." + uniqueBindingId + ".0", moduleOutputChannel,
+				producerProperties);
+		Binding<MessageChannel> consumerBinding = binder.bindConsumer(
+				"retryTest." + uniqueBindingId + ".0", "testGroup", moduleInputChannel,
+				consumerProperties);
+		ExtendedConsumerProperties<KafkaConsumerProperties> dlqConsumerProperties = createConsumerProperties();
+		dlqConsumerProperties.setMaxAttempts(1);
+		QueueChannel dlqChannel = new QueueChannel();
+		Binding<MessageChannel> dlqConsumerBinding = binder.bindConsumer(
+				"error.retryTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel,
+				dlqConsumerProperties);
+
+		String testMessagePayload = "test." + UUID.randomUUID().toString();
+		Message<byte[]> testMessage = MessageBuilder
+				.withPayload(testMessagePayload.getBytes()).build();
+		moduleOutputChannel.send(testMessage);
+
+		Message<?> dlqMessage = receive(dlqChannel, 3);
+		assertThat(dlqMessage).isNotNull();
+		assertThat(dlqMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());
+
+		// first attempt fails
+		assertThat(handler.getReceivedMessages().entrySet()).hasSize(1);
+		Message<?> handledMessage = handler.getReceivedMessages().entrySet().iterator()
+				.next().getValue();
+		assertThat(handledMessage).isNotNull();
+		assertThat(
+				new String((byte[]) handledMessage.getPayload(), StandardCharsets.UTF_8))
+						.isEqualTo(testMessagePayload);
+		assertThat(handler.getInvocationCount())
+				.isEqualTo(consumerProperties.getMaxAttempts());
+		binderBindUnbindLatency();
+		dlqConsumerBinding.unbind();
+		consumerBinding.unbind();
+
+		// on the second attempt the message is not redelivered because the DLQ is set and the record in error is already committed.
+		QueueChannel successfulInputChannel = new QueueChannel();
+		consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0",
+				"testGroup", successfulInputChannel, consumerProperties);
+		String testMessage2Payload = "test1." + UUID.randomUUID().toString();
+		Message<byte[]> testMessage2 = MessageBuilder
+				.withPayload(testMessage2Payload.getBytes()).build();
+		moduleOutputChannel.send(testMessage2);
+
+		Message<?> receivedMessage = receive(successfulInputChannel);
+		assertThat(receivedMessage.getPayload())
+				.isEqualTo(testMessage2Payload.getBytes());
+
+		binderBindUnbindLatency();
+		consumerBinding.unbind();
+		producerBinding.unbind();
+	}
+
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testConfigurableDlqName() throws Exception {