GH-853: Don't propagate out "internal" headers

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/853
This commit is contained in:
Gary Russell
2020-02-27 13:01:58 -05:00
committed by Soby Chacko
parent e46bd1f844
commit d594bab4cf
4 changed files with 159 additions and 14 deletions

1
.gitignore vendored
View File

@@ -23,3 +23,4 @@ _site/
dump.rdb
.apt_generated
artifacts
.sts4-cache

View File

@@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -37,6 +38,8 @@ import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.kafka.support.AbstractKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonUtils;
import org.springframework.lang.Nullable;
@@ -49,7 +52,7 @@ import org.springframework.util.MimeType;
* Custom header mapper for Apache Kafka. This is identical to the {@link org.springframework.kafka.support.DefaultKafkaHeaderMapper}
from Spring Kafka. This is provided to address some interoperability issues between Spring Cloud Stream 3.0.x
and 2.x apps, where mime types passed as regular {@link MimeType} in the header are not deserialized properly.
* Once those concerns are addressed in Spring Kafka, we will deprecate this class and remove it in a future binder release.
* It also suppresses certain internal headers that should never be propagated on output.
*
* Most headers in {@link org.springframework.kafka.support.KafkaHeaders} are not mapped onto outbound messages.
* The exceptions are correlation and reply headers for request/reply
@@ -65,6 +68,16 @@ import org.springframework.util.MimeType;
*/
public class BinderHeaderMapper extends AbstractKafkaHeaderMapper {
private static final String NEGATE = "!";
private static final String NEVER_ID = NEGATE + MessageHeaders.ID;
private static final String NEVER_TIMESTAMP = NEGATE + MessageHeaders.TIMESTAMP;
private static final String NEVER_DELIVERY_ATTEMPT = NEGATE + IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT;
private static final String NEVER_NATIVE_HEADERS_PRESENT = NEGATE + BinderHeaders.NATIVE_HEADERS_PRESENT;
private static final String JAVA_LANG_STRING = "java.lang.String";
private static final List<String> DEFAULT_TRUSTED_PACKAGES =
@@ -119,8 +132,10 @@ public class BinderHeaderMapper extends AbstractKafkaHeaderMapper {
*/
public BinderHeaderMapper(ObjectMapper objectMapper) {
this(objectMapper,
"!" + MessageHeaders.ID,
"!" + MessageHeaders.TIMESTAMP,
NEVER_ID,
NEVER_TIMESTAMP,
NEVER_DELIVERY_ATTEMPT,
NEVER_NATIVE_HEADERS_PRESENT,
"*");
}
@@ -384,6 +399,32 @@ public class BinderHeaderMapper extends AbstractKafkaHeaderMapper {
return true;
}
/**
 * Prepend the patterns for headers that should never be mapped to the supplied
 * patterns and return the combined result.
 * @param patterns the caller-supplied patterns; not modified.
 * @return a new array with the never-header patterns first, then the supplied patterns.
 * @since 3.0.2
 */
public static String[] addNeverHeaderPatterns(List<String> patterns) {
	List<String> combined = new LinkedList<>();
	// Negated internal-header patterns go first so they win over any wildcard
	// in the caller-supplied patterns.
	combined.add(NEVER_ID);
	combined.add(NEVER_TIMESTAMP);
	combined.add(NEVER_DELIVERY_ATTEMPT);
	combined.add(NEVER_NATIVE_HEADERS_PRESENT);
	combined.addAll(patterns);
	return combined.toArray(new String[0]);
}
/**
 * Remove the headers that must never be propagated from the supplied Kafka headers.
 * @param headers the headers from which to remove the never headers.
 * @since 3.0.2
 */
public static void removeNeverHeaders(Headers headers) {
	String[] neverKeys = {
			MessageHeaders.ID,
			MessageHeaders.TIMESTAMP,
			IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT,
			BinderHeaders.NATIVE_HEADERS_PRESENT
	};
	for (String key : neverKeys) {
		headers.remove(key);
	}
}
/**
* The {@link StdNodeBasedDeserializer} extension for {@link MimeType} deserialization.

View File

@@ -25,7 +25,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -422,23 +421,32 @@ public class KafkaMessageChannelBinder extends
mapper = null;
}
else if (mapper == null) {
String[] headerPatterns = producerProperties.getExtension()
.getHeaderPatterns();
String[] headerPatterns = producerProperties.getExtension().getHeaderPatterns();
if (headerPatterns != null && headerPatterns.length > 0) {
List<String> patterns = new LinkedList<>(Arrays.asList(headerPatterns));
if (!patterns.contains("!" + MessageHeaders.TIMESTAMP)) {
patterns.add(0, "!" + MessageHeaders.TIMESTAMP);
}
if (!patterns.contains("!" + MessageHeaders.ID)) {
patterns.add(0, "!" + MessageHeaders.ID);
}
mapper = new BinderHeaderMapper(
patterns.toArray(new String[patterns.size()]));
BinderHeaderMapper.addNeverHeaderPatterns(Arrays.asList(headerPatterns)));
}
else {
mapper = new BinderHeaderMapper();
}
}
else {
KafkaHeaderMapper userHeaderMapper = mapper;
mapper = new KafkaHeaderMapper() {
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
userHeaderMapper.toHeaders(source, target);
}
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
userHeaderMapper.fromHeaders(headers, target);
BinderHeaderMapper.removeNeverHeaders(target);
}
};
}
handler.setHeaderMapper(mapper);
return handler;
}

View File

@@ -19,6 +19,7 @@ package org.springframework.cloud.stream.binder.kafka;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -51,7 +52,9 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.record.TimestampType;
@@ -3508,6 +3511,98 @@ public class KafkaBinderTests extends
}
}
// Default mapper, no custom header patterns: internal headers must be stripped
// from the outbound record.
@Test
public void testInternalHeadersNotPropagated() throws Exception {
testInternalHeadersNotPropagatedGuts("propagate.1", null, null);
}
// User-supplied header patterns ("foo", "*"): internal headers must still be
// stripped even though "*" would otherwise match them.
@Test
public void testInternalHeadersNotPropagatedCustomHeader() throws Exception {
testInternalHeadersNotPropagatedGuts("propagate.2", new String[] { "foo", "*" }, null);
}
// User-supplied header mapper bean mapping "*": the binder wraps it so the
// internal headers are removed after the user mapping runs.
@Test
public void testInternalHeadersNotPropagatedCustomMapper() throws Exception {
testInternalHeadersNotPropagatedGuts("propagate.3", null, new BinderHeaderMapper("*"));
}
/**
 * End-to-end check that "internal" headers (id, timestamp, deliveryAttempt,
 * nativeHeadersPresent) are not propagated to outbound records: a record is
 * produced to the consumer binding's topic, bridged through a binder-bound
 * output channel, and the raw record on the output topic is inspected.
 * @param name base name for topics/channels (also used as the consumer group).
 * @param headerPatterns optional producer header patterns; may be null.
 * @param mapper optional custom header mapper bean; may be null.
 */
public void testInternalHeadersNotPropagatedGuts(String name, String[] headerPatterns,
KafkaHeaderMapper mapper) throws Exception {
KafkaTestBinder binder;
if (mapper == null) {
binder = getBinder();
}
else {
// Custom mapper case: build a binder whose configuration points at a
// "headerMapper" bean, then register the supplied mapper under that name.
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binderConfiguration.setHeaderMapperBeanName("headerMapper");
KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(
binderConfiguration, new TestKafkaProperties());
try {
kafkaTopicProvisioner.afterPropertiesSet();
}
catch (Exception e) {
throw new RuntimeException(e);
}
binder = new KafkaTestBinder(binderConfiguration, kafkaTopicProvisioner);
((GenericApplicationContext) binder.getApplicationContext()).registerBean("headerMapper",
KafkaHeaderMapper.class, () -> mapper);
}
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.getExtension().setHeaderPatterns(headerPatterns);
// Producer binding writes to "<name>.1"; consumer binding reads "<name>.0".
DirectChannel output = createBindableChannel("output", createProducerBindingProperties(producerProperties));
output.setBeanName(name + ".out");
Binding<MessageChannel> producerBinding = binder.bindProducer(name + ".1", output, producerProperties);
QueueChannel input = new QueueChannel();
input.setBeanName(name + ".in");
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
Binding<MessageChannel> consumerBinding = binder.bindConsumer(name + ".0", name, input, consumerProperties);
// Send a raw record into the consumer binding's topic with a user header.
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
KafkaTemplate template = new KafkaTemplate(new DefaultKafkaProducerFactory<>(producerProps));
template.send(MessageBuilder.withPayload("internalHeaderPropagation")
.setHeader(KafkaHeaders.TOPIC, name + ".0")
.setHeader("someHeader", "someValue")
.build());
// NOTE(review): receive() may return null on timeout; the MessageBuilder /
// send below would then throw NPE rather than a clear assertion failure.
Message<?> consumed = input.receive(10_000);
if (headerPatterns != null) {
// Add a header matching the first user pattern so we can assert it survives.
consumed = MessageBuilder.fromMessage(consumed).setHeader(headerPatterns[0], "bar").build();
}
// Bridge the consumed message back out through the producer binding.
output.send(consumed);
// Read the raw record from the output topic with a plain Kafka consumer so
// the actual wire headers can be inspected.
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(name, "false",
embeddedKafka.getEmbeddedKafka());
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer consumer = cf.createConsumer();
consumer.assign(Collections.singletonList(new TopicPartition(name + ".1", 0)));
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(10));
assertThat(records.count()).isEqualTo(1);
ConsumerRecord<?, ?> received = records.iterator().next();
assertThat(received.value()).isEqualTo("internalHeaderPropagation".getBytes());
// The four internal headers must be absent from the outbound record...
Header header = received.headers().lastHeader(BinderHeaders.NATIVE_HEADERS_PRESENT);
assertThat(header).isNull();
header = received.headers().lastHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT);
assertThat(header).isNull();
header = received.headers().lastHeader(MessageHeaders.ID);
assertThat(header).isNull();
header = received.headers().lastHeader(MessageHeaders.TIMESTAMP);
assertThat(header).isNull();
// ...while ordinary user headers are propagated.
assertThat(received.headers().lastHeader("someHeader")).isNotNull();
if (headerPatterns != null) {
assertThat(received.headers().lastHeader(headerPatterns[0])).isNotNull();
}
// NOTE(review): cleanup is not in a finally block, so a failed assertion
// leaks the bindings and the consumer for the remainder of the test run.
producerBinding.unbind();
consumerBinding.unbind();
consumer.close();
}
private final class FailingInvocationCountingMessageHandler
implements MessageHandler {