Version updates and compile fixes

Spring Kafka: 2.8.0-RC1
Spring Integration: 5.5.5
Kafka Clients: 3.0.0

Remove schema registry references.
Updates for removed classes/deprecations in Kafka Streams client.
This commit is contained in:
Soby Chacko
2021-11-01 22:11:06 -04:00
parent 07f10f6eb5
commit c9a07729dd
14 changed files with 35 additions and 34 deletions

View File

@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
+import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreType;
@@ -99,7 +100,7 @@ public class InteractiveQueryService {
Throwable throwable = null;
while (iterator.hasNext()) {
try {
-store = iterator.next().store(storeName, storeType);
+store = iterator.next().store(StoreQueryParameters.fromNameAndType(storeName, storeType));
}
catch (InvalidStateStoreException e) {
// pass through..

View File

@@ -17,6 +17,7 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import java.lang.reflect.Method;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -375,12 +376,12 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator extends AbstractKafkaStr
builder = Stores
.windowStoreBuilder(
Stores.persistentWindowStore(spec.getName(),
-spec.getRetention(), 3, spec.getLength(), false),
+Duration.ofMillis(spec.getRetention()), Duration.ofMillis(3), false),
keySerde, valueSerde);
break;
case SESSION:
builder = Stores.sessionStoreBuilder(Stores.persistentSessionStore(
-spec.getName(), spec.getRetention()), keySerde, valueSerde);
+spec.getName(), Duration.ofMillis(spec.getRetention())), keySerde, valueSerde);
break;
default:
throw new UnsupportedOperationException(

View File

@@ -27,9 +27,10 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StoreQueryParameters;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
@@ -119,7 +120,8 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
}
-Mockito.verify(mockKafkaStreams, times(3)).store("foo", storeType);
+Mockito.verify(mockKafkaStreams, times(3))
+.store(StoreQueryParameters.fromNameAndType("foo", storeType));
}
@Test
@@ -211,7 +213,7 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
return input.filter((key, product) -> product.getId() == 123)
.map((key, value) -> new KeyValue<>(value.id, value))
-.groupByKey(Serialized.with(new Serdes.IntegerSerde(),
+.groupByKey(Grouped.with(new Serdes.IntegerSerde(),
new JsonSerde<>(Product.class)))
.count(Materialized.as("prod-id-count-store")).toStream()
.map((key, value) -> new KeyValue<>(null,

View File

@@ -400,7 +400,7 @@ public class KafkaStreamsBinderWordCountFunctionTests {
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-.windowedBy(TimeWindows.of(5000))
+.windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
.count(Materialized.as("foo-WordCounts"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.stream.binder.kafka.streams.integration;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
@@ -24,9 +25,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -254,8 +255,8 @@ public abstract class DeserializationErrorHandlerByKafkaTests {
.flatMapValues(
value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
-.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
-.windowedBy(TimeWindows.of(5000)).count(Materialized.as("foo-WordCounts-x"))
+.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+.windowedBy(TimeWindows.of(Duration.ofMillis(5000))).count(Materialized.as("foo-WordCounts-x"))
.toStream().map((key, value) -> new KeyValue<>(null,
"Count for " + key.key() + " : " + value));
}

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.stream.binder.kafka.streams.integration;
+import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
@@ -261,7 +262,7 @@ public abstract class DeserializtionErrorHandlerByBinderTests {
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Grouped.with(new JsonSerde<>(Product.class),
new JsonSerde<>(Product.class)))
-.windowedBy(TimeWindows.of(5000))
+.windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
.count(Materialized.as("id-count-store-x")).toStream()
.map((key, value) -> new KeyValue<>(key.key().id, value));
}

View File

@@ -26,9 +26,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Serialized;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -164,7 +164,7 @@ public class KafkaStreamsBinderMultipleInputTopicsTest {
.flatMapValues(
value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
-.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.count(Materialized.as("WordCounts-tKWCWSIAP0")).toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key, value)));
}

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.stream.binder.kafka.streams.integration;
+import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
@@ -23,9 +24,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -130,9 +131,9 @@ public class KafkaStreamsBinderPojoInputAndPrimitiveTypeOutputTests {
public KStream<Integer, Long> process(KStream<Object, Product> input) {
return input.filter((key, product) -> product.getId() == 123)
.map((key, value) -> new KeyValue<>(value, value))
-.groupByKey(Serialized.with(new JsonSerde<>(Product.class),
+.groupByKey(Grouped.with(new JsonSerde<>(Product.class),
new JsonSerde<>(Product.class)))
-.windowedBy(TimeWindows.of(5000))
+.windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
.count(Materialized.as("id-count-store-x")).toStream()
.map((key, value) -> {
return new KeyValue<>(key.key().id, value);

View File

@@ -179,7 +179,7 @@ public class KafkaStreamsBinderTombstoneTests {
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-.windowedBy(TimeWindows.of(5000))
+.windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
.count(Materialized.as("foo-WordCounts"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,

View File

@@ -25,9 +25,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -175,7 +175,7 @@ public abstract class KafkaStreamsNativeEncodingDecodingTests {
.flatMapValues(
value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
-.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(5))).count(Materialized.as("foo-WordCounts-x"))
.toStream().map((key, value) -> new KeyValue<>(null,
"Count for " + key.key() + " : " + value));

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.stream.binder.kafka.streams.integration;
+import java.time.Duration;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
@@ -257,7 +258,7 @@ public class KafkaStreamsStateStoreIntegrationTests {
public StoreBuilder mystore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("mystate",
-3L, 3, 3L, false), Serdes.String(),
+Duration.ofMillis(3), Duration.ofMillis(3), false), Serdes.String(),
Serdes.String());
}
}

View File

@@ -16,15 +16,16 @@
package org.springframework.cloud.stream.binder.kafka.streams.integration;
+import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -137,9 +138,9 @@ public class KafkastreamsBinderPojoInputStringOutputIntegrationTests {
return input.filter((key, product) -> product.getId() == 123)
.map((key, value) -> new KeyValue<>(value, value))
-.groupByKey(Serialized.with(new JsonSerde<>(Product.class),
+.groupByKey(Grouped.with(new JsonSerde<>(Product.class),
new JsonSerde<>(Product.class)))
-.windowedBy(TimeWindows.of(5000))
+.windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
.count(Materialized.as("id-count-store")).toStream()
.map((key, value) -> new KeyValue<>(key.key().id,
"Count for product with ID 123: " + value));