GH-1129: Kafka Binder Metrics Improvements

Avoid blocking committed() call in KafkaBinderMetrics in a loop for
each topic partition.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1129
This commit is contained in:
Soby Chacko
2021-08-18 19:47:27 -04:00
committed by Gary Russell
parent 739b499966
commit ba2c3a05c9
2 changed files with 17 additions and 8 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -208,10 +208,11 @@ public class KafkaBinderMetrics
Map<TopicPartition, Long> endOffsets = metadataConsumer
.endOffsets(topicPartitions);
final Map<TopicPartition, OffsetAndMetadata> committedOffsets = metadataConsumer.committed(endOffsets.keySet());
for (Map.Entry<TopicPartition, Long> endOffset : endOffsets
.entrySet()) {
OffsetAndMetadata current = metadataConsumer
.committed(endOffset.getKey());
OffsetAndMetadata current = committedOffsets.get(endOffset.getKey());
lag += endOffset.getValue();
if (current != null) {
lag -= current.offset();

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -89,9 +89,12 @@ public class KafkaBinderMetricsTest {
@Test
public void shouldIndicateLag() {
final Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
TopicPartition topicPartition = new TopicPartition(TEST_TOPIC, 0);
committed.put(topicPartition, new OffsetAndMetadata(500));
org.mockito.BDDMockito
.given(consumer.committed(ArgumentMatchers.any(TopicPartition.class)))
.willReturn(new OffsetAndMetadata(500));
.given(consumer.committed(ArgumentMatchers.anySet()))
.willReturn(committed);
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC,
new TopicInformation("group1-metrics", partitions, false));
@@ -133,9 +136,14 @@ public class KafkaBinderMetricsTest {
org.mockito.BDDMockito
.given(consumer.endOffsets(ArgumentMatchers.anyCollection()))
.willReturn(endOffsets);
final Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
TopicPartition topicPartition1 = new TopicPartition(TEST_TOPIC, 0);
TopicPartition topicPartition2 = new TopicPartition(TEST_TOPIC, 1);
committed.put(topicPartition1, new OffsetAndMetadata(500));
committed.put(topicPartition2, new OffsetAndMetadata(500));
org.mockito.BDDMockito
.given(consumer.committed(ArgumentMatchers.any(TopicPartition.class)))
.willReturn(new OffsetAndMetadata(500));
.given(consumer.committed(ArgumentMatchers.anySet()))
.willReturn(committed);
List<PartitionInfo> partitions = partitions(new Node(0, null, 0),
new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC,