GH:851 Kafka Streams topology visualization

Add Boot actuator endpoints for topology visualization.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/851
This commit is contained in:
Soby Chacko
2020-03-06 16:32:25 -05:00
committed by Christian Tzolov
parent db896532e6
commit cc5d1b1aa6
6 changed files with 157 additions and 3 deletions

View File

@@ -1448,6 +1448,19 @@ By default, the `Kafkastreams.cleanup()` method is called when the binding is st
See https://docs.spring.io/spring-kafka/reference/html/_reference.html#_configuration[the Spring Kafka documentation].
To modify this behavior simply add a single `CleanupConfig` `@Bean` (configured to clean up on start, stop, or neither) to the application context; the bean will be detected and wired into the factory bean.
=== Kafka Streams topology visualization
Kafka Streams binder provides the following actuator endpoints for retrieving the topology description using which you can visualize the topology using external tools.
`/actuator/topology`
`/actuator/topology/<applicaiton-id of the processor>`
You need to include the actuator and web dependencies from Spring Boot to access these endpoints.
Further, you also need to add `topology` to `management.endpoints.web.exposure.include` property.
By default, the `topology` endpoint is disabled.
=== Configuration Options
This section contains the configuration options used by the Kafka Streams binder.

View File

@@ -16,12 +16,16 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
@@ -31,7 +35,7 @@ import org.springframework.kafka.config.StreamsBuilderFactoryBean;
*
* @author Soby Chacko
*/
class KafkaStreamsRegistry {
public class KafkaStreamsRegistry {
private Map<KafkaStreams, StreamsBuilderFactoryBean> streamsBuilderFactoryBeanMap = new HashMap<>();
@@ -60,4 +64,18 @@ class KafkaStreamsRegistry {
return this.streamsBuilderFactoryBeanMap.get(kafkaStreams);
}
public StreamsBuilderFactoryBean streamsBuilderFactoryBean(String applicationId) {
final Optional<StreamsBuilderFactoryBean> first = this.streamsBuilderFactoryBeanMap.values()
.stream()
.filter(streamsBuilderFactoryBean -> streamsBuilderFactoryBean
.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals(applicationId))
.findFirst();
return first.orElse(null);
}
public List<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans() {
return new ArrayList<>(this.streamsBuilderFactoryBeanMap.values());
}
}

View File

@@ -0,0 +1,71 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams.endpoint;
import java.util.List;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.boot.actuate.endpoint.annotation.Selector;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.util.StringUtils;
/**
* Actuator endpoint for topology description.
*
* @author Soby Chacko
* @since 3.0.4
*/
@Endpoint(id = "topology")
public class TopologyEndpoint {
/**
* Topology not found message.
*/
public static final String NO_TOPOLOGY_FOUND_MSG = "No topology found for the given application ID";
private final KafkaStreamsRegistry kafkaStreamsRegistry;
public TopologyEndpoint(KafkaStreamsRegistry kafkaStreamsRegistry) {
this.kafkaStreamsRegistry = kafkaStreamsRegistry;
}
@ReadOperation
public String topology() {
final List<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans = this.kafkaStreamsRegistry.streamsBuilderFactoryBeans();
final StringBuilder topologyDescription = new StringBuilder();
streamsBuilderFactoryBeans.stream()
.forEach(streamsBuilderFactoryBean ->
topologyDescription.append(streamsBuilderFactoryBean.getTopology().describe().toString()));
return topologyDescription.toString();
}
@ReadOperation
public String topology(@Selector String applicationId) {
if (!StringUtils.isEmpty(applicationId)) {
final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsRegistry.streamsBuilderFactoryBean(applicationId);
if (streamsBuilderFactoryBean != null) {
return streamsBuilderFactoryBean.getTopology().describe().toString();
}
else {
return NO_TOPOLOGY_FOUND_MSG;
}
}
return NO_TOPOLOGY_FOUND_MSG;
}
}

View File

@@ -0,0 +1,43 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams.endpoint;
import org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration;
import org.springframework.boot.actuate.autoconfigure.endpoint.condition.ConditionalOnAvailableEndpoint;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderSupportAutoConfiguration;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Soby Chacko
* @since 3.0.4
*/
@Configuration
@ConditionalOnClass(name = {
"org.springframework.boot.actuate.endpoint.annotation.Endpoint" })
@AutoConfigureAfter({EndpointAutoConfiguration.class, KafkaStreamsBinderSupportAutoConfiguration.class})
public class TopologyEndpointAutoConfiguration {
@Bean
@ConditionalOnAvailableEndpoint
public TopologyEndpoint topologyEndpoint(KafkaStreamsRegistry kafkaStreamsRegistry) {
return new TopologyEndpoint(kafkaStreamsRegistry);
}
}

View File

@@ -1,4 +1,4 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderSupportAutoConfiguration,\
org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsFunctionAutoConfiguration
org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsFunctionAutoConfiguration,\
org.springframework.cloud.stream.binder.kafka.streams.endpoint.TopologyEndpointAutoConfiguration

View File

@@ -44,6 +44,8 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
import org.springframework.cloud.stream.binder.kafka.streams.endpoint.TopologyEndpoint;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
@@ -108,6 +110,13 @@ public class KafkaStreamsBinderWordCountFunctionTests {
assertThat(meterRegistry.get("stream.metrics.commit.total").gauge().value()).isEqualTo(1.0);
assertThat(meterRegistry.get("app.info.start.time.ms").gauge().value()).isNotNaN();
Assert.isTrue(LATCH.await(5, TimeUnit.SECONDS), "Failed to call customizers");
//Testing topology endpoint
final KafkaStreamsRegistry kafkaStreamsRegistry = context.getBean(KafkaStreamsRegistry.class);
final TopologyEndpoint topologyEndpoint = new TopologyEndpoint(kafkaStreamsRegistry);
final String topology1 = topologyEndpoint.topology();
final String topology2 = topologyEndpoint.topology("testKstreamWordCountFunction");
assertThat(topology1).isNotEmpty();
assertThat(topology1).isEqualTo(topology2);
}
}