Renaming Kafka Streams topology actuator endpoint
Kafka Streams topology actuator endpoint had a conflict with the JMX exporter and was causing some IDE issues. Renming this endpoint to kafkastreamstopology. Renaming the underlying methods in this actuator endpoint implentation. Updating docs. Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/895
This commit is contained in:
@@ -1453,13 +1453,13 @@ To modify this behavior simply add a single `CleanupConfig` `@Bean` (configured
|
||||
|
||||
Kafka Streams binder provides the following actuator endpoints for retrieving the topology description using which you can visualize the topology using external tools.
|
||||
|
||||
`/actuator/topology`
|
||||
`/actuator/kafkastreamstopology`
|
||||
|
||||
`/actuator/topology/<applicaiton-id of the processor>`
|
||||
`/actuator/kafkastreamstopology/<applicaiton-id of the processor>`
|
||||
|
||||
You need to include the actuator and web dependencies from Spring Boot to access these endpoints.
|
||||
Further, you also need to add `topology` to `management.endpoints.web.exposure.include` property.
|
||||
By default, the `topology` endpoint is disabled.
|
||||
Further, you also need to add `kafkastreamstopology` to `management.endpoints.web.exposure.include` property.
|
||||
By default, the `kafkastreamstopology` endpoint is disabled.
|
||||
|
||||
=== Configuration Options
|
||||
|
||||
|
||||
@@ -31,8 +31,8 @@ import org.springframework.util.StringUtils;
|
||||
* @author Soby Chacko
|
||||
* @since 3.0.4
|
||||
*/
|
||||
@Endpoint(id = "topology")
|
||||
public class TopologyEndpoint {
|
||||
@Endpoint(id = "kafkastreamstopology")
|
||||
public class KafkaStreamsTopologyEndpoint {
|
||||
|
||||
/**
|
||||
* Topology not found message.
|
||||
@@ -41,12 +41,12 @@ public class TopologyEndpoint {
|
||||
|
||||
private final KafkaStreamsRegistry kafkaStreamsRegistry;
|
||||
|
||||
public TopologyEndpoint(KafkaStreamsRegistry kafkaStreamsRegistry) {
|
||||
public KafkaStreamsTopologyEndpoint(KafkaStreamsRegistry kafkaStreamsRegistry) {
|
||||
this.kafkaStreamsRegistry = kafkaStreamsRegistry;
|
||||
}
|
||||
|
||||
@ReadOperation
|
||||
public String topology() {
|
||||
public String kafkaStreamsTopology() {
|
||||
final List<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans = this.kafkaStreamsRegistry.streamsBuilderFactoryBeans();
|
||||
final StringBuilder topologyDescription = new StringBuilder();
|
||||
streamsBuilderFactoryBeans.stream()
|
||||
@@ -56,7 +56,7 @@ public class TopologyEndpoint {
|
||||
}
|
||||
|
||||
@ReadOperation
|
||||
public String topology(@Selector String applicationId) {
|
||||
public String kafkaStreamsTopology(@Selector String applicationId) {
|
||||
if (!StringUtils.isEmpty(applicationId)) {
|
||||
final StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.kafkaStreamsRegistry.streamsBuilderFactoryBean(applicationId);
|
||||
if (streamsBuilderFactoryBean != null) {
|
||||
@@ -33,11 +33,11 @@ import org.springframework.context.annotation.Configuration;
|
||||
@ConditionalOnClass(name = {
|
||||
"org.springframework.boot.actuate.endpoint.annotation.Endpoint" })
|
||||
@AutoConfigureAfter({EndpointAutoConfiguration.class, KafkaStreamsBinderSupportAutoConfiguration.class})
|
||||
public class TopologyEndpointAutoConfiguration {
|
||||
public class KafkaStreamsTopologyEndpointAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
@ConditionalOnAvailableEndpoint
|
||||
public TopologyEndpoint topologyEndpoint(KafkaStreamsRegistry kafkaStreamsRegistry) {
|
||||
return new TopologyEndpoint(kafkaStreamsRegistry);
|
||||
public KafkaStreamsTopologyEndpoint topologyEndpoint(KafkaStreamsRegistry kafkaStreamsRegistry) {
|
||||
return new KafkaStreamsTopologyEndpoint(kafkaStreamsRegistry);
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||
org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderSupportAutoConfiguration,\
|
||||
org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsFunctionAutoConfiguration,\
|
||||
org.springframework.cloud.stream.binder.kafka.streams.endpoint.TopologyEndpointAutoConfiguration
|
||||
org.springframework.cloud.stream.binder.kafka.streams.endpoint.KafkaStreamsTopologyEndpointAutoConfiguration
|
||||
|
||||
@@ -45,7 +45,7 @@ import org.springframework.boot.WebApplicationType;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.endpoint.TopologyEndpoint;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.endpoint.KafkaStreamsTopologyEndpoint;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
|
||||
@@ -112,9 +112,9 @@ public class KafkaStreamsBinderWordCountFunctionTests {
|
||||
Assert.isTrue(LATCH.await(5, TimeUnit.SECONDS), "Failed to call customizers");
|
||||
//Testing topology endpoint
|
||||
final KafkaStreamsRegistry kafkaStreamsRegistry = context.getBean(KafkaStreamsRegistry.class);
|
||||
final TopologyEndpoint topologyEndpoint = new TopologyEndpoint(kafkaStreamsRegistry);
|
||||
final String topology1 = topologyEndpoint.topology();
|
||||
final String topology2 = topologyEndpoint.topology("testKstreamWordCountFunction");
|
||||
final KafkaStreamsTopologyEndpoint kafkaStreamsTopologyEndpoint = new KafkaStreamsTopologyEndpoint(kafkaStreamsRegistry);
|
||||
final String topology1 = kafkaStreamsTopologyEndpoint.kafkaStreamsTopology();
|
||||
final String topology2 = kafkaStreamsTopologyEndpoint.kafkaStreamsTopology("testKstreamWordCountFunction");
|
||||
assertThat(topology1).isNotEmpty();
|
||||
assertThat(topology1).isEqualTo(topology2);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user