Compare commits

..

10 Commits

Author SHA1 Message Date
buildmaster
df8d151878 Update SNAPSHOT to 3.0.0.M1 2019-06-11 12:07:03 +00:00
Oleg Zhurakousky
d96dc8361b Bumped s-c-build to 2.2.0.M2 2019-06-11 13:47:29 +02:00
Oleg Zhurakousky
94be206651 Updated POMs for 3.0.0.M1 release 2019-06-10 14:49:49 +02:00
iguissouma
4ab6432f23 Use try with resources when creating AdminClient
Use try with resources when creating AdminClient to release all associated resources.
Fixes gh-660
2019-06-04 09:37:35 -04:00
Walliee
24b52809ed Switch back to use DefaultKafkaHeaderMapper
* update spring-kafka to v2.2.5
* use DefaultKafkaHeaderMapper instead of BinderHeaderMapper
* delete BinderHeaderMapper

resolves #652
2019-05-31 22:15:12 -04:00
Soby Chacko
7450d0731d Fixing ignored tests from upgrade 2019-05-28 15:02:06 -04:00
Oleg Zhurakousky
08b41f7396 Upgraded master to 3.0.0 2019-05-28 16:42:05 +02:00
Oleg Zhurakousky
e725a172ba Bumping version to 2.3.0-B-S for master 2019-05-20 06:33:35 -05:00
buildmaster
b7a3511375 Bumping versions to 2.2.1.BUILD-SNAPSHOT after release 2019-05-07 12:52:13 +00:00
buildmaster
d6c06286cd Going back to snapshots 2019-05-07 12:52:13 +00:00
15 changed files with 36 additions and 418 deletions

View File

@@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.0.RELEASE</version>
<version>3.0.0.M1</version>
</parent>
<packaging>pom</packaging>
<name>spring-cloud-stream-binder-kafka-docs</name>
@@ -124,10 +124,10 @@
<sourceDocumentName>${docs.main}.adoc</sourceDocumentName>
<attributes>
<spring-cloud-stream-version>${project.version}</spring-cloud-stream-version>
<docs-url>https://cloud.spring.io/</docs-url>
<docs-version></docs-version>
<!-- <docs-version>${project.version}/</docs-version> -->
<!-- <docs-url>https://cloud.spring.io/spring-cloud-static/</docs-url> -->
<!-- <docs-url>https://cloud.spring.io/</docs-url> -->
<!-- <docs-version></docs-version> -->
<docs-version>${project.version}/</docs-version>
<docs-url>https://cloud.spring.io/spring-cloud-static/</docs-url>
</attributes>
</configuration>
<executions>

View File

@@ -2,20 +2,20 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.0.RELEASE</version>
<version>3.0.0.M1</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>2.1.4.RELEASE</version>
<version>2.2.0.M2</version>
<relativePath />
</parent>
<properties>
<java.version>1.8</java.version>
<spring-kafka.version>2.2.2.RELEASE</spring-kafka.version>
<spring-kafka.version>2.2.5.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>3.1.0.RELEASE</spring-integration-kafka.version>
<kafka.version>2.0.0</kafka.version>
<spring-cloud-stream.version>2.2.0.RELEASE</spring-cloud-stream.version>
<spring-cloud-stream.version>3.0.0.M1</spring-cloud-stream.version>
<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.0.RELEASE</version>
<version>3.0.0.M1</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.0.RELEASE</version>
<version>3.0.0.M1</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>

View File

