From aff0dc00eff4e7740a7e67db3fa5489de684e893 Mon Sep 17 00:00:00 2001 From: "Pommerening, Nico" Date: Wed, 17 Nov 2021 22:48:21 +0100 Subject: [PATCH] GH-1161: InteractiveQueryService improvements This PR safe guards state store instances in case there are multiple KafkaStreams instances present that have distinct application IDs but share State Store Names. Change is backwards compatible: In case no KafkaStreams association of the thread can be found, all local state stores are queried as before. In case an associated KafkaStreams Instance is found, but required StateStore is not found in this instance, a warning is issued but backwards compatibility is preserved by looking up all state stores. Store within KafkaStreams instance of thread is preferred over "foreign" store with same name. Warning is issued if requested store is not found within KafkaStreams instance of thread. The main benefit here is to get rid of randomly selecting stores across all KafkaStreams instances in case a store is contained within multiple streams instances with same name. Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1161 --- .../streams/InteractiveQueryService.java | 63 ++++++++++++++++--- ...reamsInteractiveQueryIntegrationTests.java | 8 ++- 2 files changed, 63 insertions(+), 8 deletions(-) diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryService.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryService.java index 17892040..4e35c350 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryService.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryService.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,6 +31,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyQueryMetadata; import org.apache.kafka.streams.StoreQueryParameters; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.QueryableStoreType; @@ -52,6 +53,7 @@ import org.springframework.util.StringUtils; * @author Soby Chacko * @author Renwei Han * @author Serhii Siryi + * @author Nico Pommerening * @since 2.1.0 */ public class InteractiveQueryService { @@ -92,15 +94,16 @@ public class InteractiveQueryService { retryTemplate.setBackOffPolicy(backOffPolicy); retryTemplate.setRetryPolicy(retryPolicy); + KafkaStreams contextSpecificKafkaStreams = getThreadContextSpecificKafkaStreams(); + return retryTemplate.execute(context -> { T store = null; - - final Set kafkaStreams = InteractiveQueryService.this.kafkaStreamsRegistry.getKafkaStreams(); - final Iterator iterator = kafkaStreams.iterator(); Throwable throwable = null; - while (iterator.hasNext()) { + if (contextSpecificKafkaStreams != null) { try { - store = iterator.next().store(StoreQueryParameters.fromNameAndType(storeName, storeType)); + store = contextSpecificKafkaStreams.store( + StoreQueryParameters.fromNameAndType( + storeName, storeType)); } catch (InvalidStateStoreException e) { // pass through.. @@ -110,10 +113,56 @@ public class InteractiveQueryService { if (store != null) { return store; } - throw new IllegalStateException("Error when retrieving state store: " + storeName, throwable); + else if (contextSpecificKafkaStreams != null) { + LOG.warn("Store " + storeName + + " could not be found in Streams context, falling back to all known Streams instances"); + } + final Set kafkaStreams = kafkaStreamsRegistry.getKafkaStreams(); + final Iterator iterator = kafkaStreams.iterator(); + while (iterator.hasNext()) { + try { + store = iterator.next() + .store(StoreQueryParameters.fromNameAndType( + storeName, storeType)); + } + catch (InvalidStateStoreException e) { + // pass through.. + throwable = e; + } + } + if (store != null) { + return store; + } + throw new IllegalStateException( + "Error when retrieving state store: " + storeName, + throwable); }); } + /** + * Retrieves the current {@link KafkaStreams} context if executing Thread is created by a Streams App (contains a matching application id in Thread's name). + * + * @return KafkaStreams instance associated with Thread + */ + private KafkaStreams getThreadContextSpecificKafkaStreams() { + return this.kafkaStreamsRegistry.getKafkaStreams().stream() + .filter(this::filterByThreadName).findAny().orElse(null); + } + + /** + * Checks if the supplied {@link KafkaStreams} instance belongs to the calling Thread by matching the Thread's name with the Streams Application Id. + * + * @param streams {@link KafkaStreams} instance to filter + * @return true if Streams Instance is associated with Thread + */ + private boolean filterByThreadName(KafkaStreams streams) { + String applicationId = kafkaStreamsRegistry.streamBuilderFactoryBean( + streams).getStreamsConfiguration() + .getProperty(StreamsConfig.APPLICATION_ID_CONFIG); + // TODO: is there some better way to find out if a Stream App created the Thread? + return Thread.currentThread().getName().contains(applicationId); + } + /** * Gets the current {@link HostInfo} that the calling kafka streams application is * running on. diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java index 42121993..d7446713 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka.streams; import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -28,6 +29,7 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyQueryMetadata; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StoreQueryParameters; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; @@ -69,6 +71,7 @@ import static org.mockito.internal.verification.VerificationModeFactory.times; /** * @author Soby Chacko * @author Gary Russell + * @author Nico Pommerening */ public class KafkaStreamsInteractiveQueryIntegrationTests { @@ -106,6 +109,9 @@ public class KafkaStreamsInteractiveQueryIntegrationTests { KafkaStreamsRegistry kafkaStreamsRegistry = new KafkaStreamsRegistry(); kafkaStreamsRegistry.registerKafkaStreams(mock); Mockito.when(mock.isRunning()).thenReturn(true); + Properties mockProperties = new Properties(); + mockProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "fooApp"); + Mockito.when(mock.getStreamsConfiguration()).thenReturn(mockProperties); KafkaStreamsBinderConfigurationProperties binderConfigurationProperties = new KafkaStreamsBinderConfigurationProperties(new KafkaProperties()); binderConfigurationProperties.getStateStoreRetry().setMaxAttempts(3);