* removed deprecations * simplify serde creation
This commit is contained in:
committed by
Soby Chacko
parent
cb0ed01152
commit
e927b01bec
@@ -17,8 +17,12 @@
|
|||||||
package kafka.streams.product.tracker;
|
package kafka.streams.product.tracker;
|
||||||
|
|
||||||
import org.apache.kafka.common.serialization.Serdes;
|
import org.apache.kafka.common.serialization.Serdes;
|
||||||
|
import org.apache.kafka.common.utils.Bytes;
|
||||||
import org.apache.kafka.streams.KeyValue;
|
import org.apache.kafka.streams.KeyValue;
|
||||||
import org.apache.kafka.streams.kstream.KStream;
|
import org.apache.kafka.streams.kstream.KStream;
|
||||||
|
import org.apache.kafka.streams.kstream.Materialized;
|
||||||
|
import org.apache.kafka.streams.kstream.Serialized;
|
||||||
|
import org.apache.kafka.streams.state.KeyValueStore;
|
||||||
import org.apache.kafka.streams.state.QueryableStoreTypes;
|
import org.apache.kafka.streams.state.QueryableStoreTypes;
|
||||||
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
|
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@@ -28,7 +32,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
|
|||||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||||
import org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry;
|
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
|
||||||
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
|
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
|
||||||
import org.springframework.kafka.support.serializer.JsonSerde;
|
import org.springframework.kafka.support.serializer.JsonSerde;
|
||||||
import org.springframework.messaging.handler.annotation.SendTo;
|
import org.springframework.messaging.handler.annotation.SendTo;
|
||||||
@@ -54,7 +58,8 @@ public class KafkaStreamsInteractiveQueryApplication {
|
|||||||
private static final String STORE_NAME = "prod-id-count-store";
|
private static final String STORE_NAME = "prod-id-count-store";
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private QueryableStoreRegistry queryableStoreRegistry;
|
private InteractiveQueryService queryService;
|
||||||
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
ProductTrackerProperties productTrackerProperties;
|
ProductTrackerProperties productTrackerProperties;
|
||||||
@@ -68,8 +73,10 @@ public class KafkaStreamsInteractiveQueryApplication {
|
|||||||
return input
|
return input
|
||||||
.filter((key, product) -> productIds().contains(product.getId()))
|
.filter((key, product) -> productIds().contains(product.getId()))
|
||||||
.map((key, value) -> new KeyValue<>(value.id, value))
|
.map((key, value) -> new KeyValue<>(value.id, value))
|
||||||
.groupByKey(new Serdes.IntegerSerde(), new JsonSerde<>(Product.class))
|
.groupByKey(Serialized.with(Serdes.Integer(), new JsonSerde<>(Product.class)))
|
||||||
.count(STORE_NAME)
|
.count(Materialized.<Integer, Long, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
|
||||||
|
.withKeySerde(Serdes.Integer())
|
||||||
|
.withValueSerde(Serdes.Long()))
|
||||||
.toStream();
|
.toStream();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -82,7 +89,7 @@ public class KafkaStreamsInteractiveQueryApplication {
|
|||||||
@Scheduled(fixedRate = 30000, initialDelay = 5000)
|
@Scheduled(fixedRate = 30000, initialDelay = 5000)
|
||||||
public void printProductCounts() {
|
public void printProductCounts() {
|
||||||
if (keyValueStore == null) {
|
if (keyValueStore == null) {
|
||||||
keyValueStore = queryableStoreRegistry.getQueryableStoreType(STORE_NAME, QueryableStoreTypes.keyValueStore());
|
keyValueStore = queryService.getQueryableStore(STORE_NAME, QueryableStoreTypes.keyValueStore());
|
||||||
}
|
}
|
||||||
|
|
||||||
for (Integer id : productIds()) {
|
for (Integer id : productIds()) {
|
||||||
|
|||||||
Reference in New Issue
Block a user