@@ -466,11 +466,11 @@ public class KafkaTopicProvisioner implements
// In some cases, the above partition query may not throw an UnknownTopic..Exception for various reasons.
// For that, we are forcing another query to ensure that the topic is present on the server.
if (CollectionUtils.isEmpty(partitions)) {
final AdminClient adminClient = AdminClient
.create(this.adminClientProperties);
final DescribeTopicsResult describeTopicsResult = adminClient
try (AdminClient adminClient = AdminClient
.create(this.adminClientProperties)) {
final DescribeTopicsResult describeTopicsResult = adminClient
.describeTopics(Collections.singletonList(topicName));
try {
describeTopicsResult.all().get();
}
catch (ExecutionException ex) {

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.0.RELEASE</version>
<version>3.0.0.M1</version>
</parent>
<properties>

View File

@@ -94,8 +94,6 @@ public class KafkaStreamsBinderWordCountFunctionTests {
"=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde" +
"=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.bindings.output.producer.headerMode=raw",
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
receiveAndValidate(context);
}

View File

@@ -108,10 +108,8 @@ public abstract class DeserializtionErrorHandlerByBinderTests {
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
"spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
"spring.cloud.stream.bindings.output.producer.headerMode=raw",
"spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde"
+ "=org.apache.kafka.common.serialization.Serdes$IntegerSerde",
"spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"spring.cloud.stream.kafka.streams.binder.serdeError=sendToDlq",
"spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id"
+ "=deserializationByBinderAndDlqTests",

View File

@@ -108,7 +108,6 @@ public class KafkaStreamsBinderMultipleInputTopicsTest {
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"--spring.cloud.stream.kafka.streams.timeWindow.length=5000",
"--spring.cloud.stream.kafka.streams.timeWindow.advanceBy=0",
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId"

View File

@@ -115,8 +115,6 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.bindings.output.producer.headerMode=raw",
"--spring.cloud.stream.bindings.input.consumer.headerMode=raw",
"--spring.cloud.stream.kafka.streams.timeWindow.length=5000",
"--spring.cloud.stream.kafka.streams.timeWindow.advanceBy=0",
"--spring.cloud.stream.kafka.streams.binder.brokers="

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.0.RELEASE</version>
<version>3.0.0.M1</version>
</parent>
<dependencies>

View File

@@ -1,387 +0,0 @@
/*
* Copyright 2017-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.deser.std.StdNodeBasedDeserializer;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.type.TypeFactory;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.kafka.support.AbstractKafkaHeaderMapper;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.MimeType;
/**
* Default header mapper for Apache Kafka. Most headers in {@link KafkaHeaders} are not
* mapped on outbound messages. The exceptions are correlation and reply headers for
* request/reply messaging. Header types are added to a special header
* {@link #JSON_TYPES}.
*
* @author Gary Russell
* @author Artem Bilan
* @since 2.0
* @deprecated will be removed in the next point release after 2.1.0. See issue
* https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/509
*/
public class BinderHeaderMapper extends AbstractKafkaHeaderMapper {

	// Packages whose classes may be instantiated when decoding inbound JSON headers.
	// Anything outside these (unless added via addTrustedPackages) is wrapped in
	// NonTrustedHeaderType instead of being deserialized.
	private static final List<String> DEFAULT_TRUSTED_PACKAGES = Arrays
			.asList("java.util", "java.lang", "org.springframework.util");

	// Types mapped outbound via toString() (recorded as java.lang.String) rather
	// than full JSON serialization.
	private static final List<String> DEFAULT_TO_STRING_CLASSES = Arrays.asList(
			"org.springframework.util.MimeType", "org.springframework.http.MediaType");

	/**
	 * Header name for java types of other headers.
	 */
	public static final String JSON_TYPES = "spring_json_header_types";

	// Mapper used for all header value (de)serialization; a MimeType deserializer
	// module is registered on it in the main constructor.
	private final ObjectMapper objectMapper;

	private final Set<String> trustedPackages = new LinkedHashSet<>(
			DEFAULT_TRUSTED_PACKAGES);

	private final Set<String> toStringClasses = new LinkedHashSet<>(
			DEFAULT_TO_STRING_CLASSES);

	/**
	 * Construct an instance with the default object mapper and default header patterns
	 * for outbound headers; all inbound headers are mapped. The default pattern list is
	 * {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
	 * {@link KafkaHeaders} are never mapped as headers since they represent data in
	 * consumer/producer records.
	 * @see #BinderHeaderMapper(ObjectMapper)
	 */
	public BinderHeaderMapper() {
		this(new ObjectMapper());
	}

	/**
	 * Construct an instance with the provided object mapper and default header patterns
	 * for outbound headers; all inbound headers are mapped. The patterns are applied in
	 * order, stopping on the first match (positive or negative). Patterns are negated by
	 * preceding them with "!". The default pattern list is
	 * {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
	 * {@link KafkaHeaders} are never mapped as headers since they represent data in
	 * consumer/producer records.
	 * @param objectMapper the object mapper.
	 * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
	 */
	public BinderHeaderMapper(ObjectMapper objectMapper) {
		this(objectMapper, "!" + MessageHeaders.ID, "!" + MessageHeaders.TIMESTAMP, "*");
	}

	/**
	 * Construct an instance with a default object mapper and the provided header patterns
	 * for outbound headers; all inbound headers are mapped. The patterns are applied in
	 * order, stopping on the first match (positive or negative). Patterns are negated by
	 * preceding them with "!". The patterns will replace the default patterns; you
	 * generally should not map the {@code "id" and "timestamp"} headers. Note: most of
	 * the headers in {@link KafkaHeaders} are never mapped as headers since they represent
	 * data in consumer/producer records.
	 * @param patterns the patterns.
	 * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
	 */
	public BinderHeaderMapper(String... patterns) {
		this(new ObjectMapper(), patterns);
	}

	/**
	 * Construct an instance with the provided object mapper and the provided header
	 * patterns for outbound headers; all inbound headers are mapped. The patterns are
	 * applied in order, stopping on the first match (positive or negative). Patterns are
	 * negated by preceding them with "!". The patterns will replace the default patterns;
	 * you generally should not map the {@code "id" and "timestamp"} headers. Note: most
	 * of the headers in {@link KafkaHeaders} are never mapped as headers since they
	 * represent data in consumer/producer records.
	 * @param objectMapper the object mapper.
	 * @param patterns the patterns.
	 * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
	 */
	public BinderHeaderMapper(ObjectMapper objectMapper, String... patterns) {
		super(patterns);
		Assert.notNull(objectMapper, "'objectMapper' must not be null");
		Assert.noNullElements(patterns, "'patterns' must not have null elements");
		this.objectMapper = objectMapper;
		// Register a custom deserializer so legacy MimeType headers serialized as
		// JSON objects (type/subtype/parameters) can still be decoded.
		this.objectMapper.registerModule(new SimpleModule()
				.addDeserializer(MimeType.class, new MimeTypeJsonDeserializer()));
	}

	/**
	 * Return the object mapper.
	 * @return the mapper.
	 */
	protected ObjectMapper getObjectMapper() {
		return this.objectMapper;
	}

	/**
	 * Provide direct access to the trusted packages set for subclasses.
	 * @return the trusted packages.
	 * @since 2.2
	 */
	protected Set<String> getTrustedPackages() {
		return this.trustedPackages;
	}

	/**
	 * Provide direct access to the toString() classes by subclasses.
	 * @return the toString() classes.
	 * @since 2.2
	 */
	protected Set<String> getToStringClasses() {
		return this.toStringClasses;
	}

	/**
	 * Add packages to the trusted packages list (default {@code java.util, java.lang})
	 * used when constructing objects from JSON. If any of the supplied packages is
	 * {@code "*"}, all packages are trusted. If a class for a non-trusted package is
	 * encountered, the header is returned to the application with value of type
	 * {@link NonTrustedHeaderType}.
	 * @param trustedPackages the packages to trust.
	 */
	public void addTrustedPackages(String... trustedPackages) {
		if (trustedPackages != null) {
			for (String whiteList : trustedPackages) {
				if ("*".equals(whiteList)) {
					// "*" means trust everything: an empty set disables the
					// package check in trusted(String).
					this.trustedPackages.clear();
					break;
				}
				else {
					this.trustedPackages.add(whiteList);
				}
			}
		}
	}

	/**
	 * Add class names that the outbound mapper should perform toString() operations on
	 * before mapping.
	 * @param classNames the class names.
	 * @since 2.2
	 */
	public void addToStringClasses(String... classNames) {
		this.toStringClasses.addAll(Arrays.asList(classNames));
	}

	// Outbound: copy matching MessageHeaders into Kafka record headers. byte[]
	// values pass through untouched; everything else is JSON-serialized and its
	// class name is recorded in the JSON_TYPES header for inbound reconstruction.
	@Override
	public void fromHeaders(MessageHeaders headers, Headers target) {
		final Map<String, String> jsonHeaders = new HashMap<>();
		headers.forEach((k, v) -> {
			// matches() applies the configured include/exclude patterns
			// (inherited from AbstractKafkaHeaderMapper).
			if (matches(k, v)) {
				if (v instanceof byte[]) {
					target.add(new RecordHeader(k, (byte[]) v));
				}
				else {
					try {
						Object value = v;
						String className = v.getClass().getName();
						// Configured toString() classes are flattened to plain
						// strings rather than JSON object graphs.
						if (this.toStringClasses.contains(className)) {
							value = v.toString();
							className = "java.lang.String";
						}
						target.add(new RecordHeader(k,
								getObjectMapper().writeValueAsBytes(value)));
						jsonHeaders.put(k, className);
					}
					catch (Exception e) {
						// Unserializable headers are skipped, not fatal.
						if (logger.isDebugEnabled()) {
							logger.debug("Could not map " + k + " with type "
									+ v.getClass().getName());
						}
					}
				}
			}
		});
		if (jsonHeaders.size() > 0) {
			try {
				target.add(new RecordHeader(JSON_TYPES,
						getObjectMapper().writeValueAsBytes(jsonHeaders)));
			}
			catch (IllegalStateException | JsonProcessingException e) {
				logger.error("Could not add json types header", e);
			}
		}
	}

	// Inbound: rebuild message headers from Kafka record headers. Headers with a
	// recorded JSON type are deserialized when the type is trusted; untrusted
	// types are surfaced as NonTrustedHeaderType; headers without a recorded type
	// (and the JSON_TYPES bookkeeping header itself is skipped) keep their raw
	// byte[] value.
	@Override
	public void toHeaders(Headers source, final Map<String, Object> headers) {
		final Map<String, String> jsonTypes = decodeJsonTypes(source);
		source.forEach(h -> {
			if (!(h.key().equals(JSON_TYPES))) {
				if (jsonTypes != null && jsonTypes.containsKey(h.key())) {
					Class<?> type = Object.class;
					String requestedType = jsonTypes.get(h.key());
					boolean trusted = false;
					try {
						trusted = trusted(requestedType);
						if (trusted) {
							type = ClassUtils.forName(requestedType, null);
						}
					}
					catch (Exception e) {
						logger.error("Could not load class for header: " + h.key(), e);
					}
					if (trusted) {
						try {
							headers.put(h.key(),
									getObjectMapper().readValue(h.value(), type));
						}
						catch (IOException e) {
							// Fall back to the raw bytes if decoding fails.
							logger.error("Could not decode json type: "
									+ new String(h.value()) + " for key: " + h.key(), e);
							headers.put(h.key(), h.value());
						}
					}
					else {
						headers.put(h.key(),
								new NonTrustedHeaderType(h.value(), requestedType));
					}
				}
				else {
					headers.put(h.key(), h.value());
				}
			}
		});
	}

	// Locate and decode the JSON_TYPES header into a header-name -> class-name map.
	// Returns null when the header is absent or cannot be decoded.
	@SuppressWarnings("unchecked")
	@Nullable
	private Map<String, String> decodeJsonTypes(Headers source) {
		Map<String, String> types = null;
		Iterator<Header> iterator = source.iterator();
		while (iterator.hasNext()) {
			Header next = iterator.next();
			if (next.key().equals(JSON_TYPES)) {
				try {
					types = getObjectMapper().readValue(next.value(), Map.class);
				}
				catch (IOException e) {
					logger.error(
							"Could not decode json types: " + new String(next.value()),
							e);
				}
				break;
			}
		}
		return types;
	}

	// A type is trusted when its package equals, or is nested under, one of the
	// trusted packages. An empty trusted set (set via addTrustedPackages("*"))
	// trusts everything; default-package classes (no dot) are never trusted.
	protected boolean trusted(String requestedType) {
		if (!this.trustedPackages.isEmpty()) {
			int lastDot = requestedType.lastIndexOf('.');
			if (lastDot < 0) {
				return false;
			}
			String packageName = requestedType.substring(0, lastDot);
			for (String trustedPackage : this.trustedPackages) {
				if (packageName.equals(trustedPackage)
						|| packageName.startsWith(trustedPackage + ".")) {
					return true;
				}
			}
			return false;
		}
		return true;
	}

	/**
	 * The {@link StdNodeBasedDeserializer} extension for {@link MimeType}
	 * deserialization. It is presented here for backward compatibility when older
	 * producers send {@link MimeType} headers as serialization version.
	 */
	private class MimeTypeJsonDeserializer extends StdNodeBasedDeserializer<MimeType> {

		private static final long serialVersionUID = 1L;

		MimeTypeJsonDeserializer() {
			super(MimeType.class);
		}

		// Rebuild a MimeType from its JSON object form: {type, subtype, parameters}.
		@Override
		public MimeType convert(JsonNode root, DeserializationContext ctxt)
				throws IOException {
			JsonNode type = root.get("type");
			JsonNode subType = root.get("subtype");
			JsonNode parameters = root.get("parameters");
			Map<String, String> params = BinderHeaderMapper.this.objectMapper
					.readValue(parameters.traverse(), TypeFactory.defaultInstance()
							.constructMapType(HashMap.class, String.class, String.class));
			return new MimeType(type.asText(), subType.asText(), params);
		}

	}

	/**
	 * Represents a header that could not be decoded due to an untrusted type.
	 */
	public static class NonTrustedHeaderType {

		// Raw header bytes, preserved so the application can decode them itself.
		private final byte[] headerValue;

		// The class name that was requested but not trusted.
		private final String untrustedType;

		NonTrustedHeaderType(byte[] headerValue, String untrustedType) { // NOSONAR
			this.headerValue = headerValue; // NOSONAR
			this.untrustedType = untrustedType;
		}

		public byte[] getHeaderValue() {
			return this.headerValue; // NOSONAR
		}

		public String getUntrustedType() {
			return this.untrustedType;
		}

		@Override
		public String toString() {
			try {
				return "NonTrustedHeaderType [headerValue="
						+ new String(this.headerValue, StandardCharsets.UTF_8)
						+ ", untrustedType=" + this.untrustedType + "]";
			}
			catch (Exception e) {
				// Fall back to a byte-array dump if the bytes are not valid text.
				return "NonTrustedHeaderType [headerValue="
						+ Arrays.toString(this.headerValue) + ", untrustedType="
						+ this.untrustedType + "]";
			}
		}

	}

}

View File

@@ -96,6 +96,7 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.ProducerListener;
@@ -376,11 +377,11 @@ public class KafkaMessageChannelBinder extends
if (!patterns.contains("!" + MessageHeaders.ID)) {
patterns.add(0, "!" + MessageHeaders.ID);
}
mapper = new BinderHeaderMapper(
mapper = new DefaultKafkaHeaderMapper(
patterns.toArray(new String[patterns.size()]));
}
else {
mapper = new BinderHeaderMapper();
mapper = new DefaultKafkaHeaderMapper();
}
}
handler.setHeaderMapper(mapper);
@@ -825,7 +826,7 @@ public class KafkaMessageChannelBinder extends
KafkaHeaderMapper.class);
}
if (mapper == null) {
BinderHeaderMapper headerMapper = new BinderHeaderMapper() {
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper() {
@Override
public void toHeaders(Headers source, Map<String, Object> headers) {

View File

@@ -1521,8 +1521,13 @@ public class KafkaBinderTests extends
input3, consumerProperties);
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
producerProperties.setPartitionKeyExtractorClass(PartitionTestSupport.class);
producerProperties.setPartitionSelectorClass(PartitionTestSupport.class);
((GenericApplicationContext) this.applicationContext).registerBean("pkExtractor",
PartitionTestSupport.class, () -> new PartitionTestSupport());
((GenericApplicationContext) this.applicationContext).registerBean("pkSelector",
PartitionTestSupport.class, () -> new PartitionTestSupport());
producerProperties.setPartitionKeyExtractorName("pkExtractor");
producerProperties.setPartitionSelectorName("pkSelector");
producerProperties.setPartitionCount(3); // overridden to 8 on the actual topic
DirectChannel output = createBindableChannel("output",
createProducerBindingProperties(producerProperties));
@@ -1645,8 +1650,12 @@ public class KafkaBinderTests extends
Binder binder = getBinder();
ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
properties.setHeaderMode(HeaderMode.none);
properties.setPartitionKeyExtractorClass(RawKafkaPartitionTestSupport.class);
properties.setPartitionSelectorClass(RawKafkaPartitionTestSupport.class);
((GenericApplicationContext) this.applicationContext).registerBean("pkExtractor",
RawKafkaPartitionTestSupport.class, () -> new RawKafkaPartitionTestSupport());
((GenericApplicationContext) this.applicationContext).registerBean("pkSelector",
RawKafkaPartitionTestSupport.class, () -> new RawKafkaPartitionTestSupport());
properties.setPartitionKeyExtractorName("pkExtractor");
properties.setPartitionSelectorName("pkSelector");
properties.setPartitionCount(6);
DirectChannel output = createBindableChannel("output",

View File

@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -51,6 +52,7 @@ import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
"spring.kafka.consumer.auto-offset-reset=earliest" })
@DirtiesContext
@Ignore
public class KafkaNullConverterTest {
private static final String KAFKA_BROKERS_PROPERTY = "spring.kafka.bootstrap-servers";