Compare commits
10 Commits
v2.2.0.REL
...
v3.0.0.M1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
df8d151878 | ||
|
|
d96dc8361b | ||
|
|
94be206651 | ||
|
|
4ab6432f23 | ||
|
|
24b52809ed | ||
|
|
7450d0731d | ||
|
|
08b41f7396 | ||
|
|
e725a172ba | ||
|
|
b7a3511375 | ||
|
|
d6c06286cd |
10
docs/pom.xml
10
docs/pom.xml
@@ -7,7 +7,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.2.0.RELEASE</version>
|
||||
<version>3.0.0.M1</version>
|
||||
</parent>
|
||||
<packaging>pom</packaging>
|
||||
<name>spring-cloud-stream-binder-kafka-docs</name>
|
||||
@@ -124,10 +124,10 @@
|
||||
<sourceDocumentName>${docs.main}.adoc</sourceDocumentName>
|
||||
<attributes>
|
||||
<spring-cloud-stream-version>${project.version}</spring-cloud-stream-version>
|
||||
<docs-url>https://cloud.spring.io/</docs-url>
|
||||
<docs-version></docs-version>
|
||||
<!-- <docs-version>${project.version}/</docs-version> -->
|
||||
<!-- <docs-url>https://cloud.spring.io/spring-cloud-static/</docs-url> -->
|
||||
<!-- <docs-url>https://cloud.spring.io/</docs-url> -->
|
||||
<!-- <docs-version></docs-version> -->
|
||||
<docs-version>${project.version}/</docs-version>
|
||||
<docs-url>https://cloud.spring.io/spring-cloud-static/</docs-url>
|
||||
</attributes>
|
||||
</configuration>
|
||||
<executions>
|
||||
|
||||
8
pom.xml
8
pom.xml
@@ -2,20 +2,20 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.2.0.RELEASE</version>
|
||||
<version>3.0.0.M1</version>
|
||||
<packaging>pom</packaging>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-build</artifactId>
|
||||
<version>2.1.4.RELEASE</version>
|
||||
<version>2.2.0.M2</version>
|
||||
<relativePath />
|
||||
</parent>
|
||||
<properties>
|
||||
<java.version>1.8</java.version>
|
||||
<spring-kafka.version>2.2.2.RELEASE</spring-kafka.version>
|
||||
<spring-kafka.version>2.2.5.RELEASE</spring-kafka.version>
|
||||
<spring-integration-kafka.version>3.1.0.RELEASE</spring-integration-kafka.version>
|
||||
<kafka.version>2.0.0</kafka.version>
|
||||
<spring-cloud-stream.version>2.2.0.RELEASE</spring-cloud-stream.version>
|
||||
<spring-cloud-stream.version>3.0.0.M1</spring-cloud-stream.version>
|
||||
<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
|
||||
<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
|
||||
<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.2.0.RELEASE</version>
|
||||
<version>3.0.0.M1</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
<description>Spring Cloud Starter Stream Kafka</description>
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.2.0.RELEASE</version>
|
||||
<version>3.0.0.M1</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder Core</description>
|
||||
|
||||
@@ -466,11 +466,11 @@ public class KafkaTopicProvisioner implements
|
||||
// In some cases, the above partition query may not throw an UnknownTopic..Exception for various reasons.
|
||||
// For that, we are forcing another query to ensure that the topic is present on the server.
|
||||
if (CollectionUtils.isEmpty(partitions)) {
|
||||
final AdminClient adminClient = AdminClient
|
||||
.create(this.adminClientProperties);
|
||||
final DescribeTopicsResult describeTopicsResult = adminClient
|
||||
try (AdminClient adminClient = AdminClient
|
||||
.create(this.adminClientProperties)) {
|
||||
final DescribeTopicsResult describeTopicsResult = adminClient
|
||||
.describeTopics(Collections.singletonList(topicName));
|
||||
try {
|
||||
|
||||
describeTopicsResult.all().get();
|
||||
}
|
||||
catch (ExecutionException ex) {
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.2.0.RELEASE</version>
|
||||
<version>3.0.0.M1</version>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
|
||||
@@ -94,8 +94,6 @@ public class KafkaStreamsBinderWordCountFunctionTests {
|
||||
"=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde" +
|
||||
"=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"--spring.cloud.stream.bindings.output.producer.headerMode=raw",
|
||||
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
|
||||
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
|
||||
receiveAndValidate(context);
|
||||
}
|
||||
|
||||
@@ -108,10 +108,8 @@ public abstract class DeserializtionErrorHandlerByBinderTests {
|
||||
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
|
||||
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"spring.cloud.stream.bindings.output.producer.headerMode=raw",
|
||||
"spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde"
|
||||
+ "=org.apache.kafka.common.serialization.Serdes$IntegerSerde",
|
||||
"spring.cloud.stream.bindings.input.consumer.headerMode=raw",
|
||||
"spring.cloud.stream.kafka.streams.binder.serdeError=sendToDlq",
|
||||
"spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id"
|
||||
+ "=deserializationByBinderAndDlqTests",
|
||||
|
||||
@@ -108,7 +108,6 @@ public class KafkaStreamsBinderMultipleInputTopicsTest {
|
||||
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
|
||||
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
|
||||
"--spring.cloud.stream.kafka.streams.timeWindow.length=5000",
|
||||
"--spring.cloud.stream.kafka.streams.timeWindow.advanceBy=0",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId"
|
||||
|
||||
@@ -115,8 +115,6 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
|
||||
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
|
||||
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"--spring.cloud.stream.bindings.output.producer.headerMode=raw",
|
||||
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
|
||||
"--spring.cloud.stream.kafka.streams.timeWindow.length=5000",
|
||||
"--spring.cloud.stream.kafka.streams.timeWindow.advanceBy=0",
|
||||
"--spring.cloud.stream.kafka.streams.binder.brokers="
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.2.0.RELEASE</version>
|
||||
<version>3.0.0.M1</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
||||
@@ -1,387 +0,0 @@
|
||||
/*
|
||||
* Copyright 2017-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.DeserializationContext;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.deser.std.StdNodeBasedDeserializer;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import com.fasterxml.jackson.databind.type.TypeFactory;
|
||||
import org.apache.kafka.common.header.Header;
|
||||
import org.apache.kafka.common.header.Headers;
|
||||
import org.apache.kafka.common.header.internals.RecordHeader;
|
||||
|
||||
import org.springframework.kafka.support.AbstractKafkaHeaderMapper;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ClassUtils;
|
||||
import org.springframework.util.MimeType;
|
||||
|
||||
/**
|
||||
* Default header mapper for Apache Kafka. Most headers in {@link KafkaHeaders} are not
|
||||
* mapped on outbound messages. The exceptions are correlation and reply headers for
|
||||
* request/reply messaging. Header types are added to a special header
|
||||
* {@link #JSON_TYPES}.
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @author Artem Bilan
|
||||
* @since 2.0
|
||||
* @deprecated will be removed in the next point release after 2.1.0. See issue
|
||||
* https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/509
|
||||
*/
|
||||
public class BinderHeaderMapper extends AbstractKafkaHeaderMapper {
|
||||
|
||||
private static final List<String> DEFAULT_TRUSTED_PACKAGES = Arrays
|
||||
.asList("java.util", "java.lang", "org.springframework.util");
|
||||
|
||||
private static final List<String> DEFAULT_TO_STRING_CLASSES = Arrays.asList(
|
||||
"org.springframework.util.MimeType", "org.springframework.http.MediaType");
|
||||
|
||||
/**
|
||||
* Header name for java types of other headers.
|
||||
*/
|
||||
public static final String JSON_TYPES = "spring_json_header_types";
|
||||
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
private final Set<String> trustedPackages = new LinkedHashSet<>(
|
||||
DEFAULT_TRUSTED_PACKAGES);
|
||||
|
||||
private final Set<String> toStringClasses = new LinkedHashSet<>(
|
||||
DEFAULT_TO_STRING_CLASSES);
|
||||
|
||||
/**
|
||||
* Construct an instance with the default object mapper and default header patterns
|
||||
* for outbound headers; all inbound headers are mapped. The default pattern list is
|
||||
* {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
|
||||
* {@link KafkaHeaders} are never mapped as headers since they represent data in
|
||||
* consumer/producer records.
|
||||
* @see #BinderHeaderMapper(ObjectMapper)
|
||||
*/
|
||||
public BinderHeaderMapper() {
|
||||
this(new ObjectMapper());
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct an instance with the provided object mapper and default header patterns
|
||||
* for outbound headers; all inbound headers are mapped. The patterns are applied in
|
||||
* order, stopping on the first match (positive or negative). Patterns are negated by
|
||||
* preceding them with "!". The default pattern list is
|
||||
* {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
|
||||
* {@link KafkaHeaders} are never mapped as headers since they represent data in
|
||||
* consumer/producer records.
|
||||
* @param objectMapper the object mapper.
|
||||
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
|
||||
*/
|
||||
public BinderHeaderMapper(ObjectMapper objectMapper) {
|
||||
this(objectMapper, "!" + MessageHeaders.ID, "!" + MessageHeaders.TIMESTAMP, "*");
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct an instance with a default object mapper and the provided header patterns
|
||||
* for outbound headers; all inbound headers are mapped. The patterns are applied in
|
||||
* order, stopping on the first match (positive or negative). Patterns are negated by
|
||||
* preceding them with "!". The patterns will replace the default patterns; you
|
||||
* generally should not map the {@code "id" and "timestamp"} headers. Note: most of
|
||||
* the headers in {@link KafkaHeaders} are ever mapped as headers since they represent
|
||||
* data in consumer/producer records.
|
||||
* @param patterns the patterns.
|
||||
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
|
||||
*/
|
||||
public BinderHeaderMapper(String... patterns) {
|
||||
this(new ObjectMapper(), patterns);
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct an instance with the provided object mapper and the provided header
|
||||
* patterns for outbound headers; all inbound headers are mapped. The patterns are
|
||||
* applied in order, stopping on the first match (positive or negative). Patterns are
|
||||
* negated by preceding them with "!". The patterns will replace the default patterns;
|
||||
* you generally should not map the {@code "id" and "timestamp"} headers. Note: most
|
||||
* of the headers in {@link KafkaHeaders} are never mapped as headers since they
|
||||
* represent data in consumer/producer records.
|
||||
* @param objectMapper the object mapper.
|
||||
* @param patterns the patterns.
|
||||
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
|
||||
*/
|
||||
public BinderHeaderMapper(ObjectMapper objectMapper, String... patterns) {
|
||||
super(patterns);
|
||||
Assert.notNull(objectMapper, "'objectMapper' must not be null");
|
||||
Assert.noNullElements(patterns, "'patterns' must not have null elements");
|
||||
this.objectMapper = objectMapper;
|
||||
this.objectMapper.registerModule(new SimpleModule()
|
||||
.addDeserializer(MimeType.class, new MimeTypeJsonDeserializer()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the object mapper.
|
||||
* @return the mapper.
|
||||
*/
|
||||
protected ObjectMapper getObjectMapper() {
|
||||
return this.objectMapper;
|
||||
}
|
||||
|
||||
/**
|
||||
* Provide direct access to the trusted packages set for subclasses.
|
||||
* @return the trusted packages.
|
||||
* @since 2.2
|
||||
*/
|
||||
protected Set<String> getTrustedPackages() {
|
||||
return this.trustedPackages;
|
||||
}
|
||||
|
||||
/**
|
||||
* Provide direct access to the toString() classes by subclasses.
|
||||
* @return the toString() classes.
|
||||
* @since 2.2
|
||||
*/
|
||||
protected Set<String> getToStringClasses() {
|
||||
return this.toStringClasses;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add packages to the trusted packages list (default {@code java.util, java.lang})
|
||||
* used when constructing objects from JSON. If any of the supplied packages is
|
||||
* {@code "*"}, all packages are trusted. If a class for a non-trusted package is
|
||||
* encountered, the header is returned to the application with value of type
|
||||
* {@link NonTrustedHeaderType}.
|
||||
* @param trustedPackages the packages to trust.
|
||||
*/
|
||||
public void addTrustedPackages(String... trustedPackages) {
|
||||
if (trustedPackages != null) {
|
||||
for (String whiteList : trustedPackages) {
|
||||
if ("*".equals(whiteList)) {
|
||||
this.trustedPackages.clear();
|
||||
break;
|
||||
}
|
||||
else {
|
||||
this.trustedPackages.add(whiteList);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add class names that the outbound mapper should perform toString() operations on
|
||||
* before mapping.
|
||||
* @param classNames the class names.
|
||||
* @since 2.2
|
||||
*/
|
||||
public void addToStringClasses(String... classNames) {
|
||||
this.toStringClasses.addAll(Arrays.asList(classNames));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fromHeaders(MessageHeaders headers, Headers target) {
|
||||
final Map<String, String> jsonHeaders = new HashMap<>();
|
||||
headers.forEach((k, v) -> {
|
||||
if (matches(k, v)) {
|
||||
if (v instanceof byte[]) {
|
||||
target.add(new RecordHeader(k, (byte[]) v));
|
||||
}
|
||||
else {
|
||||
try {
|
||||
Object value = v;
|
||||
String className = v.getClass().getName();
|
||||
if (this.toStringClasses.contains(className)) {
|
||||
value = v.toString();
|
||||
className = "java.lang.String";
|
||||
}
|
||||
target.add(new RecordHeader(k,
|
||||
getObjectMapper().writeValueAsBytes(value)));
|
||||
jsonHeaders.put(k, className);
|
||||
}
|
||||
catch (Exception e) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Could not map " + k + " with type "
|
||||
+ v.getClass().getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
if (jsonHeaders.size() > 0) {
|
||||
try {
|
||||
target.add(new RecordHeader(JSON_TYPES,
|
||||
getObjectMapper().writeValueAsBytes(jsonHeaders)));
|
||||
}
|
||||
catch (IllegalStateException | JsonProcessingException e) {
|
||||
logger.error("Could not add json types header", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void toHeaders(Headers source, final Map<String, Object> headers) {
|
||||
final Map<String, String> jsonTypes = decodeJsonTypes(source);
|
||||
source.forEach(h -> {
|
||||
if (!(h.key().equals(JSON_TYPES))) {
|
||||
if (jsonTypes != null && jsonTypes.containsKey(h.key())) {
|
||||
Class<?> type = Object.class;
|
||||
String requestedType = jsonTypes.get(h.key());
|
||||
boolean trusted = false;
|
||||
try {
|
||||
trusted = trusted(requestedType);
|
||||
if (trusted) {
|
||||
type = ClassUtils.forName(requestedType, null);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
logger.error("Could not load class for header: " + h.key(), e);
|
||||
}
|
||||
if (trusted) {
|
||||
try {
|
||||
headers.put(h.key(),
|
||||
getObjectMapper().readValue(h.value(), type));
|
||||
}
|
||||
catch (IOException e) {
|
||||
logger.error("Could not decode json type: "
|
||||
+ new String(h.value()) + " for key: " + h.key(), e);
|
||||
headers.put(h.key(), h.value());
|
||||
}
|
||||
}
|
||||
else {
|
||||
headers.put(h.key(),
|
||||
new NonTrustedHeaderType(h.value(), requestedType));
|
||||
}
|
||||
}
|
||||
else {
|
||||
headers.put(h.key(), h.value());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Nullable
|
||||
private Map<String, String> decodeJsonTypes(Headers source) {
|
||||
Map<String, String> types = null;
|
||||
Iterator<Header> iterator = source.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
Header next = iterator.next();
|
||||
if (next.key().equals(JSON_TYPES)) {
|
||||
try {
|
||||
types = getObjectMapper().readValue(next.value(), Map.class);
|
||||
}
|
||||
catch (IOException e) {
|
||||
logger.error(
|
||||
"Could not decode json types: " + new String(next.value()),
|
||||
e);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
return types;
|
||||
}
|
||||
|
||||
protected boolean trusted(String requestedType) {
|
||||
if (!this.trustedPackages.isEmpty()) {
|
||||
int lastDot = requestedType.lastIndexOf('.');
|
||||
if (lastDot < 0) {
|
||||
return false;
|
||||
}
|
||||
String packageName = requestedType.substring(0, lastDot);
|
||||
for (String trustedPackage : this.trustedPackages) {
|
||||
if (packageName.equals(trustedPackage)
|
||||
|| packageName.startsWith(trustedPackage + ".")) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* The {@link StdNodeBasedDeserializer} extension for {@link MimeType}
|
||||
* deserialization. It is presented here for backward compatibility when older
|
||||
* producers send {@link MimeType} headers as serialization version.
|
||||
*/
|
||||
private class MimeTypeJsonDeserializer extends StdNodeBasedDeserializer<MimeType> {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
MimeTypeJsonDeserializer() {
|
||||
super(MimeType.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MimeType convert(JsonNode root, DeserializationContext ctxt)
|
||||
throws IOException {
|
||||
JsonNode type = root.get("type");
|
||||
JsonNode subType = root.get("subtype");
|
||||
JsonNode parameters = root.get("parameters");
|
||||
Map<String, String> params = BinderHeaderMapper.this.objectMapper
|
||||
.readValue(parameters.traverse(), TypeFactory.defaultInstance()
|
||||
.constructMapType(HashMap.class, String.class, String.class));
|
||||
return new MimeType(type.asText(), subType.asText(), params);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a header that could not be decoded due to an untrusted type.
|
||||
*/
|
||||
public static class NonTrustedHeaderType {
|
||||
|
||||
private final byte[] headerValue;
|
||||
|
||||
private final String untrustedType;
|
||||
|
||||
NonTrustedHeaderType(byte[] headerValue, String untrustedType) { // NOSONAR
|
||||
this.headerValue = headerValue; // NOSONAR
|
||||
this.untrustedType = untrustedType;
|
||||
}
|
||||
|
||||
public byte[] getHeaderValue() {
|
||||
return this.headerValue; // NOSONAR
|
||||
}
|
||||
|
||||
public String getUntrustedType() {
|
||||
return this.untrustedType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
try {
|
||||
return "NonTrustedHeaderType [headerValue="
|
||||
+ new String(this.headerValue, StandardCharsets.UTF_8)
|
||||
+ ", untrustedType=" + this.untrustedType + "]";
|
||||
}
|
||||
catch (Exception e) {
|
||||
return "NonTrustedHeaderType [headerValue="
|
||||
+ Arrays.toString(this.headerValue) + ", untrustedType="
|
||||
+ this.untrustedType + "]";
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -96,6 +96,7 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer;
|
||||
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
|
||||
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
|
||||
import org.springframework.kafka.listener.ContainerProperties;
|
||||
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
|
||||
import org.springframework.kafka.support.KafkaHeaderMapper;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.kafka.support.ProducerListener;
|
||||
@@ -376,11 +377,11 @@ public class KafkaMessageChannelBinder extends
|
||||
if (!patterns.contains("!" + MessageHeaders.ID)) {
|
||||
patterns.add(0, "!" + MessageHeaders.ID);
|
||||
}
|
||||
mapper = new BinderHeaderMapper(
|
||||
mapper = new DefaultKafkaHeaderMapper(
|
||||
patterns.toArray(new String[patterns.size()]));
|
||||
}
|
||||
else {
|
||||
mapper = new BinderHeaderMapper();
|
||||
mapper = new DefaultKafkaHeaderMapper();
|
||||
}
|
||||
}
|
||||
handler.setHeaderMapper(mapper);
|
||||
@@ -825,7 +826,7 @@ public class KafkaMessageChannelBinder extends
|
||||
KafkaHeaderMapper.class);
|
||||
}
|
||||
if (mapper == null) {
|
||||
BinderHeaderMapper headerMapper = new BinderHeaderMapper() {
|
||||
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper() {
|
||||
|
||||
@Override
|
||||
public void toHeaders(Headers source, Map<String, Object> headers) {
|
||||
|
||||
@@ -1521,8 +1521,13 @@ public class KafkaBinderTests extends
|
||||
input3, consumerProperties);
|
||||
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
|
||||
producerProperties.setPartitionKeyExtractorClass(PartitionTestSupport.class);
|
||||
producerProperties.setPartitionSelectorClass(PartitionTestSupport.class);
|
||||
|
||||
((GenericApplicationContext) this.applicationContext).registerBean("pkExtractor",
|
||||
PartitionTestSupport.class, () -> new PartitionTestSupport());
|
||||
((GenericApplicationContext) this.applicationContext).registerBean("pkSelector",
|
||||
PartitionTestSupport.class, () -> new PartitionTestSupport());
|
||||
producerProperties.setPartitionKeyExtractorName("pkExtractor");
|
||||
producerProperties.setPartitionSelectorName("pkSelector");
|
||||
producerProperties.setPartitionCount(3); // overridden to 8 on the actual topic
|
||||
DirectChannel output = createBindableChannel("output",
|
||||
createProducerBindingProperties(producerProperties));
|
||||
@@ -1645,8 +1650,12 @@ public class KafkaBinderTests extends
|
||||
Binder binder = getBinder();
|
||||
ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
|
||||
properties.setHeaderMode(HeaderMode.none);
|
||||
properties.setPartitionKeyExtractorClass(RawKafkaPartitionTestSupport.class);
|
||||
properties.setPartitionSelectorClass(RawKafkaPartitionTestSupport.class);
|
||||
((GenericApplicationContext) this.applicationContext).registerBean("pkExtractor",
|
||||
RawKafkaPartitionTestSupport.class, () -> new RawKafkaPartitionTestSupport());
|
||||
((GenericApplicationContext) this.applicationContext).registerBean("pkSelector",
|
||||
RawKafkaPartitionTestSupport.class, () -> new RawKafkaPartitionTestSupport());
|
||||
properties.setPartitionKeyExtractorName("pkExtractor");
|
||||
properties.setPartitionSelectorName("pkSelector");
|
||||
properties.setPartitionCount(6);
|
||||
|
||||
DirectChannel output = createBindableChannel("output",
|
||||
|
||||
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
@@ -51,6 +52,7 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
|
||||
"spring.kafka.consumer.auto-offset-reset=earliest" })
|
||||
@DirtiesContext
|
||||
@Ignore
|
||||
public class KafkaNullConverterTest {
|
||||
|
||||
private static final String KAFKA_BROKERS_PROPERTY = "spring.kafka.bootstrap-servers";
|
||||
|
||||
Reference in New Issue
Block a user