Compare commits

...

10 Commits

Author SHA1 Message Date
buildmaster
f6aab385ce Update SNAPSHOT to 3.0.4.RELEASE 2020-04-22 16:42:32 +00:00
Soby Chacko
c9a618e1a4 Update Spring-Kafka to 2.3.7.RELEASE 2020-04-22 11:20:12 -04:00
Soby Chacko
e6a82861ba GH:851 Kafka Streams topology visualization
Add Boot actuator endpoints for topology visualization.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/851
2020-03-25 15:14:43 -04:00
Soby Chacko
b329ad133a Offset commit when DLQ is enabled and manual ack (#871)
* Offset commit when DLQ is enabled and manual ack

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/870

When an error occurs, if the application uses manual acknowldegment
(i.e. autoCommitOffset is false) and DLQ is enabled, then after
publishing to DLQ, the offset is not committed currently.
Addressing this issue by manually commiting after publishing to DLQ.

* Address PR review comments

* Addressing PR review comments - #2
2020-03-25 15:13:54 -04:00
Soby Chacko
a5b1d30c1c Update spring kafka version to 2.3.7 snapshot 2020-03-06 16:43:33 -05:00
Serhii Siryi
15e3dd447d InteractiveQueryService API changes
* Add InteractiveQueryService.getAllHostsInfo to fetch metadata
   of all instances that handles specific store.
 * Test to verify this new API (in the existing InteractiveQueryService tests).
2020-03-05 13:26:00 -05:00
Soby Chacko
0551d20d3e Update Kafka Streams docs
Fix missing property prefix in StreamListener based applicationId settings.
2020-03-05 11:41:10 -05:00
buildmaster
9d7b523c16 Bumping versions to 3.0.4.BUILD-SNAPSHOT after release 2020-03-03 16:38:35 +00:00
buildmaster
b8967ff0fa Going back to snapshots 2020-03-03 16:38:35 +00:00
buildmaster
999b2caf35 Update SNAPSHOT to 3.0.3.RELEASE 2020-03-03 16:37:41 +00:00
17 changed files with 332 additions and 32 deletions

View File

@@ -39,7 +39,7 @@ To use Apache Kafka binder, you need to add `spring-cloud-stream-binder-kafka` a
</dependency>
----
Alternatively, you can also use the Spring Cloud Stream Kafka Starter, as shown inn the following example for Maven:
Alternatively, you can also use the Spring Cloud Stream Kafka Starter, as shown in the following example for Maven:
[source,xml]
----
@@ -60,7 +60,7 @@ The Apache Kafka Binder implementation maps each destination to an Apache Kafka
The consumer group maps directly to the same Apache Kafka concept.
Partitioning also maps directly to Apache Kafka partitions as well.
The binder currently uses the Apache Kafka `kafka-clients` 1.0.0 jar and is designed to be used with a broker of at least that version.
The binder currently uses the Apache Kafka `kafka-clients` version `2.3.1`.
This client can communicate with older brokers (see the Kafka documentation), but certain features may not be available.
For example, with versions earlier than 0.11.x.x, native headers are not supported.
Also, 0.11.x.x does not support the `autoAddPartitions` property.
@@ -155,21 +155,15 @@ Default: See individual producer properties.
spring.cloud.stream.kafka.binder.headerMapperBeanName::
The bean name of a `KafkaHeaderMapper` used for mapping `spring-messaging` headers to and from Kafka headers.
Use this, for example, if you wish to customize the trusted packages in a `DefaultKafkaHeaderMapper` that uses JSON deserialization for the headers.
Use this, for example, if you wish to customize the trusted packages in a `BinderHeaderMapper` bean that uses JSON deserialization for the headers.
If this custom `BinderHeaderMapper` bean is not made available to the binder using this property, then the binder will look for a header mapper bean with the name `kafkaBinderHeaderMapper` that is of type `BinderHeaderMapper` before falling back to a default `BinderHeaderMapper` created by the binder.
+
Default: none.
spring.cloud.stream.kafka.binder.authorizationExceptionRetryInterval::
Enables retrying in case of authorization exceptions.
Defines interval between each retry.
Accepts `Duration`, e.g. `30s`, `2m`, etc.
+
Default: `null` (retries disabled, fail fast)
[[kafka-consumer-properties]]
==== Kafka Consumer Properties
NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.
NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.kafka.default.consumer.<property>=<value>`.
The following properties are available for Kafka consumers only and
@@ -234,9 +228,20 @@ The DLQ topic name can be configurable by setting the `dlqName` property.
This provides an alternative option to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome.
See <<kafka-dlq-processing>> processing for more information.
Starting with version 2.0, messages sent to the DLQ topic are enhanced with the following headers: `x-original-topic`, `x-exception-message`, and `x-exception-stacktrace` as `byte[]`.
By default, a failed record is sent to the same partition number in the DLQ topic as the original record.
See <<dlq-partition-selection>> for how to change that behavior.
**Not allowed when `destinationIsPattern` is `true`.**
+
Default: `false`.
dlqPartitions::
When `enableDlq` is true, and this property is not set, a dead letter topic with the same number of partitions as the primary topic(s) is created.
Usually, dead-letter records are sent to the same partition in the dead-letter topic as the original record.
This behavior can be changed; see <<dlq-partition-selection>>.
If this property is set to `1` and there is no `DqlPartitionFunction` bean, all dead-letter records will be written to partition `0`.
If this property is greater than `1`, you **MUST** provide a `DlqPartitionFunction` bean.
Note that the actual partition count is affected by the binder's `minPartitionCount` property.
+
Default: `none`
configuration::
Map with a key/value pair containing generic Kafka consumer properties.
In addition to having Kafka consumer properties, other configuration properties can be passed here.
@@ -311,7 +316,7 @@ Refer to the https://docs.spring.io/spring-kafka/docs/2.3.0.BUILD-SNAPSHOT/refer
[[kafka-producer-properties]]
==== Kafka Producer Properties
NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.
NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.kafka.default.producer.<property>=<value>`.
The following properties are available for Kafka producers only and
@@ -576,6 +581,10 @@ When used in a processor application, the consumer starts the transaction; any r
When the listener exits normally, the listener container will send the offset to the transaction and commit it.
A common producer factory is used for all producer bindings configured using `spring.cloud.stream.kafka.binder.transaction.producer.*` properties; individual binding Kafka producer properties are ignored.
IMPORTANT: Normal binder retries (and dead lettering) are not supported with transactions because the retries will run in the original transaction, which may be rolled back and any published records will be rolled back too.
When retries are enabled (the common property `maxAttempts` is greater than zero) the retry properties are used to configure a `DefaultAfterRollbackProcessor` to enable retries at the container level.
Similarly, instead of publishing dead-letter records within the transaction, this functionality is moved to the listener container, again via the `DefaultAfterRollbackProcessor` which runs after the main transaction has rolled back.
If you wish to use transactions in a source application, or from some arbitrary thread for producer-only transaction (e.g. `@Scheduled` method), you must get a reference to the transactional producer factory and define a `KafkaTransactionManager` bean using it.
====

View File

@@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.0.3.BUILD-SNAPSHOT</version>
<version>3.0.4.RELEASE</version>
</parent>
<packaging>pom</packaging>
<name>spring-cloud-stream-binder-kafka-docs</name>

View File

@@ -589,11 +589,11 @@ public KStream<String, String> anotherProcess(@Input("anotherInput") <KStream<Ob
Then you must set the application id for this using the following binding property.
`spring.cloud.stream.kafka.streams.bindings.input.applicationId`
`spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId`
and
`spring.cloud.stream.kafka.streams.bindings.anotherInput.applicationId`
`spring.cloud.stream.kafka.streams.bindings.anotherInput.consumer.applicationId`
For function based model also, this approach of setting application id at the binding level will work.
@@ -1448,6 +1448,19 @@ By default, the `Kafkastreams.cleanup()` method is called when the binding is st
See https://docs.spring.io/spring-kafka/reference/html/_reference.html#_configuration[the Spring Kafka documentation].
To modify this behavior simply add a single `CleanupConfig` `@Bean` (configured to clean up on start, stop, or neither) to the application context; the bean will be detected and wired into the factory bean.
=== Kafka Streams topology visualization
Kafka Streams binder provides the following actuator endpoints for retrieving the topology description using which you can visualize the topology using external tools.
`/actuator/topology`
`/actuator/topology/<applicaiton-id of the processor>`
You need to include the actuator and web dependencies from Spring Boot to access these endpoints.
Further, you also need to add `topology` to `management.endpoints.web.exposure.include` property.
By default, the `topology` endpoint is disabled.
=== Configuration Options
This section contains the configuration options used by the Kafka Streams binder.

10
pom.xml
View File

@@ -2,21 +2,21 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.0.3.BUILD-SNAPSHOT</version>
<version>3.0.4.RELEASE</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>2.2.2.RELEASE</version>
<version>2.2.3.RELEASE</version>
<relativePath />
</parent>
<properties>
<java.version>1.8</java.version>
<spring-kafka.version>2.3.5.RELEASE</spring-kafka.version>
<spring-kafka.version>2.3.7.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>3.2.1.RELEASE</spring-integration-kafka.version>
<kafka.version>2.3.1</kafka.version>
<spring-cloud-schema-registry.version>1.0.3.BUILD-SNAPSHOT</spring-cloud-schema-registry.version>
<spring-cloud-stream.version>3.0.3.BUILD-SNAPSHOT</spring-cloud-stream.version>
<spring-cloud-schema-registry.version>1.0.4.RELEASE</spring-cloud-schema-registry.version>
<spring-cloud-stream.version>3.0.4.RELEASE</spring-cloud-stream.version>
<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.0.3.BUILD-SNAPSHOT</version>
<version>3.0.4.RELEASE</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.0.3.BUILD-SNAPSHOT</version>
<version>3.0.4.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.0.3.BUILD-SNAPSHOT</version>
<version>3.0.4.RELEASE</version>
</parent>
<properties>

View File

@@ -17,9 +17,12 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,6 +48,7 @@ import org.springframework.util.StringUtils;
*
* @author Soby Chacko
* @author Renwei Han
* @author Serhii Siryi
* @since 2.1.0
*/
public class InteractiveQueryService {
@@ -153,4 +157,25 @@ public class InteractiveQueryService {
return streamsMetadata != null ? streamsMetadata.hostInfo() : null;
}
/**
* Gets the list of {@link HostInfo} where the provided store is hosted on.
* It also can include current host info.
* Kafka Streams will look through all the consumer instances under the same application id
* and retrieves all hosts info.
*
* Note that the end-user applications must provide `application.server` as a configuration property
* for all the application instances when calling this method. If this is not available, then an empty list will be returned.
*
* @param store store name
* @return the list of {@link HostInfo} where provided store is hosted on
*/
public List<HostInfo> getAllHostsInfo(String store) {
return kafkaStreamsRegistry.getKafkaStreams()
.stream()
.flatMap(k -> k.allMetadataForStore(store).stream())
.filter(Objects::nonNull)
.map(StreamsMetadata::hostInfo)
.collect(Collectors.toList());
}
}

View File

@@ -16,12 +16,16 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
@@ -31,7 +35,7 @@ import org.springframework.kafka.config.StreamsBuilderFactoryBean;
*
* @author Soby Chacko
*/
class KafkaStreamsRegistry {
public class KafkaStreamsRegistry {
private Map<KafkaStreams, StreamsBuilderFactoryBean> streamsBuilderFactoryBeanMap = new HashMap<>();
@@ -60,4 +64,18 @@ class KafkaStreamsRegistry {
return this.streamsBuilderFactoryBeanMap.get(kafkaStreams);
}
public StreamsBuilderFactoryBean streamsBuilderFactoryBean(String applicationId) {
final Optional<StreamsBuilderFactoryBean> first = this.streamsBuilderFactoryBeanMap.values()
.stream()
.filter(streamsBuilderFactoryBean -> streamsBuilderFactoryBean
.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals(applicationId))
.findFirst();
return first.orElse(null);
}
public List<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans() {
return new ArrayList<>(this.streamsBuilderFactoryBeanMap.values());
}
}

View File

@@ -0,0 +1,71 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams.endpoint;
import java.util.List;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.boot.actuate.endpoint.annotation.Selector;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.util.StringUtils;
/**
* Actuator endpoint for topology description.
*
* @author Soby Chacko
* @since 3.0.4
*/
@Endpoint(id = "topology")
public class TopologyEndpoint {
/**
* Topology not found message.
*/
public static final String NO_TOPOLOGY_FOUND_MSG = "No topology found for the given application ID";
private final KafkaStreamsRegistry kafkaStreamsRegistry;
public TopologyEndpoint(KafkaStreamsRegistry kafkaStreamsRegistry) {
this.kafkaStreamsRegistry = kafkaStreamsRegistry;
}
@ReadOperation
public String topology() {
final List<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans = this.kafkaStreamsRegistry.streamsBuilderFactoryBeans();
final StringBuilder topologyDescription = new StringBuilder();
streamsBuilderFactoryBeans.stream()
.forEach(streamsBuilderFactoryBean ->
topologyDescription.append(streamsBuilderFactoryBean.getTopology().describe().toString()));
return topologyDescription.toString();
}
@ReadOperation
public String topology(@Selector String applicationId) {
if (!StringUtils.isEmpty(applicationId)) {
final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsRegistry.streamsBuilderFactoryBean(applicationId);
if (streamsBuilderFactoryBean != null) {
return streamsBuilderFactoryBean.getTopology().describe().toString();
}
else {
return NO_TOPOLOGY_FOUND_MSG;
}
}
return NO_TOPOLOGY_FOUND_MSG;
}
}

View File

@@ -0,0 +1,43 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams.endpoint;
import org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration;
import org.springframework.boot.actuate.autoconfigure.endpoint.condition.ConditionalOnAvailableEndpoint;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderSupportAutoConfiguration;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Soby Chacko
* @since 3.0.4
*/
@Configuration
@ConditionalOnClass(name = {
"org.springframework.boot.actuate.endpoint.annotation.Endpoint" })
@AutoConfigureAfter({EndpointAutoConfiguration.class, KafkaStreamsBinderSupportAutoConfiguration.class})
public class TopologyEndpointAutoConfiguration {
@Bean
@ConditionalOnAvailableEndpoint
public TopologyEndpoint topologyEndpoint(KafkaStreamsRegistry kafkaStreamsRegistry) {
return new TopologyEndpoint(kafkaStreamsRegistry);
}
}

View File

@@ -1,4 +1,4 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderSupportAutoConfiguration,\
org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsFunctionAutoConfiguration
org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsFunctionAutoConfiguration,\
org.springframework.cloud.stream.binder.kafka.streams.endpoint.TopologyEndpointAutoConfiguration

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
@@ -162,6 +163,7 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
InteractiveQueryService interactiveQueryService = context
.getBean(InteractiveQueryService.class);
HostInfo currentHostInfo = interactiveQueryService.getCurrentHostInfo();
assertThat(currentHostInfo.host() + ":" + currentHostInfo.port())
.isEqualTo(embeddedKafka.getBrokersAsString());
@@ -173,6 +175,13 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
HostInfo hostInfoFoo = interactiveQueryService
.getHostInfo("prod-id-count-store-foo", 123, new IntegerSerializer());
assertThat(hostInfoFoo).isNull();
final List<HostInfo> hostInfos = interactiveQueryService.getAllHostsInfo("prod-id-count-store");
assertThat(hostInfos.size()).isEqualTo(1);
final HostInfo hostInfo1 = hostInfos.get(0);
assertThat(hostInfo1.host() + ":" + hostInfo1.port())
.isEqualTo(embeddedKafka.getBrokersAsString());
}
@EnableBinding(KafkaStreamsProcessor.class)
@@ -214,7 +223,6 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
}
}
}
static class Product {

View File

@@ -44,6 +44,8 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
import org.springframework.cloud.stream.binder.kafka.streams.endpoint.TopologyEndpoint;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
@@ -108,6 +110,13 @@ public class KafkaStreamsBinderWordCountFunctionTests {
assertThat(meterRegistry.get("stream.metrics.commit.total").gauge().value()).isEqualTo(1.0);
assertThat(meterRegistry.get("app.info.start.time.ms").gauge().value()).isNotNaN();
Assert.isTrue(LATCH.await(5, TimeUnit.SECONDS), "Failed to call customizers");
//Testing topology endpoint
final KafkaStreamsRegistry kafkaStreamsRegistry = context.getBean(KafkaStreamsRegistry.class);
final TopologyEndpoint topologyEndpoint = new TopologyEndpoint(kafkaStreamsRegistry);
final String topology1 = topologyEndpoint.topology();
final String topology2 = topologyEndpoint.topology("testKstreamWordCountFunction");
assertThat(topology1).isNotEmpty();
assertThat(topology1).isEqualTo(topology2);
}
}

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.0.3.BUILD-SNAPSHOT</version>
<version>3.0.4.RELEASE</version>
</parent>
<dependencies>

