Custom DLQ Destination Resolver (#976)
* Custom DLQ Destination Resolver Allow applications to provide a custom DLQ destination resolver implementation through a new interface, DlqDestinationResolver, added to the binder's public contract. This interface is a BiFunction extension that gives applications fine-grained control over where to route records in error. Adding test to verify. Adding docs. Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/966 * Add DlqDestinationResolver to MessageChannel based binder. Tests and docs
@@ -21,10 +21,36 @@ public DlqPartitionFunction partitionFunction() {
}
----
====

NOTE: If you set a consumer binding's `dlqPartitions` property to 1 (and the binder's `minPartitionCount` is equal to `1`), there is no need to supply a `DlqPartitionFunction`; the framework will always use partition 0.
If you set a consumer binding's `dlqPartitions` property to a value greater than `1` (or the binder's `minPartitionCount` is greater than `1`), you **must** provide a `DlqPartitionFunction` bean, even if the partition count is the same as the original topic's.

It is also possible to define a custom name for the DLQ topic.
In order to do so, register an implementation of `DlqDestinationResolver` as a `@Bean` in the application context.
When the binder detects such a bean, it takes precedence; otherwise, the binder uses the `dlqName` property.
If neither is provided, the DLQ name defaults to `error.<destination>.<group>`.
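
If you rely on the `dlqName` property instead, it is set per consumer binding; a minimal sketch (the binding name `process-in-0` is only an illustration):

[source]
----
spring.cloud.stream.kafka.bindings.process-in-0.consumer.dlqName: custom-dlq
----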

Here is an example of `DlqDestinationResolver` as a `@Bean`.

====
[source]
----
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
    return (rec, ex) -> {
        if (rec.topic().equals("word1")) {
            return "topic1-dlq";
        }
        else {
            return "topic2-dlq";
        }
    };
}
----
====

One important thing to keep in mind when providing an implementation for `DlqDestinationResolver` is that the provisioner in the binder will not auto create topics for the application.
This is because there is no way for the binder to infer the names of all the DLQ topics the implementation might send to.
Therefore, if you provide DLQ names using this strategy, it is the application's responsibility to ensure that those topics are created beforehand.
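
With Spring Boot's Kafka support, one way to pre-create the topics is to declare `NewTopic` beans, which `KafkaAdmin` provisions on startup; this is only a sketch, and the topic names, partition counts, and replica counts here are assumptions:

[source]
----
// Pre-create each DLQ topic the resolver can return.
@Bean
public NewTopic topic1Dlq() {
    return TopicBuilder.name("topic1-dlq").partitions(1).replicas(1).build();
}

@Bean
public NewTopic topic2Dlq() {
    return TopicBuilder.name("topic2-dlq").partitions(1).replicas(1).build();
}
----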

[[dlq-handling]]
==== Handling Records in a Dead-Letter Topic

@@ -832,13 +832,41 @@ When the above property is set, all the records in deserialization error are aut

You can set the topic name where the DLQ messages are published as below.

You can provide an implementation for `DlqDestinationResolver`, which is a functional interface.
`DlqDestinationResolver` takes the `ConsumerRecord` and the exception as inputs and lets you specify a topic name as the output.
Because the implementation has access to the Kafka `ConsumerRecord`, the record headers can be introspected in the `BiFunction` (see the header-based sketch following the example below).

Here is an example of providing an implementation for `DlqDestinationResolver`.

[source]
----
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
    return (rec, ex) -> {
        if (rec.topic().equals("word1")) {
            return "topic1-dlq";
        }
        else {
            return "topic2-dlq";
        }
    };
}
----
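
Since the resolver also receives the full record, the routing decision can be based on record headers; a minimal sketch (the header name `source-system` and the topic names are illustrative only, not part of the binder's contract):

[source]
----
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
    return (rec, ex) -> {
        // Route by a header value; rec.headers() exposes the Kafka record headers.
        Header sourceSystem = rec.headers().lastHeader("source-system");
        if (sourceSystem != null && "billing".equals(new String(sourceSystem.value()))) {
            return "billing-dlq";
        }
        return "general-dlq";
    };
}
----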

One important thing to keep in mind when providing an implementation for `DlqDestinationResolver` is that the provisioner in the binder will not auto create topics for the application.
This is because there is no way for the binder to infer the names of all the DLQ topics the implementation might send to.
Therefore, if you provide DLQ names using this strategy, it is the application's responsibility to ensure that those topics are created beforehand.

If a `DlqDestinationResolver` bean is present in the application, it takes precedence.
If you do not want to follow this approach and would rather provide a static DLQ name through configuration, you can set the following property.

[source]
----
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)
----

If this is set, then the error records are sent to the topic `custom-dlq`.
If this is not set, then it will create a DLQ topic with the name `error.<input-topic-name>.<application-id>`.
If the application is not using either of the above strategies, then it will create a DLQ topic with the name `error.<input-topic-name>.<application-id>`.
For instance, if your binding's destination topic is `inputTopic` and the application ID is `process-applicationId`, then the default DLQ topic is `error.inputTopic.process-applicationId`.
It is always recommended to explicitly create a DLQ topic for each input binding if it is your intention to enable DLQ.

@@ -213,7 +213,7 @@ Default: null (equivalent to `earliest`).
enableDlq::
When set to true, it enables DLQ behavior for the consumer.
By default, messages that result in errors are forwarded to a topic named `error.<destination>.<group>`.
The DLQ topic name can be configurable by setting the `dlqName` property.
The DLQ topic name can be configurable by setting the `dlqName` property or by defining a `@Bean` of type `DlqDestinationResolver`.
This provides an alternative option to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome.
See <<kafka-dlq-processing>> processing for more information.
Starting with version 2.0, messages sent to the DLQ topic are enhanced with the following headers: `x-original-topic`, `x-exception-message`, and `x-exception-stacktrace` as `byte[]`.

@@ -0,0 +1,35 @@
/*
 * Copyright 2020-2020 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.utils;

import java.util.function.BiFunction;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * A {@link BiFunction} extension for defining DLQ destination resolvers.
 *
 * The BiFunction takes the {@link ConsumerRecord} and the exception as inputs
 * and returns a topic name as the DLQ.
 *
 * @author Soby Chacko
 * @since 3.0.9
 */
@FunctionalInterface
public interface DlqDestinationResolver extends BiFunction<ConsumerRecord<?, ?>, Exception, String> {

}
@@ -36,10 +36,12 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerPro
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.utils.DlqDestinationResolver;
import org.springframework.cloud.stream.binder.kafka.utils.DlqPartitionFunction;
import org.springframework.context.ApplicationContext;
import org.springframework.core.MethodParameter;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
@@ -99,24 +101,29 @@ final class KafkaStreamsBinderUtils {
				new ExtendedProducerProperties<>(
						extendedConsumerProperties.getExtension().getDlqProducerProperties()),
				binderConfigurationProperties);
		KafkaTemplate<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFactory);
		KafkaOperations<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFactory);

		Map<String, DlqDestinationResolver> dlqDestinationResolvers =
				context.getBeansOfType(DlqDestinationResolver.class, false, false);

		BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver =
				(cr, e) -> new TopicPartition(extendedConsumerProperties.getExtension().getDlqName(),
						partitionFunction.apply(group, cr, e));
		DeadLetterPublishingRecoverer kafkaStreamsBinderDlqRecoverer = !StringUtils
				dlqDestinationResolvers.isEmpty() ? (cr, e) -> new TopicPartition(extendedConsumerProperties.getExtension().getDlqName(),
						partitionFunction.apply(group, cr, e)) :
				(cr, e) -> new TopicPartition(dlqDestinationResolvers.values().iterator().next().apply(cr, e),
						partitionFunction.apply(group, cr, e));

		DeadLetterPublishingRecoverer kafkaStreamsBinderDlqRecoverer = !dlqDestinationResolvers.isEmpty() || !StringUtils
				.isEmpty(extendedConsumerProperties.getExtension().getDlqName())
						? new DeadLetterPublishingRecoverer(kafkaTemplate, destinationResolver)
						: null;
		for (String inputTopic : inputTopics) {
			if (StringUtils.isEmpty(
					extendedConsumerProperties.getExtension().getDlqName())) {
					extendedConsumerProperties.getExtension().getDlqName()) && dlqDestinationResolvers.isEmpty()) {
				destinationResolver = (cr, e) -> new TopicPartition("error." + inputTopic + "." + group,
						partitionFunction.apply(group, cr, e));
				kafkaStreamsBinderDlqRecoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
						destinationResolver);
			}

			SendToDlqAndContinue sendToDlqAndContinue = context
					.getBean(SendToDlqAndContinue.class);
			sendToDlqAndContinue.addKStreamDlqDispatch(inputTopic,

@@ -0,0 +1,145 @@
/*
 * Copyright 2020-2020 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.streams.integration;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.junit.ClassRule;
import org.junit.Test;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.binder.kafka.utils.DlqDestinationResolver;
import org.springframework.cloud.stream.binder.kafka.utils.DlqPartitionFunction;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * @author Soby Chacko
 */
public class DlqDestinationResolverTests {

	@ClassRule
	public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
			"topic1-dlq",
			"topic2-dlq");

	private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule
			.getEmbeddedKafka();

	@Test
	public void testDlqDestinationResolverWorks() throws Exception {
		SpringApplication app = new SpringApplication(WordCountProcessorApplication.class);
		app.setWebApplicationType(WebApplicationType.NONE);

		try (ConfigurableApplicationContext ignored = app.run(
				"--server.port=0",
				"--spring.jmx.enabled=false",
				"--spring.cloud.function.definition=process",
				"--spring.cloud.stream.bindings.process-in-0.destination=word1,word2",
				"--spring.cloud.stream.bindings.process-out-0.destination=test-output",
				"--spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.application-id=dlq-dest-resolver-test",
				"--spring.cloud.stream.kafka.streams.binder.serdeError=sendToDlq",
				"--spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde="
						+ "org.apache.kafka.common.serialization.Serdes$IntegerSerde",
				"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
			Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
			DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(
					senderProps);
			try {
				KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
				template.setDefaultTopic("word1");
				template.sendDefault("foobar");

				template.setDefaultTopic("word2");
				template.sendDefault("foobar");

				Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("some-random-group",
						"false", embeddedKafka);
				consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
				DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
						consumerProps);
				Consumer<String, String> consumer1 = cf.createConsumer();
				embeddedKafka.consumeFromEmbeddedTopics(consumer1, "topic1-dlq",
						"topic2-dlq");

				ConsumerRecord<String, String> cr1 = KafkaTestUtils.getSingleRecord(consumer1,
						"topic1-dlq");
				assertThat(cr1.value()).isEqualTo("foobar");
				ConsumerRecord<String, String> cr2 = KafkaTestUtils.getSingleRecord(consumer1,
						"topic2-dlq");
				assertThat(cr2.value()).isEqualTo("foobar");
			}
			finally {
				pf.destroy();
			}
		}
	}

	@EnableAutoConfiguration
	public static class WordCountProcessorApplication {

		@Bean
		public Function<KStream<Object, String>, KStream<?, String>> process() {

			return input -> input
					.flatMapValues(
							value -> Arrays.asList(value.toLowerCase().split("\\W+")))
					.map((key, value) -> new KeyValue<>(value, value))
					.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
					.windowedBy(TimeWindows.of(Duration.ofSeconds(5))).count(Materialized.as("foo-WordCounts-x"))
					.toStream().map((key, value) -> new KeyValue<>(null,
							"Count for " + key.key() + " : " + value));
		}

		@Bean
		public DlqPartitionFunction partitionFunction() {
			return (group, rec, ex) -> 0;
		}

		@Bean
		public DlqDestinationResolver dlqDestinationResolver() {
			return (rec, ex) -> {
				if (rec.topic().equals("word1")) {
					return "topic1-dlq";
				}
				return "topic2-dlq";
			};
		}
	}
}
@@ -71,6 +71,7 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerPro
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.utils.DlqDestinationResolver;
import org.springframework.cloud.stream.binder.kafka.utils.DlqPartitionFunction;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer.PartitioningInterceptor;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
@@ -214,6 +215,8 @@ public class KafkaMessageChannelBinder extends

	private final DlqPartitionFunction dlqPartitionFunction;

	private final DlqDestinationResolver dlqDestinationResolver;

	private final Map<ConsumerDestination, ContainerProperties.AckMode> ackModeInfo = new ConcurrentHashMap<>();

	private ProducerListener<byte[], byte[]> producerListener;
@@ -226,7 +229,8 @@ public class KafkaMessageChannelBinder extends
			KafkaBinderConfigurationProperties configurationProperties,
			KafkaTopicProvisioner provisioningProvider) {

		this(configurationProperties, provisioningProvider, null, null, null, null);
		this(configurationProperties, provisioningProvider, null, null, null,
				null, null);
	}

	public KafkaMessageChannelBinder(
@@ -235,7 +239,8 @@ public class KafkaMessageChannelBinder extends
			ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> containerCustomizer,
			KafkaBindingRebalanceListener rebalanceListener) {

		this(configurationProperties, provisioningProvider, containerCustomizer, null, rebalanceListener, null);
		this(configurationProperties, provisioningProvider, containerCustomizer, null, rebalanceListener,
				null, null);
	}

	public KafkaMessageChannelBinder(
@@ -244,7 +249,8 @@ public class KafkaMessageChannelBinder extends
			ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> containerCustomizer,
			MessageSourceCustomizer<KafkaMessageSource<?, ?>> sourceCustomizer,
			KafkaBindingRebalanceListener rebalanceListener,
			DlqPartitionFunction dlqPartitionFunction) {
			DlqPartitionFunction dlqPartitionFunction,
			DlqDestinationResolver dlqDestinationResolver) {

		super(headersToMap(configurationProperties), provisioningProvider,
				containerCustomizer, sourceCustomizer);
@@ -261,9 +267,8 @@ public class KafkaMessageChannelBinder extends
			this.transactionTemplate = null;
		}
		this.rebalanceListener = rebalanceListener;
		this.dlqPartitionFunction = dlqPartitionFunction != null
				? dlqPartitionFunction
				: null;
		this.dlqPartitionFunction = dlqPartitionFunction;
		this.dlqDestinationResolver = dlqDestinationResolver;
	}

	private static String[] headersToMap(
@@ -1177,9 +1182,7 @@ public class KafkaMessageChannelBinder extends
				}
			}
		}
		String dlqName = StringUtils.hasText(kafkaConsumerProperties.getDlqName())
				? kafkaConsumerProperties.getDlqName()
				: "error." + record.topic() + "." + group;

		MessageHeaders headers;
		if (message instanceof ErrorMessage) {
			final ErrorMessage errorMessage = (ErrorMessage) message;
@@ -1194,6 +1197,10 @@ public class KafkaMessageChannelBinder extends
		else {
			headers = message.getHeaders();
		}
		String dlqName = this.dlqDestinationResolver != null ?
				this.dlqDestinationResolver.apply(recordToSend.get(), new Exception(throwable)) : StringUtils.hasText(kafkaConsumerProperties.getDlqName())
						? kafkaConsumerProperties.getDlqName()
						: "error." + record.topic() + "." + group;
		if (this.transactionTemplate != null) {
			Throwable throwable2 = throwable;
			this.transactionTemplate.executeWithoutResult(status -> {

@@ -39,6 +39,7 @@ import org.springframework.cloud.stream.binder.kafka.properties.JaasLoginModuleC
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.utils.DlqDestinationResolver;
import org.springframework.cloud.stream.binder.kafka.utils.DlqPartitionFunction;
import org.springframework.cloud.stream.config.ConsumerEndpointCustomizer;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
@@ -118,12 +119,12 @@ public class KafkaBinderConfiguration {
			@Nullable ConsumerEndpointCustomizer<KafkaMessageDrivenChannelAdapter<?, ?>> consumerCustomizer,
			ObjectProvider<KafkaBindingRebalanceListener> rebalanceListener,
			ObjectProvider<DlqPartitionFunction> dlqPartitionFunction,
			ObjectProvider<DlqDestinationResolver> dlqDestinationResolver,
			ObjectProvider<ClientFactoryCustomizer> clientFactoryCustomizer) {

		KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
				configurationProperties, provisioningProvider,
				listenerContainerCustomizer, sourceCustomizer, rebalanceListener.getIfUnique(),
				dlqPartitionFunction.getIfUnique());
				dlqPartitionFunction.getIfUnique(), dlqDestinationResolver.getIfUnique());
		kafkaMessageChannelBinder.setProducerListener(this.producerListener);
		kafkaMessageChannelBinder
				.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);

@@ -89,6 +89,7 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfi
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.utils.DlqDestinationResolver;
import org.springframework.cloud.stream.binder.kafka.utils.DlqPartitionFunction;
import org.springframework.cloud.stream.binder.kafka.utils.KafkaTopicUtils;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer.PartitioningInterceptor;
@@ -218,12 +219,12 @@ public class KafkaBinderTests extends
	private KafkaTestBinder getBinder(
			KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) {

		return getBinder(kafkaBinderConfigurationProperties, null);
		return getBinder(kafkaBinderConfigurationProperties, null, null);
	}

	private KafkaTestBinder getBinder(
			KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
			DlqPartitionFunction dlqPartitionFunction) {
			DlqPartitionFunction dlqPartitionFunction, DlqDestinationResolver dlqDestinationResolver) {

		KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
				kafkaBinderConfigurationProperties, new TestKafkaProperties());
@@ -234,7 +235,7 @@ public class KafkaBinderTests extends
			throw new RuntimeException(e);
		}
		return new KafkaTestBinder(kafkaBinderConfigurationProperties,
				provisioningProvider, dlqPartitionFunction);
				provisioningProvider, dlqPartitionFunction, dlqDestinationResolver);
	}

	private KafkaBinderConfigurationProperties createConfigurationProperties() {
@@ -866,40 +867,41 @@ public class KafkaBinderTests extends

	@Test
	public void testDlqAndRetry() throws Exception {
		testDlqGuts(true, null, null);
		testDlqGuts(true, null, null, false, false);
	}

	@Test
	public void testDlqAndRetryTransactional() throws Exception {
		testDlqGuts(true, null, null, true);
		testDlqGuts(true, null, null, true, false);
	}

	@Test
	public void testDlq() throws Exception {
		testDlqGuts(false, null, 3);
		testDlqGuts(false, null, 3, false, false);
	}

	@Test
	public void testDlqWithDlqDestinationResolver() throws Exception {
		testDlqGuts(false, null, 3, false, true);
	}

	@Test
	public void testDlqTransactional() throws Exception {
		testDlqGuts(false, null, 3, true);
		testDlqGuts(false, null, 3, true, false);
	}

	@Test
	public void testDlqNone() throws Exception {
		testDlqGuts(false, HeaderMode.none, 1);
		testDlqGuts(false, HeaderMode.none, 1, false, false);
	}

	@Test
	public void testDlqEmbedded() throws Exception {
		testDlqGuts(false, HeaderMode.embeddedHeaders, 3);
	}

	private void testDlqGuts(boolean withRetry, HeaderMode headerMode, Integer dlqPartitions) throws Exception {
		testDlqGuts(withRetry, headerMode, dlqPartitions, false);
		testDlqGuts(false, HeaderMode.embeddedHeaders, 3, false, false);
	}

	private void testDlqGuts(boolean withRetry, HeaderMode headerMode, Integer dlqPartitions,
			boolean transactional) throws Exception {
			boolean transactional, boolean useDlqDestResolver) throws Exception {

		int expectedDlqPartition = dlqPartitions == null ? 0 : dlqPartitions - 1;
		KafkaBinderConfigurationProperties binderConfig = createConfigurationProperties();
@@ -918,7 +920,11 @@ public class KafkaBinderTests extends
		else {
			dlqPartitionFunction = (group, rec, ex) -> dlqPartitions - 1;
		}
		AbstractKafkaTestBinder binder = getBinder(binderConfig, dlqPartitionFunction);
		DlqDestinationResolver dlqDestinationResolver = null;
		if (useDlqDestResolver) {
			dlqDestinationResolver = (cr, e) -> "foo.dlq";
		}
		AbstractKafkaTestBinder binder = getBinder(binderConfig, dlqPartitionFunction, dlqDestinationResolver);

		ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
		producerProperties.getExtension()
@@ -961,10 +967,16 @@ public class KafkaBinderTests extends
		assertThat(container.getContainerProperties().getTopicPartitionsToAssign().length)
				.isEqualTo(4); // 2 topics 2 partitions each

		String dlqTopic = useDlqDestResolver ? "foo.dlq" : "error.dlqTest." + uniqueBindingId + ".0.testGroup";
		try (AdminClient admin = AdminClient.create(Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
				embeddedKafka.getEmbeddedKafka().getBrokersAsString()))) {

			Map<String, TopicDescription> topicDescriptions = admin.describeTopics(Collections.singletonList("error.dlqTest." + uniqueBindingId + ".0.testGroup"))
			if (useDlqDestResolver) {
				List<NewTopic> nonProvisionedDlqTopics = new ArrayList<>();
				NewTopic nTopic = new NewTopic(dlqTopic, 3, (short) 1);
				nonProvisionedDlqTopics.add(nTopic);
				admin.createTopics(nonProvisionedDlqTopics);
			}
			Map<String, TopicDescription> topicDescriptions = admin.describeTopics(Collections.singletonList(dlqTopic))
					.all()
					.get(10, TimeUnit.SECONDS);
			assertThat(topicDescriptions).hasSize(1);
@@ -996,7 +1008,7 @@ public class KafkaBinderTests extends
		globalErrorChannel.subscribe(globalErrorChannelMessage::set);

		Binding<MessageChannel> dlqConsumerBinding = binder.bindConsumer(
				"error.dlqTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel,
				dlqTopic, null, dlqChannel,
				dlqConsumerProperties);
		binderBindUnbindLatency();
		String testMessagePayload = "test." + UUID.randomUUID().toString();

@@ -20,6 +20,7 @@ import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.utils.DlqDestinationResolver;
import org.springframework.cloud.stream.binder.kafka.utils.DlqPartitionFunction;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
@@ -42,16 +43,17 @@ public class KafkaTestBinder extends AbstractKafkaTestBinder {
	KafkaTestBinder(KafkaBinderConfigurationProperties binderConfiguration,
			KafkaTopicProvisioner kafkaTopicProvisioner) {

		this(binderConfiguration, kafkaTopicProvisioner, null);
		this(binderConfiguration, kafkaTopicProvisioner, null, null);
	}

	@SuppressWarnings({ "rawtypes", "unchecked" })
	KafkaTestBinder(KafkaBinderConfigurationProperties binderConfiguration,
			KafkaTopicProvisioner kafkaTopicProvisioner, DlqPartitionFunction dlqPartitionFunction) {
			KafkaTopicProvisioner kafkaTopicProvisioner, DlqPartitionFunction dlqPartitionFunction,
			DlqDestinationResolver dlqDestinationResolver) {

		try {
			KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(
					binderConfiguration, kafkaTopicProvisioner, null, null, null, dlqPartitionFunction) {
					binderConfiguration, kafkaTopicProvisioner, null, null, null, dlqPartitionFunction, dlqDestinationResolver) {

				/*
				 * Some tests use multiple instance indexes for the same topic; we need to