Kafka Streams metrics changes

* Kafka Streams metrics in the binder for Boot 2.2 users are streamlined to
  reflect the native Micrometer support added in Micrometer 1.4.0, which is
  available through Boot 2.3. While Boot 2.3 users get this native support
  from Micrometer, Boot 2.2 users still rely on the custom implementation
  in the binder. This commit aligns that custom implementation more closely
  with the native implementation.

* Disable the custom Kafka Streams metrics bean mentioned above
  (KafkaStreamsBinderMetrics) when the application is on Boot 2.3, as this
  implementation is only applicable to Boot 2.2.x.

* Update docs

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/880
Soby Chacko
2020-07-17 19:05:33 -04:00
parent 087b09d193
commit 4159217023
4 changed files with 184 additions and 72 deletions


@@ -1056,28 +1056,13 @@ When there are multiple Kafka Streams processors present in the same application
=== Accessing Kafka Streams Metrics
Spring Cloud Stream Kafka Streams binder provides a basic mechanism for accessing Kafka Streams metrics exported through a Micrometer `MeterRegistry`.
Kafka Streams metrics that are available through `KafkaStreams#metrics()` are exported to this meter registry by the binder.
The metrics exported are from the consumers, producers, admin client, and the stream itself.
For Spring Boot version 2.2.x, the metrics support is provided through a custom Micrometer metrics implementation in the binder.
For Spring Boot version 2.3.x, the Kafka Streams metrics support is provided natively through Micrometer.
The metrics exported by the binder are named with the metric group name, followed by a dot, and then the actual metric name.
All dashes in the original metric information are replaced with dots.
For example, the metric name `network-io-total` from the metric group `consumer-metrics` is available in the Micrometer registry as `consumer.metrics.network.io.total`.
Similarly, the metric `commit-total` from `stream-metrics` is available as `stream.metrics.commit.total`.
If you have multiple Kafka Streams processors in the same application, then the metric name is prepended with the application ID of the corresponding Kafka Streams instance.
The application ID in this case is preserved as is, i.e. no dashes are converted to dots.
For example, if the application ID of the first processor is `processor-1`, then the metric name `network-io-total` from the metric group `consumer-metrics` is available in the Micrometer registry as `processor-1.consumer.metrics.network.io.total`.
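The naming rule described above can be sketched as follows; `sanitize` and `qualified` are hypothetical helper names used for illustration, not the binder's actual API:

```java
// Illustrative sketch of the binder's metric-name translation described above.
public class MetricNameSanitizer {

    // Metric group and metric name are joined with a dot, then dashes become dots.
    static String sanitize(String group, String name) {
        return (group + "." + name).replaceAll("-", ".");
    }

    // With multiple processors, the Kafka Streams application ID is prepended
    // as-is (its dashes are preserved).
    static String qualified(String applicationId, String group, String name) {
        return applicationId + "." + sanitize(group, name);
    }

    public static void main(String[] args) {
        System.out.println(sanitize("consumer-metrics", "network-io-total"));
        // consumer.metrics.network.io.total
        System.out.println(qualified("processor-1", "consumer-metrics", "network-io-total"));
        // processor-1.consumer.metrics.network.io.total
    }
}
```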
You can either programmatically access the Micrometer `MeterRegistry` in the application and then iterate through the available gauges or use Spring Boot actuator to access the metrics through a REST endpoint.
When accessing through the Boot actuator endpoint, make sure to add `metrics` to the property `management.endpoints.web.exposure.include`.
Then you can access `/actuator/metrics` to get a list of all the available metrics, which can then be individually accessed through the same URI (`/actuator/metrics/<metric-name>`).
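In property form, the exposure setting described above is, for example, in `application.properties`:

```
management.endpoints.web.exposure.include=metrics
```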
Anything beyond the info-level metrics available through `KafkaStreams#metrics()` (for example, the debug-level metrics) is still only available through JMX, after you set `metrics.recording.level` to `DEBUG`.
Kafka Streams, by default, sets this level to `INFO`.
https://kafka.apache.org/documentation/#kafka_streams_monitoring[Please see this section] from the Kafka Streams documentation for more details.
In a future release, the binder may support exporting these DEBUG-level metrics through Micrometer.
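As a sketch, assuming the binder's standard pass-through `configuration` property prefix (an assumption here; verify against your binder version), the recording level could be raised in `application.properties` with:

```
spring.cloud.stream.kafka.streams.binder.configuration.metrics.recording.level=DEBUG
```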
=== Mixing high level DSL and low level Processor API


@@ -16,18 +16,24 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.ToDoubleFunction;
import io.micrometer.core.instrument.FunctionCounter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
@@ -35,79 +41,198 @@ import org.springframework.kafka.config.StreamsBuilderFactoryBean;
* Kafka Streams binder metrics implementation that exports the metrics available
* through {@link KafkaStreams#metrics()} into a micrometer {@link io.micrometer.core.instrument.MeterRegistry}.
*
* Micrometer added Kafka Streams metrics support in version 1.4.0, which Boot 2.3 includes.
* Therefore, users who are on Boot 2.2 need to rely on this class for the metrics instead of
* the direct support from Micrometer.
* For users who are on Boot 2.3, this class won't be activated (see the configuration for the
* various conditionals used).
*
* For the most part, this class is a copy of the Micrometer Kafka Streams support that was added in version 1.4.0.
* We will keep this class as long as we support Boot 2.2.x.
*
* @author Soby Chacko
* @since 3.0.0
*/
public class KafkaStreamsBinderMetrics {
static final String DEFAULT_VALUE = "unknown";
static final String CLIENT_ID_TAG_NAME = "client-id";
static final String METRIC_GROUP_APP_INFO = "app-info";
static final String VERSION_METRIC_NAME = "version";
static final String START_TIME_METRIC_NAME = "start-time-ms";
static final String KAFKA_VERSION_TAG_NAME = "kafka-version";
static final String METRIC_NAME_PREFIX = "kafka.";
static final String METRIC_GROUP_METRICS_COUNT = "kafka-metrics-count";
private String kafkaVersion = DEFAULT_VALUE;
private String clientId = DEFAULT_VALUE;
private final MeterRegistry meterRegistry;
private MeterBinder meterBinder;
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private volatile Set<MetricName> currentMeters = new HashSet<>();
public KafkaStreamsBinderMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
public void bindTo(Set<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans) {
if (this.meterBinder == null) {
this.meterBinder = registry -> {
if (streamsBuilderFactoryBeans != null) {
for (StreamsBuilderFactoryBean streamsBuilderFactoryBean : streamsBuilderFactoryBeans) {
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
final Map<MetricName, ? extends Metric> metrics = kafkaStreams.metrics();
prepareToBindMetrics(registry, metrics);
checkAndBindMetrics(registry, metrics);
}
}
};
}
this.meterBinder.bindTo(this.meterRegistry);
}
public void addMetrics(Set<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans) {
synchronized (KafkaStreamsBinderMetrics.this) {
this.bindTo(streamsBuilderFactoryBeans);
}
}
void prepareToBindMetrics(MeterRegistry registry, Map<MetricName, ? extends Metric> metrics) {
Metric startTime = null;
for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
MetricName name = entry.getKey();
if (clientId.equals(DEFAULT_VALUE) && name.tags().get(CLIENT_ID_TAG_NAME) != null) {
clientId = name.tags().get(CLIENT_ID_TAG_NAME);
}
if (METRIC_GROUP_APP_INFO.equals(name.group())) {
if (VERSION_METRIC_NAME.equals(name.name())) {
kafkaVersion = (String) entry.getValue().metricValue();
}
else if (START_TIME_METRIC_NAME.equals(name.name())) {
startTime = entry.getValue();
}
}
}
if (startTime != null) {
bindMeter(registry, startTime, meterName(startTime), meterTags(startTime));
}
}
private void bindMeter(MeterRegistry registry, Metric metric, String name, Iterable<Tag> tags) {
if (name.endsWith("total") || name.endsWith("count")) {
registerCounter(registry, metric, name, tags);
}
else {
registerGauge(registry, metric, name, tags);
}
}
private void registerCounter(MeterRegistry registry, Metric metric, String name, Iterable<Tag> tags) {
FunctionCounter.builder(name, metric, toMetricValue())
.tags(tags)
.description(metric.metricName().description())
.register(registry);
}
private ToDoubleFunction<Metric> toMetricValue() {
return metric -> ((Number) metric.metricValue()).doubleValue();
}
private void registerGauge(MeterRegistry registry, Metric metric, String name, Iterable<Tag> tags) {
Gauge.builder(name, metric, toMetricValue())
.tags(tags)
.description(metric.metricName().description())
.register(registry);
}
private List<Tag> meterTags(Metric metric) {
return meterTags(metric, false);
}
private String meterName(Metric metric) {
String name = METRIC_NAME_PREFIX + metric.metricName().group() + "." + metric.metricName().name();
return name.replaceAll("-metrics", "").replaceAll("-", ".");
}
private List<Tag> meterTags(Metric metric, boolean includeCommonTags) {
List<Tag> tags = new ArrayList<>();
metric.metricName().tags().forEach((key, value) -> tags.add(Tag.of(key, value)));
tags.add(Tag.of(KAFKA_VERSION_TAG_NAME, kafkaVersion));
return tags;
}
private boolean differentClient(List<Tag> tags) {
for (Tag tag : tags) {
if (tag.getKey().equals(CLIENT_ID_TAG_NAME)) {
if (!clientId.equals(tag.getValue())) {
return true;
}
}
}
return false;
}
void checkAndBindMetrics(MeterRegistry registry, Map<MetricName, ? extends Metric> metrics) {
if (!currentMeters.equals(metrics.keySet())) {
currentMeters = new HashSet<>(metrics.keySet());
metrics.forEach((name, metric) -> {
//Filter out non-numeric values
if (!(metric.metricValue() instanceof Number)) {
return;
}
//Filter out metrics from groups that include metadata
if (METRIC_GROUP_APP_INFO.equals(name.group())) {
return;
}
if (METRIC_GROUP_METRICS_COUNT.equals(name.group())) {
return;
}
String meterName = meterName(metric);
List<Tag> meterTagsWithCommonTags = meterTags(metric, true);
//Kafka has metrics with lower number of tags (e.g. with/without topic or partition tag)
//Remove meters with lower number of tags
boolean hasLessTags = false;
for (Meter other : registry.find(meterName).meters()) {
List<Tag> tags = other.getId().getTags();
// Only consider meters from the same client before filtering
if (differentClient(tags)) {
break;
}
if (tags.size() < meterTagsWithCommonTags.size()) {
registry.remove(other);
}
// Check if already exists
else if (tags.size() == meterTagsWithCommonTags.size()) {
if (tags.equals(meterTagsWithCommonTags)) {
return;
}
else {
break;
}
}
else {
hasLessTags = true;
}
}
if (hasLessTags) {
return;
}
bindMeter(registry, metric, meterName, meterTags(metric));
});
}
}
}


@@ -42,6 +42,7 @@ import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -421,6 +422,7 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
@Bean
@ConditionalOnBean(MeterRegistry.class)
@ConditionalOnMissingBean(KafkaStreamsBinderMetrics.class)
@ConditionalOnMissingClass("org.springframework.kafka.core.MicrometerConsumerListener")
public KafkaStreamsBinderMetrics kafkaStreamsBinderMetrics(MeterRegistry meterRegistry) {
return new KafkaStreamsBinderMetrics(meterRegistry);
@@ -468,6 +470,7 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
protected class KafkaStreamsBinderMetricsConfigurationWithMultiBinder {
@Bean
@ConditionalOnMissingClass("org.springframework.kafka.core.MicrometerConsumerListener")
public KafkaStreamsBinderMetrics kafkaStreamsBinderMetrics(ConfigurableApplicationContext context) {
MeterRegistry meterRegistry = context.getBean("outerContext", ApplicationContext.class)


@@ -107,8 +107,7 @@ public class KafkaStreamsBinderWordCountFunctionTests {
receiveAndValidate("words", "counts");
final MeterRegistry meterRegistry = context.getBean(MeterRegistry.class);
Thread.sleep(100);
assertThat(meterRegistry.getMeters().size() > 1).isTrue();
Assert.isTrue(LATCH.await(5, TimeUnit.SECONDS), "Failed to call customizers");
//Testing topology endpoint
final KafkaStreamsRegistry kafkaStreamsRegistry = context.getBean(KafkaStreamsRegistry.class);