diff --git a/docs/src/main/asciidoc/kafka-streams.adoc b/docs/src/main/asciidoc/kafka-streams.adoc index 53c6e60a..40be05f9 100644 --- a/docs/src/main/asciidoc/kafka-streams.adoc +++ b/docs/src/main/asciidoc/kafka-streams.adoc @@ -1049,6 +1049,20 @@ else { } ---- +==== Other API methods available through the InteractiveQueryService + +Use the following API method to retrieve the `KeyQueryMetadata` object associated with the combination of given store and key. + +``` +public KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer serializer) +``` + +Use the following API method to retrieve the `KakfaStreams` object associated with the combination of given store and key. + +``` +public KafkaStreams getKafkaStreams(String store, K key, Serializer serializer) +``` + === Health Indicator The health indicator requires the dependency `spring-boot-starter-actuator`. For maven use: diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryService.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryService.java index 532c8456..19c8644d 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryService.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryService.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2019 the original author or authors. + * Copyright 2018-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.commons.logging.Log; @@ -158,7 +159,8 @@ public class InteractiveQueryService { } /** - * Retrieves the {@link KeyQueryMetadata} associated with the given combination of key and state store. + * Retrieves and returns the {@link KeyQueryMetadata} associated with the given combination of + * key and state store. If none found, it will return null. * * @param generic type for key * @param store store name @@ -173,6 +175,27 @@ public class InteractiveQueryService { .filter(Optional::isPresent).map(Optional::get).findFirst().orElse(null); } + /** + * Retrieves and returns the {@link KafkaStreams} object that is associated with the given combination of + * key and state store. If none found, it will return null. + * + * @param generic type for key + * @param store store name + * @param key key to look for + * @param serializer {@link Serializer} for the key + * @return {@link KafkaStreams} object associated with this combination of store and key + */ + public KafkaStreams getKafkaStreams(String store, K key, Serializer serializer) { + final AtomicReference kafkaStreamsAtomicReference = new AtomicReference<>(); + this.kafkaStreamsRegistry.getKafkaStreams() + .forEach(k -> { + final KeyQueryMetadata keyQueryMetadata = k.queryMetadataForKey(store, key, serializer); + if (keyQueryMetadata != null) { + kafkaStreamsAtomicReference.set(k); + } + }); + return kafkaStreamsAtomicReference.get(); + } /** * Gets the list of {@link HostInfo} where the provided store is hosted on. diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java index 13889d14..bd2bf88f 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -174,6 +174,12 @@ public class KafkaStreamsInteractiveQueryIntegrationTests { assertThat(activeHost.host() + ":" + activeHost.port()) .isEqualTo(embeddedKafka.getBrokersAsString()); + final KafkaStreams kafkaStreams = interactiveQueryService.getKafkaStreams("prod-id-count-store", + 123, new IntegerSerializer()); + assertThat(kafkaStreams).isNotNull(); + assertThat(interactiveQueryService.getKafkaStreams("non-existent-store", + 123, new IntegerSerializer())).isNull(); + HostInfo hostInfo = interactiveQueryService.getHostInfo("prod-id-count-store", 123, new IntegerSerializer()); assertThat(hostInfo.host() + ":" + hostInfo.port())