From 4159217023eff291e5df683e1c52832e066b6a58 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Fri, 17 Jul 2020 19:05:33 -0400 Subject: [PATCH] Kafka Streams metrics changes * Kafka Streams metrics in the binder for Boot 2.2 users are streamlined to reflect the Micrometer native support added with version 1.4.0 which is available through Boot 2.3. While Boot 2.3 users will get this native support from Micrometer, Boot 2.2 users will still rely on the custom implementation in the binder. This commit aligns that custom implemenation more with the native implementation. * Disable the custom Kafka Streams metrics bean which is mentioned above (KafkaStreamsBinderMetrics) when the application is on Boot 2.3, as this implementation is only applicable for Boot 2.2.x. * Update docs Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/880 --- docs/src/main/asciidoc/kafka-streams.adoc | 25 +- .../streams/KafkaStreamsBinderMetrics.java | 225 ++++++++++++++---- ...StreamsBinderSupportAutoConfiguration.java | 3 + ...kaStreamsBinderWordCountFunctionTests.java | 3 +- 4 files changed, 184 insertions(+), 72 deletions(-) diff --git a/docs/src/main/asciidoc/kafka-streams.adoc b/docs/src/main/asciidoc/kafka-streams.adoc index 3c543f99..b0e931f8 100644 --- a/docs/src/main/asciidoc/kafka-streams.adoc +++ b/docs/src/main/asciidoc/kafka-streams.adoc @@ -1056,28 +1056,13 @@ When there are multiple Kafka Streams processors present in the same application === Accessing Kafka Streams Metrics -Spring Cloud Stream Kafka Streams binder provides a basic mechanism for accessing Kafka Streams metrics exported through a Micrometer `MeterRegistry`. -Kafka Streams metrics that are available through `KafkaStreams#metrics()` are exported to this meter registry by the binder. -The metrics exported are from the consumers, producers, admin-client and the stream itself. +Spring Cloud Stream Kafka Streams binder provides Kafka Streams metrics which can be exported through a Micrometer `MeterRegistry`. -The metrics exported by the binder are exported with the format of metrics group name followed by a dot and then the actual metric name. -All dashes in the original metric information is replaced with dots. +For Spring Boot version 2.2.x, the metrics support is provided through a custom Micrometer metrics implementation by the binder. +For Spring Boot version 2.3.x, the Kafka Streams metrics support is provided natively through Micrometer. -For e.g. the metric name `network-io-total` from the metric group `consumer-metrics` is available in the micrometer registry as `consumer.metrics.network.io.total`. -Similarly, the metric `commit-total` from `stream-metrics` is available as `stream.metrics.commit.total`. - -If you have multiple Kafka Streams processors in the same application, then the metric name will be prepended with the corresponding application ID of the Kafka Streams. -The application ID in this case will be preserved as is, i.e. no dashes will be converted to dots etc. -For example, if the application ID of the first processor is `processor-1`, then the metric name `network-io-total` from the metric group `consumer-metrics` is available in the micrometer registry as `processor-1.consumer.metrics.network.io.total`. - -You can either programmatically access the Micrometer `MeterRegistry` in the application and then iterate through the available gauges or use Spring Boot actuator to access the metrics through a REST endpoint. -When accessing through the Boot actuator endpoint, make sure to add `metrics` to the property `management.endpoints.web.exposure.include`. -Then you can access `/acutator/metrics` to get a list of all the available metrics which then can be individually accessed through the same URI (`/actuator/metrics/`). - -Anything beyond the info level metrics available through `KafkaStreams#metrics()`, (for e.g. the debugging level metrics) are still only available through JMX after you set the `metrics.recording.level` to `DEBUG`. -Kafka Streams, by default, set this level to `INFO`. -https://kafka.apache.org/documentation/#kafka_streams_monitoring[Please see this section] from Kafka Streams documentation for more details. -In a future release, binder may support exporting these DEBUG level metrics through Micrometer. +When accessing metrics through the Boot actuator endpoint, make sure to add `metrics` to the property `management.endpoints.web.exposure.include`. +Then you can access `/acutator/metrics` to get a list of all the available metrics, which then can be individually accessed through the same URI (`/actuator/metrics/`). === Mixing high level DSL and low level Processor API diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderMetrics.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderMetrics.java index a5f927d0..c651a056 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderMetrics.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderMetrics.java @@ -16,18 +16,24 @@ package org.springframework.cloud.stream.binder.kafka.streams; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.ToDoubleFunction; +import io.micrometer.core.instrument.FunctionCounter; import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.Meter; import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.binder.MeterBinder; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; import org.springframework.kafka.config.StreamsBuilderFactoryBean; @@ -35,79 +41,198 @@ import org.springframework.kafka.config.StreamsBuilderFactoryBean; * Kafka Streams binder metrics implementation that exports the metrics available * through {@link KafkaStreams#metrics()} into a micrometer {@link io.micrometer.core.instrument.MeterRegistry}. * + * Boot 2.2 users need to rely on this class for the metrics instead of direct support from Micrometer. + * Micrometer added Kafka Streams metrics support in 1.4.0 which Boot 2.3 includes. + * Therefore, the users who are on Boot 2.2, need to rely on these metrics. + * For users who are on 2.3 Boot, this class won't be activated (See the configuration for the various + * conditionals used). + * + * For the most part, this class is a copy of the Micrometer Kafka Streams support that was added in version 1.4.0. + * We will keep this class, as long as we support Boot 2.2.x. + * * @author Soby Chacko * @since 3.0.0 */ public class KafkaStreamsBinderMetrics { + static final String DEFAULT_VALUE = "unknown"; + + static final String CLIENT_ID_TAG_NAME = "client-id"; + + static final String METRIC_GROUP_APP_INFO = "app-info"; + + static final String VERSION_METRIC_NAME = "version"; + + static final String START_TIME_METRIC_NAME = "start-time-ms"; + + static final String KAFKA_VERSION_TAG_NAME = "kafka-version"; + + static final String METRIC_NAME_PREFIX = "kafka."; + + static final String METRIC_GROUP_METRICS_COUNT = "kafka-metrics-count"; + + private String kafkaVersion = DEFAULT_VALUE; + + private String clientId = DEFAULT_VALUE; + private final MeterRegistry meterRegistry; private MeterBinder meterBinder; + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + private volatile Set currentMeters = new HashSet<>(); + public KafkaStreamsBinderMetrics(MeterRegistry meterRegistry) { this.meterRegistry = meterRegistry; } - public void bindTo(Set streamsBuilderFactoryBeans, MeterRegistry meterRegistry) { - + public void bindTo(Set streamsBuilderFactoryBeans) { if (this.meterBinder == null) { - this.meterBinder = new MeterBinder() { - @Override - @SuppressWarnings("unchecked") - public void bindTo(MeterRegistry registry) { - if (streamsBuilderFactoryBeans != null) { - for (StreamsBuilderFactoryBean streamsBuilderFactoryBean : streamsBuilderFactoryBeans) { - KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams(); - final Map metrics = kafkaStreams.metrics(); - - Set meterNames = new HashSet<>(); - - for (Map.Entry metric : metrics.entrySet()) { - final String sanitized = sanitize(metric.getKey().group() + "." + metric.getKey().name()); - final String applicationId = streamsBuilderFactoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG); - - final String name = streamsBuilderFactoryBeans.size() > 1 ? applicationId + "." + sanitized : sanitized; - - final Gauge.Builder builder = - Gauge.builder(name, this, - toDoubleFunction(metric.getValue())); - final Map tags = metric.getKey().tags(); - for (Map.Entry tag : tags.entrySet()) { - builder.tag(tag.getKey(), tag.getValue()); - } - if (!meterNames.contains(name)) { - builder.description(metric.getKey().description()) - .register(meterRegistry); - meterNames.add(name); - } - } - } + this.meterBinder = registry -> { + if (streamsBuilderFactoryBeans != null) { + for (StreamsBuilderFactoryBean streamsBuilderFactoryBean : streamsBuilderFactoryBeans) { + KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams(); + final Map metrics = kafkaStreams.metrics(); + prepareToBindMetrics(registry, metrics); + checkAndBindMetrics(registry, metrics); } } - - ToDoubleFunction toDoubleFunction(Metric metric) { - return (o) -> { - if (metric.metricValue() instanceof Number) { - return ((Number) metric.metricValue()).doubleValue(); - } - else { - return 0.0; - } - }; - } }; } this.meterBinder.bindTo(this.meterRegistry); } - private static String sanitize(String value) { - return value.replaceAll("-", "."); - } - public void addMetrics(Set streamsBuilderFactoryBeans) { synchronized (KafkaStreamsBinderMetrics.this) { - this.bindTo(streamsBuilderFactoryBeans, this.meterRegistry); + this.bindTo(streamsBuilderFactoryBeans); + } + } + + void prepareToBindMetrics(MeterRegistry registry, Map metrics) { + Metric startTime = null; + for (Map.Entry entry : metrics.entrySet()) { + MetricName name = entry.getKey(); + if (clientId.equals(DEFAULT_VALUE) && name.tags().get(CLIENT_ID_TAG_NAME) != null) { + clientId = name.tags().get(CLIENT_ID_TAG_NAME); + } + if (METRIC_GROUP_APP_INFO.equals(name.group())) { + if (VERSION_METRIC_NAME.equals(name.name())) { + kafkaVersion = (String) entry.getValue().metricValue(); + } + else if (START_TIME_METRIC_NAME.equals(name.name())) { + startTime = entry.getValue(); + } + } + } + if (startTime != null) { + bindMeter(registry, startTime, meterName(startTime), meterTags(startTime)); + } + } + + private void bindMeter(MeterRegistry registry, Metric metric, String name, Iterable tags) { + if (name.endsWith("total") || name.endsWith("count")) { + registerCounter(registry, metric, name, tags); + } + else { + registerGauge(registry, metric, name, tags); + } + } + + private void registerCounter(MeterRegistry registry, Metric metric, String name, Iterable tags) { + FunctionCounter.builder(name, metric, toMetricValue()) + .tags(tags) + .description(metric.metricName().description()) + .register(registry); + } + + private ToDoubleFunction toMetricValue() { + return metric -> ((Number) metric.metricValue()).doubleValue(); + } + + private void registerGauge(MeterRegistry registry, Metric metric, String name, Iterable tags) { + Gauge.builder(name, metric, toMetricValue()) + .tags(tags) + .description(metric.metricName().description()) + .register(registry); + } + + private List meterTags(Metric metric) { + return meterTags(metric, false); + } + + private String meterName(Metric metric) { + String name = METRIC_NAME_PREFIX + metric.metricName().group() + "." + metric.metricName().name(); + return name.replaceAll("-metrics", "").replaceAll("-", "."); + } + + private List meterTags(Metric metric, boolean includeCommonTags) { + List tags = new ArrayList<>(); + metric.metricName().tags().forEach((key, value) -> tags.add(Tag.of(key, value))); + tags.add(Tag.of(KAFKA_VERSION_TAG_NAME, kafkaVersion)); + return tags; + } + + private boolean differentClient(List tags) { + for (Tag tag : tags) { + if (tag.getKey().equals(CLIENT_ID_TAG_NAME)) { + if (!clientId.equals(tag.getValue())) { + return true; + } + } + } + return false; + } + + void checkAndBindMetrics(MeterRegistry registry, Map metrics) { + if (!currentMeters.equals(metrics.keySet())) { + currentMeters = new HashSet<>(metrics.keySet()); + metrics.forEach((name, metric) -> { + //Filter out non-numeric values + if (!(metric.metricValue() instanceof Number)) { + return; + } + + //Filter out metrics from groups that include metadata + if (METRIC_GROUP_APP_INFO.equals(name.group())) { + return; + } + if (METRIC_GROUP_METRICS_COUNT.equals(name.group())) { + return; + } + String meterName = meterName(metric); + List meterTagsWithCommonTags = meterTags(metric, true); + //Kafka has metrics with lower number of tags (e.g. with/without topic or partition tag) + //Remove meters with lower number of tags + boolean hasLessTags = false; + for (Meter other : registry.find(meterName).meters()) { + List tags = other.getId().getTags(); + // Only consider meters from the same client before filtering + if (differentClient(tags)) { + break; + } + if (tags.size() < meterTagsWithCommonTags.size()) { + registry.remove(other); + } + // Check if already exists + else if (tags.size() == meterTagsWithCommonTags.size()) { + if (tags.equals(meterTagsWithCommonTags)) { + return; + } + else { + break; + } + } + else { + hasLessTags = true; + } + } + if (hasLessTags) { + return; + } + bindMeter(registry, metric, meterName, meterTags(metric)); + }); } } } diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java index ed94d580..7dd93f5e 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java @@ -42,6 +42,7 @@ import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -421,6 +422,7 @@ public class KafkaStreamsBinderSupportAutoConfiguration { @Bean @ConditionalOnBean(MeterRegistry.class) @ConditionalOnMissingBean(KafkaStreamsBinderMetrics.class) + @ConditionalOnMissingClass("org.springframework.kafka.core.MicrometerConsumerListener") public KafkaStreamsBinderMetrics kafkaStreamsBinderMetrics(MeterRegistry meterRegistry) { return new KafkaStreamsBinderMetrics(meterRegistry); @@ -468,6 +470,7 @@ public class KafkaStreamsBinderSupportAutoConfiguration { protected class KafkaStreamsBinderMetricsConfigurationWithMultiBinder { @Bean + @ConditionalOnMissingClass("org.springframework.kafka.core.MicrometerConsumerListener") public KafkaStreamsBinderMetrics kafkaStreamsBinderMetrics(ConfigurableApplicationContext context) { MeterRegistry meterRegistry = context.getBean("outerContext", ApplicationContext.class) diff --git a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountFunctionTests.java b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountFunctionTests.java index 1c7f6831..2156ef26 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountFunctionTests.java +++ b/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountFunctionTests.java @@ -107,8 +107,7 @@ public class KafkaStreamsBinderWordCountFunctionTests { receiveAndValidate("words", "counts"); final MeterRegistry meterRegistry = context.getBean(MeterRegistry.class); Thread.sleep(100); - assertThat(meterRegistry.get("stream.thread.metrics.commit.total").gauge().value()).isEqualTo(1.0); - assertThat(meterRegistry.get("app.info.start.time.ms").gauge().value()).isNotNaN(); + assertThat(meterRegistry.getMeters().size() > 1).isTrue(); Assert.isTrue(LATCH.await(5, TimeUnit.SECONDS), "Failed to call customizers"); //Testing topology endpoint final KafkaStreamsRegistry kafkaStreamsRegistry = context.getBean(KafkaStreamsRegistry.class);