From d594bab4cffb533bccc71419b84f24597a0c0867 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 27 Feb 2020 13:01:58 -0500 Subject: [PATCH] GH-853: Don't propagate out "internal" headers Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/853 --- .gitignore | 1 + .../binder/kafka/BinderHeaderMapper.java | 47 ++++++++- .../kafka/KafkaMessageChannelBinder.java | 30 +++--- .../stream/binder/kafka/KafkaBinderTests.java | 95 +++++++++++++++++++ 4 files changed, 159 insertions(+), 14 deletions(-) diff --git a/.gitignore b/.gitignore index d446a495..8c684729 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,4 @@ _site/ dump.rdb .apt_generated artifacts +.sts4-cache diff --git a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/BinderHeaderMapper.java b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/BinderHeaderMapper.java index 7398639d..9e5dd435 100644 --- a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/BinderHeaderMapper.java +++ b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/BinderHeaderMapper.java @@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -37,6 +38,8 @@ import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; +import org.springframework.cloud.stream.binder.BinderHeaders; +import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.kafka.support.AbstractKafkaHeaderMapper; import org.springframework.kafka.support.JacksonUtils; import org.springframework.lang.Nullable; @@ -49,7 +52,7 @@ import org.springframework.util.MimeType; * Custom header mapper for Apache Kafka. This is identical to the {@link org.springframework.kafka.support.DefaultKafkaHeaderMapper} * from spring Kafka. This is provided for addressing some interoperability issues between Spring Cloud Stream 3.0.x * and 2.x apps, where mime types passed as regular {@link MimeType} in the header are not de-serialized properly. - * Once those concerns are addressed in Spring Kafka, we will deprecate this class and remove it in a future binder release. + * It also suppresses certain internal headers that should never be propagated on output. * * Most headers in {@link org.springframework.kafka.support.KafkaHeaders} are not mapped onto outbound messages. * The exceptions are correlation and reply headers for request/reply @@ -65,6 +68,16 @@ import org.springframework.util.MimeType; */ public class BinderHeaderMapper extends AbstractKafkaHeaderMapper { + private static final String NEGATE = "!"; + + private static final String NEVER_ID = NEGATE + MessageHeaders.ID; + + private static final String NEVER_TIMESTAMP = NEGATE + MessageHeaders.TIMESTAMP; + + private static final String NEVER_DELIVERY_ATTEMPT = NEGATE + IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT; + + private static final String NEVER_NATIVE_HEADERS_PRESENT = NEGATE + BinderHeaders.NATIVE_HEADERS_PRESENT; + private static final String JAVA_LANG_STRING = "java.lang.String"; private static final List DEFAULT_TRUSTED_PACKAGES = @@ -119,8 +132,10 @@ public class BinderHeaderMapper extends AbstractKafkaHeaderMapper { */ public BinderHeaderMapper(ObjectMapper objectMapper) { this(objectMapper, - "!" + MessageHeaders.ID, - "!" + MessageHeaders.TIMESTAMP, + NEVER_ID, + NEVER_TIMESTAMP, + NEVER_DELIVERY_ATTEMPT, + NEVER_NATIVE_HEADERS_PRESENT, "*"); } @@ -384,6 +399,32 @@ public class BinderHeaderMapper extends AbstractKafkaHeaderMapper { return true; } + /** + * Add patterns for headers that should never be mapped. + * @param patterns the patterns. + * @return the modified patterns. + * @since 3.0.2 + */ + public static String[] addNeverHeaderPatterns(List patterns) { + List patternsToUse = new LinkedList<>(patterns); + patternsToUse.add(0, NEVER_NATIVE_HEADERS_PRESENT); + patternsToUse.add(0, NEVER_DELIVERY_ATTEMPT); + patternsToUse.add(0, NEVER_TIMESTAMP); + patternsToUse.add(0, NEVER_ID); + return patternsToUse.toArray(new String[0]); + } + + /** + * Remove never headers. + * @param headers the headers from which to remove the never headers. + * @since 3.0.2 + */ + public static void removeNeverHeaders(Headers headers) { + headers.remove(MessageHeaders.ID); + headers.remove(MessageHeaders.TIMESTAMP); + headers.remove(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT); + headers.remove(BinderHeaders.NATIVE_HEADERS_PRESENT); + } /** * The {@link StdNodeBasedDeserializer} extension for {@link MimeType} deserialization. diff --git a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java index 6f48f8ad..01f6ba90 100644 --- a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java +++ b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java @@ -25,7 +25,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -422,23 +421,32 @@ public class KafkaMessageChannelBinder extends mapper = null; } else if (mapper == null) { - String[] headerPatterns = producerProperties.getExtension() - .getHeaderPatterns(); + String[] headerPatterns = producerProperties.getExtension().getHeaderPatterns(); if (headerPatterns != null && headerPatterns.length > 0) { - List patterns = new LinkedList<>(Arrays.asList(headerPatterns)); - if (!patterns.contains("!" + MessageHeaders.TIMESTAMP)) { - patterns.add(0, "!" + MessageHeaders.TIMESTAMP); - } - if (!patterns.contains("!" + MessageHeaders.ID)) { - patterns.add(0, "!" + MessageHeaders.ID); - } mapper = new BinderHeaderMapper( - patterns.toArray(new String[patterns.size()])); + BinderHeaderMapper.addNeverHeaderPatterns(Arrays.asList(headerPatterns))); } else { mapper = new BinderHeaderMapper(); } } + else { + KafkaHeaderMapper userHeaderMapper = mapper; + mapper = new KafkaHeaderMapper() { + + @Override + public void toHeaders(Headers source, Map target) { + userHeaderMapper.toHeaders(source, target); + } + + @Override + public void fromHeaders(MessageHeaders headers, Headers target) { + userHeaderMapper.fromHeaders(headers, target); + BinderHeaderMapper.removeNeverHeaders(target); + } + }; + + } handler.setHeaderMapper(mapper); return handler; } diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java index 661d8032..9f0cf8a3 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java @@ -19,6 +19,7 @@ package org.springframework.cloud.stream.binder.kafka; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -51,7 +52,9 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.record.TimestampType; @@ -3508,6 +3511,98 @@ public class KafkaBinderTests extends } } + @Test + public void testInternalHeadersNotPropagated() throws Exception { + testInternalHeadersNotPropagatedGuts("propagate.1", null, null); + } + + @Test + public void testInternalHeadersNotPropagatedCustomHeader() throws Exception { + testInternalHeadersNotPropagatedGuts("propagate.2", new String[] { "foo", "*" }, null); + } + + @Test + public void testInternalHeadersNotPropagatedCustomMapper() throws Exception { + testInternalHeadersNotPropagatedGuts("propagate.3", null, new BinderHeaderMapper("*")); + } + + public void testInternalHeadersNotPropagatedGuts(String name, String[] headerPatterns, + KafkaHeaderMapper mapper) throws Exception { + + KafkaTestBinder binder; + if (mapper == null) { + binder = getBinder(); + } + else { + KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties(); + binderConfiguration.setHeaderMapperBeanName("headerMapper"); + + KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner( + binderConfiguration, new TestKafkaProperties()); + try { + kafkaTopicProvisioner.afterPropertiesSet(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + binder = new KafkaTestBinder(binderConfiguration, kafkaTopicProvisioner); + ((GenericApplicationContext) binder.getApplicationContext()).registerBean("headerMapper", + KafkaHeaderMapper.class, () -> mapper); + } + ExtendedProducerProperties producerProperties = createProducerProperties(); + producerProperties.getExtension().setHeaderPatterns(headerPatterns); + + DirectChannel output = createBindableChannel("output", createProducerBindingProperties(producerProperties)); + output.setBeanName(name + ".out"); + Binding producerBinding = binder.bindProducer(name + ".1", output, producerProperties); + + QueueChannel input = new QueueChannel(); + input.setBeanName(name + ".in"); + ExtendedConsumerProperties consumerProperties = createConsumerProperties(); + Binding consumerBinding = binder.bindConsumer(name + ".0", name, input, consumerProperties); + Map producerProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka()); + KafkaTemplate template = new KafkaTemplate(new DefaultKafkaProducerFactory<>(producerProps)); + template.send(MessageBuilder.withPayload("internalHeaderPropagation") + .setHeader(KafkaHeaders.TOPIC, name + ".0") + .setHeader("someHeader", "someValue") + .build()); + + Message consumed = input.receive(10_000); + if (headerPatterns != null) { + consumed = MessageBuilder.fromMessage(consumed).setHeader(headerPatterns[0], "bar").build(); + } + output.send(consumed); + + Map consumerProps = KafkaTestUtils.consumerProps(name, "false", + embeddedKafka.getEmbeddedKafka()); + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); + Consumer consumer = cf.createConsumer(); + consumer.assign(Collections.singletonList(new TopicPartition(name + ".1", 0))); + ConsumerRecords records = consumer.poll(Duration.ofSeconds(10)); + assertThat(records.count()).isEqualTo(1); + ConsumerRecord received = records.iterator().next(); + assertThat(received.value()).isEqualTo("internalHeaderPropagation".getBytes()); + Header header = received.headers().lastHeader(BinderHeaders.NATIVE_HEADERS_PRESENT); + assertThat(header).isNull(); + header = received.headers().lastHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT); + assertThat(header).isNull(); + header = received.headers().lastHeader(MessageHeaders.ID); + assertThat(header).isNull(); + header = received.headers().lastHeader(MessageHeaders.TIMESTAMP); + assertThat(header).isNull(); + assertThat(received.headers().lastHeader("someHeader")).isNotNull(); + if (headerPatterns != null) { + assertThat(received.headers().lastHeader(headerPatterns[0])).isNotNull(); + } + + producerBinding.unbind(); + consumerBinding.unbind(); + consumer.close(); + } + private final class FailingInvocationCountingMessageHandler implements MessageHandler {