From 52c0b35add9f9a19602517ca65449f5101d1a252 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 15 Apr 2020 17:24:08 -0400 Subject: [PATCH] GH-657: DLQ producer properties from the binder Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/657 Adding the ability for the binder to detect DLQ producer properties set on the binder as common producer properties. Adding test to verify. --- .../kafka/KafkaMessageChannelBinder.java | 10 ++- .../stream/binder/kafka/KafkaBinderTests.java | 73 ++++++++++++++++++- 2 files changed, 79 insertions(+), 4 deletions(-) diff --git a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java index 34e8b209..58538c61 100644 --- a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java +++ b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java @@ -1079,10 +1079,16 @@ public class KafkaMessageChannelBinder extends if (properties.isUseNativeDecoding()) { if (record != null) { - Map configuration = transMan == null + // Give the binder configuration the least preference. + Map configuration = this.configurationProperties.getConfiguration(); + // Then give any producer specific properties specified on the binder. + configuration.putAll(this.configurationProperties.getProducerProperties()); + Map configs = transMan == null ? dlqProducerProperties.getConfiguration() : this.configurationProperties.getTransaction() - .getProducer().getConfiguration(); + .getProducer().getConfiguration(); + // Finally merge with dlq producer properties or the transaction producer properties. + configuration.putAll(configs); if (record.key() != null && !record.key().getClass().isInstance(byte[].class)) { ensureDlqMessageCanBeProperlySerialized(configuration, diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java index 3fc33975..7fbbbc2d 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java @@ -853,8 +853,7 @@ public class KafkaBinderTests extends Message receivedMessage = dlqChannel.receive(5000); // Ensure that we didn't receive anything on DLQ because of serializer config - // missing - // on dlq producer while native Decoding is enabled. + // missing on dlq producer while native Decoding is enabled. assertThat(receivedMessage).isNull(); binderBindUnbindLatency(); @@ -865,6 +864,76 @@ public class KafkaBinderTests extends consumerBinding.unbind(); } + // For more details on the context of this test: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/657 + @Test + @SuppressWarnings("unchecked") + public void testDlqWithProducerPropertiesSetAtBinderLevel() + throws Exception { + + KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties( + new TestKafkaProperties()); + + Map consumerProps = new HashMap<>(); + consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + binderConfiguration.setConsumerProperties(consumerProps); + + Map producerProps = new HashMap<>(); + producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + binderConfiguration.setProducerProperties(producerProps); + Binder binder = getBinder(binderConfiguration); + + ExtendedProducerProperties producerProperties = createProducerProperties(); + producerProperties.setUseNativeEncoding(true); + BindingProperties outputBindingProperties = createProducerBindingProperties( + producerProperties); + DirectChannel moduleOutputChannel = createBindableChannel("output", + outputBindingProperties); + + ExtendedConsumerProperties consumerProperties = createConsumerProperties(); + consumerProperties.setUseNativeDecoding(true); + consumerProperties.getExtension().setEnableDlq(true); + + DirectChannel moduleInputChannel = createBindableChannel("input", + createConsumerBindingProperties(consumerProperties)); + + Binding producerBinding = binder.bindProducer("foo.bar", + moduleOutputChannel, outputBindingProperties.getProducer()); + Binding consumerBinding = binder.bindConsumer("foo.bar", + "tdwcapsabl", moduleInputChannel, consumerProperties); + + // Let the consumer actually bind to the producer before sending a msg + binderBindUnbindLatency(); + + FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler(); + moduleInputChannel.subscribe(handler); + + // Consumer for the DLQ destination + QueueChannel dlqChannel = new QueueChannel(); + ExtendedConsumerProperties dlqConsumerProperties = createConsumerProperties(); + dlqConsumerProperties.setMaxAttempts(1); + + Binding dlqConsumerBinding = binder.bindConsumer( + "error.foo.bar." + "tdwcapsabl", null, dlqChannel, + dlqConsumerProperties); + binderBindUnbindLatency(); + + Message message = org.springframework.integration.support.MessageBuilder + .withPayload("foo").build(); + + moduleOutputChannel.send(message); + + Message receivedMessage = dlqChannel.receive(5000); + assertThat(receivedMessage).isNotNull(); + assertThat(receivedMessage.getPayload()).isEqualTo("foo"); + + binderBindUnbindLatency(); + + dlqConsumerBinding.unbind(); + + producerBinding.unbind(); + consumerBinding.unbind(); + } + @Test public void testDlqAndRetry() throws Exception { testDlqGuts(true, null, null);