diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryService.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryService.java index 17892040..4e35c350 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryService.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryService.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,6 +31,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyQueryMetadata; import org.apache.kafka.streams.StoreQueryParameters; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.QueryableStoreType; @@ -52,6 +53,7 @@ import org.springframework.util.StringUtils; * @author Soby Chacko * @author Renwei Han * @author Serhii Siryi + * @author Nico Pommerening * @since 2.1.0 */ public class InteractiveQueryService { @@ -92,15 +94,16 @@ public class InteractiveQueryService { retryTemplate.setBackOffPolicy(backOffPolicy); retryTemplate.setRetryPolicy(retryPolicy); + KafkaStreams contextSpecificKafkaStreams = getThreadContextSpecificKafkaStreams(); + return retryTemplate.execute(context -> { T store = null; - - final Set kafkaStreams = InteractiveQueryService.this.kafkaStreamsRegistry.getKafkaStreams(); - final Iterator iterator = kafkaStreams.iterator(); Throwable throwable = null; - while (iterator.hasNext()) { + if (contextSpecificKafkaStreams != null) { try { - store = iterator.next().store(StoreQueryParameters.fromNameAndType(storeName, storeType)); + store = contextSpecificKafkaStreams.store( + StoreQueryParameters.fromNameAndType( + storeName, storeType)); } catch (InvalidStateStoreException e) { // pass through.. @@ -110,10 +113,56 @@ public class InteractiveQueryService { if (store != null) { return store; } - throw new IllegalStateException("Error when retrieving state store: " + storeName, throwable); + else if (contextSpecificKafkaStreams != null) { + LOG.warn("Store " + storeName + + " could not be found in Streams context, falling back to all known Streams instances"); + } + final Set kafkaStreams = kafkaStreamsRegistry.getKafkaStreams(); + final Iterator iterator = kafkaStreams.iterator(); + while (iterator.hasNext()) { + try { + store = iterator.next() + .store(StoreQueryParameters.fromNameAndType( + storeName, storeType)); + } + catch (InvalidStateStoreException e) { + // pass through.. + throwable = e; + } + } + if (store != null) { + return store; + } + throw new IllegalStateException( + "Error when retrieving state store: " + storeName, + throwable); }); } + /** + * Retrieves the current {@link KafkaStreams} context if executing Thread is created by a Streams App (contains a matching application id in Thread's name). + * + * @return KafkaStreams instance associated with Thread + */ + private KafkaStreams getThreadContextSpecificKafkaStreams() { + return this.kafkaStreamsRegistry.getKafkaStreams().stream() + .filter(this::filterByThreadName).findAny().orElse(null); + } + + /** + * Checks if the supplied {@link KafkaStreams} instance belongs to the calling Thread by matching the Thread's name with the Streams Application Id. + * + * @param streams {@link KafkaStreams} instance to filter + * @return true if Streams Instance is associated with Thread + */ + private boolean filterByThreadName(KafkaStreams streams) { + String applicationId = kafkaStreamsRegistry.streamBuilderFactoryBean( + streams).getStreamsConfiguration() + .getProperty(StreamsConfig.APPLICATION_ID_CONFIG); + // TODO: is there some better way to find out if a Stream App created the Thread? + return Thread.currentThread().getName().contains(applicationId); + } + /** * Gets the current {@link HostInfo} that the calling kafka streams application is * running on. diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java index 42121993..d7446713 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka.streams; import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -28,6 +29,7 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyQueryMetadata; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StoreQueryParameters; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; @@ -69,6 +71,7 @@ import static org.mockito.internal.verification.VerificationModeFactory.times; /** * @author Soby Chacko * @author Gary Russell + * @author Nico Pommerening */ public class KafkaStreamsInteractiveQueryIntegrationTests { @@ -106,6 +109,9 @@ public class KafkaStreamsInteractiveQueryIntegrationTests { KafkaStreamsRegistry kafkaStreamsRegistry = new KafkaStreamsRegistry(); kafkaStreamsRegistry.registerKafkaStreams(mock); Mockito.when(mock.isRunning()).thenReturn(true); + Properties mockProperties = new Properties(); + mockProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "fooApp"); + Mockito.when(mock.getStreamsConfiguration()).thenReturn(mockProperties); KafkaStreamsBinderConfigurationProperties binderConfigurationProperties = new KafkaStreamsBinderConfigurationProperties(new KafkaProperties()); binderConfigurationProperties.getStateStoreRetry().setMaxAttempts(3);