diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryService.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryService.java index 33e7890c..d8f350af 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryService.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryService.java @@ -211,7 +211,7 @@ public class InteractiveQueryService { throwable = e; } throw new IllegalStateException( - "Error when retrieving state store", throwable != null ? throwable : new Throwable("Kafka Streams is not ready.")); + "Error when retrieving state store.", throwable != null ? throwable : new Throwable("Kafka Streams is not ready.")); }); } diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java index 7acd4182..a924c2d2 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsInteractiveQueryIntegrationTests.java @@ -63,6 +63,7 @@ import org.springframework.kafka.test.rule.EmbeddedKafkaRule; import org.springframework.kafka.test.utils.KafkaTestUtils; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.internal.verification.VerificationModeFactory.times; /** @@ -147,13 +148,13 @@ public class KafkaStreamsInteractiveQueryIntegrationTests { QueryableStoreType> storeType = QueryableStoreTypes.keyValueStore(); final StringSerializer serializer = new StringSerializer(); try { - interactiveQueryService.getHostInfo("foo", "fooKey", serializer); + interactiveQueryService.getHostInfo("foo", "foobarApp-key", serializer); } catch (Exception ignored) { } Mockito.verify(mockKafkaStreams, times(3)) - .queryMetadataForKey("foo", "fooKey", serializer); + .queryMetadataForKey("foo", "foobarApp-key", serializer); } @Test @@ -224,16 +225,16 @@ public class KafkaStreamsInteractiveQueryIntegrationTests { assertThat(hostInfo.host() + ":" + hostInfo.port()) .isEqualTo(embeddedKafka.getBrokersAsString()); - HostInfo hostInfoFoo = interactiveQueryService - .getHostInfo("prod-id-count-store-foo", 123, new IntegerSerializer()); - assertThat(hostInfoFoo).isNull(); + assertThatThrownBy(() -> interactiveQueryService + .getHostInfo("prod-id-count-store-foo", 123, new IntegerSerializer())) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Error when retrieving state store."); final List hostInfos = interactiveQueryService.getAllHostsInfo("prod-id-count-store"); assertThat(hostInfos.size()).isEqualTo(1); final HostInfo hostInfo1 = hostInfos.get(0); assertThat(hostInfo1.host() + ":" + hostInfo1.port()) .isEqualTo(embeddedKafka.getBrokersAsString()); - } @EnableAutoConfiguration @@ -294,4 +295,4 @@ public class KafkaStreamsInteractiveQueryIntegrationTests { } -} \ No newline at end of file +}