View File

@@ -103,6 +103,7 @@ import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.ProducerListener;
@@ -112,6 +113,7 @@ import org.springframework.kafka.support.TopicPartitionOffset.SeekPosition;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
@@ -214,6 +216,8 @@ public class KafkaMessageChannelBinder extends
private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
private Map<ConsumerDestination, ContainerProperties.AckMode> ackModeInfo = new ConcurrentHashMap<>();
public KafkaMessageChannelBinder(
KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider) {
@@ -681,6 +685,7 @@ public class KafkaMessageChannelBinder extends
kafkaMessageDrivenChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());
}
this.getContainerCustomizer().configure(messageListenerContainer, destination.getName(), group);
this.ackModeInfo.put(destination, messageListenerContainer.getContainerProperties().getAckMode());
return kafkaMessageDrivenChannelAdapter;
}
@@ -1156,16 +1161,31 @@ public class KafkaMessageChannelBinder extends
String dlqName = StringUtils.hasText(kafkaConsumerProperties.getDlqName())
? kafkaConsumerProperties.getDlqName()
: "error." + record.topic() + "." + group;
MessageHeaders headers;
if (message instanceof ErrorMessage) {
final ErrorMessage errorMessage = (ErrorMessage) message;
final Message<?> originalMessage = errorMessage.getOriginalMessage();
if (originalMessage != null) {
headers = originalMessage.getHeaders();
}
else {
headers = message.getHeaders();
}
}
else {
headers = message.getHeaders();
}
if (this.transactionTemplate != null) {
Throwable throwable2 = throwable;
this.transactionTemplate.executeWithoutResult(status -> {
dlqSender.sendToDlq(recordToSend.get(), kafkaHeaders, dlqName, group, throwable2,
determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()));
determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()),
headers, this.ackModeInfo.get(destination));
});
}
else {
dlqSender.sendToDlq(recordToSend.get(), kafkaHeaders, dlqName, group, throwable,
determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()));
determinDlqPartitionFunction(properties.getExtension().getDlqPartitions()), headers, this.ackModeInfo.get(destination));
}
};
}
@@ -1428,7 +1448,8 @@ public class KafkaMessageChannelBinder extends
@SuppressWarnings("unchecked")
void sendToDlq(ConsumerRecord<?, ?> consumerRecord, Headers headers,
String dlqName, String group, Throwable throwable, DlqPartitionFunction partitionFunction) {
String dlqName, String group, Throwable throwable, DlqPartitionFunction partitionFunction,
MessageHeaders messageHeaders, ContainerProperties.AckMode ackMode) {
K key = (K) consumerRecord.key();
V value = (V) consumerRecord.value();
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(dlqName,
@@ -1458,6 +1479,9 @@ public class KafkaMessageChannelBinder extends
KafkaMessageChannelBinder.this.logger
.debug("Sent to DLQ " + sb.toString());
}
if (ackMode == ContainerProperties.AckMode.MANUAL || ackMode == ContainerProperties.AckMode.MANUAL_IMMEDIATE) {
messageHeaders.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class).acknowledge();
}
}
});
}

