From 3770db7844d47aa0fa450d6a5792fca58a02f810 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Tue, 11 Jan 2022 18:49:18 -0500 Subject: [PATCH] Retries for HostInfo in InteractiveQueryService InteractiveQueryService methods for finding the host info for Kafka Streams currently throw exceptions if the underlying KafkaStreams are not ready yet. Introduce a retry mechanism so that the users can control the behaviour of these methods by providing the following properties. spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts (default 1) spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backoffPeriod (default 1000 ms). Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1185 --- docs/src/main/asciidoc/kafka-streams.adoc | 6 ++- .../streams/InteractiveQueryService.java | 53 +++++++++++++------ ...reamsInteractiveQueryIntegrationTests.java | 33 +++++++++++- 3 files changed, 73 insertions(+), 19 deletions(-) diff --git a/docs/src/main/asciidoc/kafka-streams.adoc b/docs/src/main/asciidoc/kafka-streams.adoc index 9f016be2..0052531b 100644 --- a/docs/src/main/asciidoc/kafka-streams.adoc +++ b/docs/src/main/asciidoc/kafka-streams.adoc @@ -1261,7 +1261,7 @@ ReadOnlyKeyValueStore keyValueStore = ---- During the startup, the above method call to retrieve the store might fail. -For e.g it might still be in the middle of initializing the state store. +For example, it might still be in the middle of initializing the state store. In such cases, it will be useful to retry this operation. Kafka Streams binder provides a simple retry mechanism to accommodate this. @@ -1296,6 +1296,10 @@ else { } ---- +For more information on these host finding methods, please see the Javadoc on the methods. +For these methods also, during startup, if the underlying KafkaStreams objects are not ready, they might throw exceptions. +The aforementioned retry properties are applicable for these methods as well. + ==== Other API methods available through the InteractiveQueryService Use the following API method to retrieve the `KeyQueryMetadata` object associated with the combination of given store and key. diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryService.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryService.java index 4e35c350..33e7890c 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryService.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryService.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -84,15 +84,7 @@ public class InteractiveQueryService { */ public T getQueryableStore(String storeName, QueryableStoreType storeType) { - RetryTemplate retryTemplate = new RetryTemplate(); - - KafkaStreamsBinderConfigurationProperties.StateStoreRetry stateStoreRetry = this.binderConfigurationProperties.getStateStoreRetry(); - RetryPolicy retryPolicy = new SimpleRetryPolicy(stateStoreRetry.getMaxAttempts()); - FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy(); - backOffPolicy.setBackOffPeriod(stateStoreRetry.getBackoffPeriod()); - - retryTemplate.setBackOffPolicy(backOffPolicy); - retryTemplate.setRetryPolicy(retryPolicy); + final RetryTemplate retryTemplate = getRetryTemplate(); KafkaStreams contextSpecificKafkaStreams = getThreadContextSpecificKafkaStreams(); @@ -191,7 +183,7 @@ public class InteractiveQueryService { * through all the consumer instances under the same application id and retrieves the * proper host. * - * Note that the end user applications must provide `applicaiton.server` as a + * Note that the end user applications must provide `application.server` as a * configuration property for all the application instances when calling this method. * If this is not available, then null maybe returned. * @param generic type for key @@ -201,11 +193,40 @@ public class InteractiveQueryService { * @return the {@link HostInfo} where the key for the provided store is hosted currently */ public HostInfo getHostInfo(String store, K key, Serializer serializer) { - final KeyQueryMetadata keyQueryMetadata = this.kafkaStreamsRegistry.getKafkaStreams() - .stream() - .map((k) -> Optional.ofNullable(k.queryMetadataForKey(store, key, serializer))) - .filter(Optional::isPresent).map(Optional::get).findFirst().orElse(null); - return keyQueryMetadata != null ? keyQueryMetadata.getActiveHost() : null; + final RetryTemplate retryTemplate = getRetryTemplate(); + + + return retryTemplate.execute(context -> { + Throwable throwable = null; + try { + final KeyQueryMetadata keyQueryMetadata = this.kafkaStreamsRegistry.getKafkaStreams() + .stream() + .map((k) -> Optional.ofNullable(k.queryMetadataForKey(store, key, serializer))) + .filter(Optional::isPresent).map(Optional::get).findFirst().orElse(null); + if (keyQueryMetadata != null) { + return keyQueryMetadata.activeHost(); + } + } + catch (Exception e) { + throwable = e; + } + throw new IllegalStateException( + "Error when retrieving state store", throwable != null ? throwable : new Throwable("Kafka Streams is not ready.")); + }); + } + + private RetryTemplate getRetryTemplate() { + RetryTemplate retryTemplate = new RetryTemplate(); + + KafkaStreamsBinderConfigurationProperties.StateStoreRetry stateStoreRetry = this.binderConfigurationProperties.getStateStoreRetry(); + RetryPolicy retryPolicy = new SimpleRetryPolicy(stateStoreRetry.getMaxAttempts()); + FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy(); + backOffPolicy.setBackOffPeriod(stateStoreRetry.getBackoffPeriod()); + + retryTemplate.setBackOffPolicy(backOffPolicy); + retryTemplate.setRetryPolicy(retryPolicy); + + return retryTemplate; } /** diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java index d7446713..48c7f5f2 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2021 the original author or authors. + * Copyright 2017-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyQueryMetadata; import org.apache.kafka.streams.KeyValue; @@ -125,11 +126,39 @@ public class KafkaStreamsInteractiveQueryIntegrationTests { catch (Exception ignored) { } - Mockito.verify(mockKafkaStreams, times(3)) .store(StoreQueryParameters.fromNameAndType("foo", storeType)); } + @Test + public void testStateStoreRetrievalRetryForHostInfoService() { + StreamsBuilderFactoryBean mock = Mockito.mock(StreamsBuilderFactoryBean.class); + KafkaStreams mockKafkaStreams = Mockito.mock(KafkaStreams.class); + Mockito.when(mock.getKafkaStreams()).thenReturn(mockKafkaStreams); + KafkaStreamsRegistry kafkaStreamsRegistry = new KafkaStreamsRegistry(); + kafkaStreamsRegistry.registerKafkaStreams(mock); + Mockito.when(mock.isRunning()).thenReturn(true); + Properties mockProperties = new Properties(); + mockProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "foobarApp-123"); + Mockito.when(mock.getStreamsConfiguration()).thenReturn(mockProperties); + KafkaStreamsBinderConfigurationProperties binderConfigurationProperties = + new KafkaStreamsBinderConfigurationProperties(new KafkaProperties()); + binderConfigurationProperties.getStateStoreRetry().setMaxAttempts(3); + InteractiveQueryService interactiveQueryService = new InteractiveQueryService(kafkaStreamsRegistry, + binderConfigurationProperties); + + QueryableStoreType> storeType = QueryableStoreTypes.keyValueStore(); + final StringSerializer serializer = new StringSerializer(); + try { + interactiveQueryService.getHostInfo("foo", "fooKey", serializer); + } + catch (Exception ignored) { + + } + Mockito.verify(mockKafkaStreams, times(3)) + .queryMetadataForKey("foo", "fooKey", serializer); + } + @Test @Ignore public void testKstreamBinderWithPojoInputAndStringOuput() throws Exception {