Adding improvements to InteractiveQueryService

New API additions.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/942
This commit is contained in:
Soby Chacko
2020-10-28 18:58:02 -04:00
parent 50f4470fcf
commit 97e3b61d14
3 changed files with 46 additions and 3 deletions

View File

@@ -1049,6 +1049,20 @@ else {
}
----
==== Other API methods available through the InteractiveQueryService
Use the following API method to retrieve the `KeyQueryMetadata` object associated with the combination of given store and key.
```
public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)
```
Use the following API method to retrieve the `KakfaStreams` object associated with the combination of given store and key.
```
public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)
```
=== Health Indicator
The health indicator requires the dependency `spring-boot-starter-actuator`. For maven use:

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2018-2019 the original author or authors.
* Copyright 2018-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
@@ -158,7 +159,8 @@ public class InteractiveQueryService {
}
/**
* Retrieves the {@link KeyQueryMetadata} associated with the given combination of key and state store.
* Retrieves and returns the {@link KeyQueryMetadata} associated with the given combination of
* key and state store. If none found, it will return null.
*
* @param <K> generic type for key
* @param store store name
@@ -173,6 +175,27 @@ public class InteractiveQueryService {
.filter(Optional::isPresent).map(Optional::get).findFirst().orElse(null);
}
/**
* Retrieves and returns the {@link KafkaStreams} object that is associated with the given combination of
* key and state store. If none found, it will return null.
*
* @param <K> generic type for key
* @param store store name
* @param key key to look for
* @param serializer {@link Serializer} for the key
* @return {@link KafkaStreams} object associated with this combination of store and key
*/
public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer) {
final AtomicReference<KafkaStreams> kafkaStreamsAtomicReference = new AtomicReference<>();
this.kafkaStreamsRegistry.getKafkaStreams()
.forEach(k -> {
final KeyQueryMetadata keyQueryMetadata = k.queryMetadataForKey(store, key, serializer);
if (keyQueryMetadata != null) {
kafkaStreamsAtomicReference.set(k);
}
});
return kafkaStreamsAtomicReference.get();
}
/**
* Gets the list of {@link HostInfo} where the provided store is hosted on.

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -174,6 +174,12 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
assertThat(activeHost.host() + ":" + activeHost.port())
.isEqualTo(embeddedKafka.getBrokersAsString());
final KafkaStreams kafkaStreams = interactiveQueryService.getKafkaStreams("prod-id-count-store",
123, new IntegerSerializer());
assertThat(kafkaStreams).isNotNull();
assertThat(interactiveQueryService.getKafkaStreams("non-existent-store",
123, new IntegerSerializer())).isNull();
HostInfo hostInfo = interactiveQueryService.getHostInfo("prod-id-count-store",
123, new IntegerSerializer());
assertThat(hostInfo.host() + ":" + hostInfo.port())