View File

@@ -1190,7 +1190,6 @@ public class KafkaBinderTests extends
consumerProperties.setBackOffInitialInterval(100);
consumerProperties.setBackOffMaxInterval(150);
consumerProperties.getExtension().setEnableDlq(true);
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
DirectChannel moduleInputChannel = createBindableChannel("input",
createConsumerBindingProperties(consumerProperties));
@@ -1252,6 +1251,87 @@ public class KafkaBinderTests extends
producerBinding.unbind();
}
//See https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/870 for motivation for this test.
@Test
@SuppressWarnings("unchecked")
public void testAutoCommitOnErrorWhenManualAcknowledgement() throws Exception {
Binder binder = getBinder();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
BindingProperties producerBindingProperties = createProducerBindingProperties(
producerProperties);
DirectChannel moduleOutputChannel = createBindableChannel("output",
producerBindingProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setMaxAttempts(3);
consumerProperties.setBackOffInitialInterval(100);
consumerProperties.setBackOffMaxInterval(150);
//When auto commit is disabled, then the record is committed after publishing to DLQ using the manual acknowledgement.
// (if DLQ is enabled, which is, in this case).
consumerProperties.getExtension().setAutoCommitOffset(false);
consumerProperties.getExtension().setEnableDlq(true);
DirectChannel moduleInputChannel = createBindableChannel("input",
createConsumerBindingProperties(consumerProperties));
FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
moduleInputChannel.subscribe(handler);
long uniqueBindingId = System.currentTimeMillis();
Binding<MessageChannel> producerBinding = binder.bindProducer(
"retryTest." + uniqueBindingId + ".0", moduleOutputChannel,
producerProperties);
Binding<MessageChannel> consumerBinding = binder.bindConsumer(
"retryTest." + uniqueBindingId + ".0", "testGroup", moduleInputChannel,
consumerProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> dlqConsumerProperties = createConsumerProperties();
dlqConsumerProperties.setMaxAttempts(1);
QueueChannel dlqChannel = new QueueChannel();
Binding<MessageChannel> dlqConsumerBinding = binder.bindConsumer(
"error.retryTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel,
dlqConsumerProperties);
String testMessagePayload = "test." + UUID.randomUUID().toString();
Message<byte[]> testMessage = MessageBuilder
.withPayload(testMessagePayload.getBytes()).build();
moduleOutputChannel.send(testMessage);
Message<?> dlqMessage = receive(dlqChannel, 3);
assertThat(dlqMessage).isNotNull();
assertThat(dlqMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());
// first attempt fails
assertThat(handler.getReceivedMessages().entrySet()).hasSize(1);
Message<?> handledMessage = handler.getReceivedMessages().entrySet().iterator()
.next().getValue();
assertThat(handledMessage).isNotNull();
assertThat(
new String((byte[]) handledMessage.getPayload(), StandardCharsets.UTF_8))
.isEqualTo(testMessagePayload);
assertThat(handler.getInvocationCount())
.isEqualTo(consumerProperties.getMaxAttempts());
binderBindUnbindLatency();
dlqConsumerBinding.unbind();
consumerBinding.unbind();
// on the second attempt the message is not redelivered because the DLQ is set and the record in error is already committed.
QueueChannel successfulInputChannel = new QueueChannel();
consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0",
"testGroup", successfulInputChannel, consumerProperties);
String testMessage2Payload = "test1." + UUID.randomUUID().toString();
Message<byte[]> testMessage2 = MessageBuilder
.withPayload(testMessage2Payload.getBytes()).build();
moduleOutputChannel.send(testMessage2);
Message<?> receivedMessage = receive(successfulInputChannel);
assertThat(receivedMessage.getPayload())
.isEqualTo(testMessage2Payload.getBytes());
binderBindUnbindLatency();
consumerBinding.unbind();
producerBinding.unbind();
}
@Test
@SuppressWarnings("unchecked")
public void testConfigurableDlqName() throws Exception {