From 01dbb49313a2f478ecf1f2b3229f5f8f6d624d45 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Thu, 27 Jan 2022 16:16:10 +0100 Subject: [PATCH] Revert "Update Kafka client version to 3.1.0" This reverts commit a25e2ea0b31456c8b432f08910d4aa4782cf5445. --- pom.xml | 2 +- .../KafkaStreamsBinderBootstrapTest.java | 27 ++++++++++--------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/pom.xml b/pom.xml index 47e17fbb..cb45e73d 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ 17 3.0.0-SNAPSHOT 6.0.0-SNAPSHOT - 3.1.0 + 3.0.0 4.0.0-SNAPSHOT true true diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java index c0388769..7d851cff 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java @@ -31,6 +31,7 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; +import org.springframework.beans.DirectFieldAccessor; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; @@ -38,6 +39,7 @@ import org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolv import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.kafka.config.StreamsBuilderFactoryBean; +import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper; import org.springframework.kafka.test.rule.EmbeddedKafkaRule; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; @@ -141,19 +143,18 @@ public class KafkaStreamsBinderBootstrapTest { assertThat(streamsConfiguration3.containsKey("spring.json.value.type.method")).isFalse(); applicationContext.getBean(KeyValueSerdeResolver.class); - //TODO: In Kafka Streams 3.1, taskTopology field is removed. Re-evaluate this testing strategy. -// String configuredSerdeTypeResolver = (String) new DirectFieldAccessor(input2SBFB.getKafkaStreams()) -// .getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeResolver.arg$2"); -// -// assertThat(this.getClass().getName() + ".determineType").isEqualTo(configuredSerdeTypeResolver); -// -// String configuredKeyDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams()) -// .getPropertyValue("taskTopology.processorNodes[0].keyDeserializer.typeMapper.classIdFieldName")); -// assertThat(DefaultJackson2JavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredKeyDeserializerFieldName); -// -// String configuredValueDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams()) -// .getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeMapper.classIdFieldName")); -// assertThat(DefaultJackson2JavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredValueDeserializerFieldName); + String configuredSerdeTypeResolver = (String) new DirectFieldAccessor(input2SBFB.getKafkaStreams()) + .getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeResolver.arg$2"); + + assertThat(this.getClass().getName() + ".determineType").isEqualTo(configuredSerdeTypeResolver); + + String configuredKeyDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams()) + .getPropertyValue("taskTopology.processorNodes[0].keyDeserializer.typeMapper.classIdFieldName")); + assertThat(DefaultJackson2JavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredKeyDeserializerFieldName); + + String configuredValueDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams()) + .getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeMapper.classIdFieldName")); + assertThat(DefaultJackson2JavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredValueDeserializerFieldName); applicationContext.close(); }