Retries for HostInfo in InteractiveQueryService
InteractiveQueryService methods for finding the host info for Kafka Streams currently throw exceptions if the underlying KafkaStreams are not ready yet. Introduce a retry mechanism so that the users can control the behaviour of these methods by providing the following properties. spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts (default 1) spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backoffPeriod (default 1000 ms). Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1185
This commit is contained in:
@@ -1025,7 +1025,7 @@ ReadOnlyKeyValueStore<Object, Object> keyValueStore =
|
||||
----
|
||||
|
||||
During the startup, the above method call to retrieve the store might fail.
|
||||
For e.g it might still be in the middle of initializing the state store.
|
||||
For example, it might still be in the middle of initializing the state store.
|
||||
In such cases, it will be useful to retry this operation.
|
||||
Kafka Streams binder provides a simple retry mechanism to accommodate this.
|
||||
|
||||
@@ -1060,6 +1060,10 @@ else {
|
||||
}
|
||||
----
|
||||
|
||||
For more information on these host finding methods, please see the Javadoc on the methods.
|
||||
For these methods also, during startup, if the underlying KafkaStreams objects are not ready, they might throw exceptions.
|
||||
The aforementioned retry properties are applicable for these methods as well.
|
||||
|
||||
==== Other API methods available through the InteractiveQueryService
|
||||
|
||||
Use the following API method to retrieve the `KeyQueryMetadata` object associated with the combination of given store and key.
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2018-2021 the original author or authors.
|
||||
* Copyright 2018-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -84,15 +84,7 @@ public class InteractiveQueryService {
|
||||
*/
|
||||
public <T> T getQueryableStore(String storeName, QueryableStoreType<T> storeType) {
|
||||
|
||||
RetryTemplate retryTemplate = new RetryTemplate();
|
||||
|
||||
KafkaStreamsBinderConfigurationProperties.StateStoreRetry stateStoreRetry = this.binderConfigurationProperties.getStateStoreRetry();
|
||||
RetryPolicy retryPolicy = new SimpleRetryPolicy(stateStoreRetry.getMaxAttempts());
|
||||
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
|
||||
backOffPolicy.setBackOffPeriod(stateStoreRetry.getBackoffPeriod());
|
||||
|
||||
retryTemplate.setBackOffPolicy(backOffPolicy);
|
||||
retryTemplate.setRetryPolicy(retryPolicy);
|
||||
final RetryTemplate retryTemplate = getRetryTemplate();
|
||||
|
||||
KafkaStreams contextSpecificKafkaStreams = getThreadContextSpecificKafkaStreams();
|
||||
|
||||
@@ -191,7 +183,7 @@ public class InteractiveQueryService {
|
||||
* through all the consumer instances under the same application id and retrieves the
|
||||
* proper host.
|
||||
*
|
||||
* Note that the end user applications must provide `applicaiton.server` as a
|
||||
* Note that the end user applications must provide `application.server` as a
|
||||
* configuration property for all the application instances when calling this method.
|
||||
* If this is not available, then null maybe returned.
|
||||
* @param <K> generic type for key
|
||||
@@ -201,11 +193,40 @@ public class InteractiveQueryService {
|
||||
* @return the {@link HostInfo} where the key for the provided store is hosted currently
|
||||
*/
|
||||
public <K> HostInfo getHostInfo(String store, K key, Serializer<K> serializer) {
|
||||
final KeyQueryMetadata keyQueryMetadata = this.kafkaStreamsRegistry.getKafkaStreams()
|
||||
.stream()
|
||||
.map((k) -> Optional.ofNullable(k.queryMetadataForKey(store, key, serializer)))
|
||||
.filter(Optional::isPresent).map(Optional::get).findFirst().orElse(null);
|
||||
return keyQueryMetadata != null ? keyQueryMetadata.getActiveHost() : null;
|
||||
final RetryTemplate retryTemplate = getRetryTemplate();
|
||||
|
||||
|
||||
return retryTemplate.execute(context -> {
|
||||
Throwable throwable = null;
|
||||
try {
|
||||
final KeyQueryMetadata keyQueryMetadata = this.kafkaStreamsRegistry.getKafkaStreams()
|
||||
.stream()
|
||||
.map((k) -> Optional.ofNullable(k.queryMetadataForKey(store, key, serializer)))
|
||||
.filter(Optional::isPresent).map(Optional::get).findFirst().orElse(null);
|
||||
if (keyQueryMetadata != null) {
|
||||
return keyQueryMetadata.activeHost();
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
throwable = e;
|
||||
}
|
||||
throw new IllegalStateException(
|
||||
"Error when retrieving state store", throwable != null ? throwable : new Throwable("Kafka Streams is not ready."));
|
||||
});
|
||||
}
|
||||
|
||||
private RetryTemplate getRetryTemplate() {
|
||||
RetryTemplate retryTemplate = new RetryTemplate();
|
||||
|
||||
KafkaStreamsBinderConfigurationProperties.StateStoreRetry stateStoreRetry = this.binderConfigurationProperties.getStateStoreRetry();
|
||||
RetryPolicy retryPolicy = new SimpleRetryPolicy(stateStoreRetry.getMaxAttempts());
|
||||
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
|
||||
backOffPolicy.setBackOffPeriod(stateStoreRetry.getBackoffPeriod());
|
||||
|
||||
retryTemplate.setBackOffPolicy(backOffPolicy);
|
||||
retryTemplate.setRetryPolicy(retryPolicy);
|
||||
|
||||
return retryTemplate;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2017-2021 the original author or authors.
|
||||
* Copyright 2017-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.common.serialization.IntegerSerializer;
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.KeyQueryMetadata;
|
||||
import org.apache.kafka.streams.KeyValue;
|
||||
@@ -126,6 +127,35 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
|
||||
.store(StoreQueryParameters.fromNameAndType("foo", storeType));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStateStoreRetrievalRetryForHostInfoService() {
|
||||
StreamsBuilderFactoryBean mock = Mockito.mock(StreamsBuilderFactoryBean.class);
|
||||
KafkaStreams mockKafkaStreams = Mockito.mock(KafkaStreams.class);
|
||||
Mockito.when(mock.getKafkaStreams()).thenReturn(mockKafkaStreams);
|
||||
KafkaStreamsRegistry kafkaStreamsRegistry = new KafkaStreamsRegistry();
|
||||
kafkaStreamsRegistry.registerKafkaStreams(mock);
|
||||
Mockito.when(mock.isRunning()).thenReturn(true);
|
||||
Properties mockProperties = new Properties();
|
||||
mockProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "foobarApp-123");
|
||||
Mockito.when(mock.getStreamsConfiguration()).thenReturn(mockProperties);
|
||||
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties =
|
||||
new KafkaStreamsBinderConfigurationProperties(new KafkaProperties());
|
||||
binderConfigurationProperties.getStateStoreRetry().setMaxAttempts(3);
|
||||
InteractiveQueryService interactiveQueryService = new InteractiveQueryService(kafkaStreamsRegistry,
|
||||
binderConfigurationProperties);
|
||||
|
||||
QueryableStoreType<ReadOnlyKeyValueStore<Object, Object>> storeType = QueryableStoreTypes.keyValueStore();
|
||||
final StringSerializer serializer = new StringSerializer();
|
||||
try {
|
||||
interactiveQueryService.getHostInfo("foo", "fooKey", serializer);
|
||||
}
|
||||
catch (Exception ignored) {
|
||||
|
||||
}
|
||||
Mockito.verify(mockKafkaStreams, times(3))
|
||||
.queryMetadataForKey("foo", "fooKey", serializer);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKstreamBinderWithPojoInputAndStringOuput() {
|
||||
SpringApplication app = new SpringApplication(ProductCountApplication.class);
|
||||
@@ -264,4 +294,4 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user