GH-657: DLQ producer properties from the binder

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/657

Adding the ability for the binder to detect DLQ producer properties
set on the binder as common producer properties.

Adding test to verify.
This commit is contained in:
Soby Chacko
2020-04-15 17:24:08 -04:00
committed by Gary Russell
parent 36f00e5867
commit 52c0b35add
2 changed files with 79 additions and 4 deletions

View File

@@ -1079,10 +1079,16 @@ public class KafkaMessageChannelBinder extends
if (properties.isUseNativeDecoding()) {
if (record != null) {
Map<String, String> configuration = transMan == null
// Give the binder configuration the least preference.
Map<String, String> configuration = this.configurationProperties.getConfiguration();
// Then give any producer specific properties specified on the binder.
configuration.putAll(this.configurationProperties.getProducerProperties());
Map<String, String> configs = transMan == null
? dlqProducerProperties.getConfiguration()
: this.configurationProperties.getTransaction()
.getProducer().getConfiguration();
.getProducer().getConfiguration();
// Finally merge with dlq producer properties or the transaction producer properties.
configuration.putAll(configs);
if (record.key() != null
&& !record.key().getClass().isInstance(byte[].class)) {
ensureDlqMessageCanBeProperlySerialized(configuration,

View File

@@ -853,8 +853,7 @@ public class KafkaBinderTests extends
Message<?> receivedMessage = dlqChannel.receive(5000);
// Ensure that we didn't receive anything on DLQ because of serializer config
// missing
// on dlq producer while native Decoding is enabled.
// missing on dlq producer while native Decoding is enabled.
assertThat(receivedMessage).isNull();
binderBindUnbindLatency();
@@ -865,6 +864,76 @@ public class KafkaBinderTests extends
consumerBinding.unbind();
}
// For more details on the context of this test: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/657
@Test
@SuppressWarnings("unchecked")
public void testDlqWithProducerPropertiesSetAtBinderLevel()
throws Exception {
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties(
new TestKafkaProperties());
Map<String, String> consumerProps = new HashMap<>();
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
binderConfiguration.setConsumerProperties(consumerProps);
Map<String, String> producerProps = new HashMap<>();
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
binderConfiguration.setProducerProperties(producerProps);
Binder binder = getBinder(binderConfiguration);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setUseNativeEncoding(true);
BindingProperties outputBindingProperties = createProducerBindingProperties(
producerProperties);
DirectChannel moduleOutputChannel = createBindableChannel("output",
outputBindingProperties);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setUseNativeDecoding(true);
consumerProperties.getExtension().setEnableDlq(true);
DirectChannel moduleInputChannel = createBindableChannel("input",
createConsumerBindingProperties(consumerProperties));
Binding<MessageChannel> producerBinding = binder.bindProducer("foo.bar",
moduleOutputChannel, outputBindingProperties.getProducer());
Binding<MessageChannel> consumerBinding = binder.bindConsumer("foo.bar",
"tdwcapsabl", moduleInputChannel, consumerProperties);
// Let the consumer actually bind to the producer before sending a msg
binderBindUnbindLatency();
FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
moduleInputChannel.subscribe(handler);
// Consumer for the DLQ destination
QueueChannel dlqChannel = new QueueChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> dlqConsumerProperties = createConsumerProperties();
dlqConsumerProperties.setMaxAttempts(1);
Binding<MessageChannel> dlqConsumerBinding = binder.bindConsumer(
"error.foo.bar." + "tdwcapsabl", null, dlqChannel,
dlqConsumerProperties);
binderBindUnbindLatency();
Message<?> message = org.springframework.integration.support.MessageBuilder
.withPayload("foo").build();
moduleOutputChannel.send(message);
Message<?> receivedMessage = dlqChannel.receive(5000);
assertThat(receivedMessage).isNotNull();
assertThat(receivedMessage.getPayload()).isEqualTo("foo");
binderBindUnbindLatency();
dlqConsumerBinding.unbind();
producerBinding.unbind();
consumerBinding.unbind();
}
@Test
public void testDlqAndRetry() throws Exception {
testDlqGuts(true, null, null);