InteractiveQueryService API changes

* Add InteractiveQueryService.getAllHostsInfo to fetch metadata
   of all instances that handles specific store.
 * Test to verify this new API (in the existing InteractiveQueryService tests).
This commit is contained in:
Serhii Siryi
2020-02-26 17:21:38 +02:00
committed by Soby Chacko
parent 90a47a675f
commit 65f8cc5660
2 changed files with 34 additions and 1 deletions

View File

@@ -17,9 +17,12 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,6 +48,7 @@ import org.springframework.util.StringUtils;
*
* @author Soby Chacko
* @author Renwei Han
* @author Serhii Siryi
* @since 2.1.0
*/
public class InteractiveQueryService {
@@ -153,4 +157,25 @@ public class InteractiveQueryService {
return streamsMetadata != null ? streamsMetadata.hostInfo() : null;
}
/**
* Gets the list of {@link HostInfo} where the provided store is hosted on.
* It also can include current host info.
* Kafka Streams will look through all the consumer instances under the same application id
* and retrieves all hosts info.
*
* Note that the end-user applications must provide `application.server` as a configuration property
* for all the application instances when calling this method. If this is not available, then an empty list will be returned.
*
* @param store store name
* @return the list of {@link HostInfo} where provided store is hosted on
*/
public List<HostInfo> getAllHostsInfo(String store) {
return kafkaStreamsRegistry.getKafkaStreams()
.stream()
.flatMap(k -> k.allMetadataForStore(store).stream())
.filter(Objects::nonNull)
.map(StreamsMetadata::hostInfo)
.collect(Collectors.toList());
}
}

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
@@ -162,6 +163,7 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
InteractiveQueryService interactiveQueryService = context
.getBean(InteractiveQueryService.class);
HostInfo currentHostInfo = interactiveQueryService.getCurrentHostInfo();
assertThat(currentHostInfo.host() + ":" + currentHostInfo.port())
.isEqualTo(embeddedKafka.getBrokersAsString());
@@ -173,6 +175,13 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
HostInfo hostInfoFoo = interactiveQueryService
.getHostInfo("prod-id-count-store-foo", 123, new IntegerSerializer());
assertThat(hostInfoFoo).isNull();
final List<HostInfo> hostInfos = interactiveQueryService.getAllHostsInfo("prod-id-count-store");
assertThat(hostInfos.size()).isEqualTo(1);
final HostInfo hostInfo1 = hostInfos.get(0);
assertThat(hostInfo1.host() + ":" + hostInfo1.port())
.isEqualTo(embeddedKafka.getBrokersAsString());
}
@EnableBinding(KafkaStreamsProcessor.class)
@@ -214,7 +223,6 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
}
}
}
static class Product {