GH-914: Add Micrometer Producer/Consumer Listeners

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/914

Boot no longer uses the deprecated JMX MBean scraping provided by Micrometer.

Add configuration that registers Micrometer Meters when Micrometer and spring-kafka 2.5.x are on
the class path.

Micrometer for Streams

- workaround until spring-kafka 2.5.3 is available
This commit is contained in:
Gary Russell
2020-06-10 17:15:51 -04:00
committed by Soby Chacko
parent 38bea7e7da
commit 95bc54b991
8 changed files with 288 additions and 21 deletions

View File

@@ -17,6 +17,7 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -24,8 +25,12 @@ import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import io.micrometer.core.instrument.ImmutableTag;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
@@ -381,8 +386,9 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
public StreamsBuilderFactoryManager streamsBuilderFactoryManager(
KafkaStreamsBindingInformationCatalogue catalogue,
KafkaStreamsRegistry kafkaStreamsRegistry,
@Nullable KafkaStreamsBinderMetrics kafkaStreamsBinderMetrics) {
return new StreamsBuilderFactoryManager(catalogue, kafkaStreamsRegistry, kafkaStreamsBinderMetrics);
@Nullable KafkaStreamsBinderMetrics kafkaStreamsBinderMetrics,
@Nullable StreamsListener listener) {
return new StreamsBuilderFactoryManager(catalogue, kafkaStreamsRegistry, kafkaStreamsBinderMetrics, listener);
}
@Bean
@@ -419,6 +425,40 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
return new KafkaStreamsBinderMetrics(meterRegistry);
}
@ConditionalOnClass(name = "org.springframework.kafka.core.MicrometerConsumerListener")
@ConditionalOnBean(MeterRegistry.class)
protected class KafkaMicrometer {

	/**
	 * Listener that binds a {@link KafkaStreamsMetrics} instance to the
	 * {@link MeterRegistry} for each {@link KafkaStreams} that is started,
	 * and closes it when the streams instance is removed.
	 * @param meterRegistry the registry to bind metrics to.
	 * @return the listener.
	 */
	@Bean
	@ConditionalOnMissingBean(name = "binderStreamsListener")
	public StreamsListener binderStreamsListener(MeterRegistry meterRegistry) {
		return new StreamsListener() {

			// Guarded by the synchronized listener methods.
			private final Map<String, KafkaStreamsMetrics> metrics = new HashMap<>();

			@Override
			public synchronized void streamsAdded(String id, KafkaStreams kafkaStreams) {
				// computeIfAbsent replaces the containsKey/put/get triple lookup
				// and guarantees a single metrics instance per id.
				this.metrics.computeIfAbsent(id, key -> {
					List<Tag> streamsTags = new ArrayList<>();
					streamsTags.add(new ImmutableTag("spring.id", key));
					KafkaStreamsMetrics streamsMetrics = new KafkaStreamsMetrics(kafkaStreams, streamsTags);
					streamsMetrics.bindTo(meterRegistry);
					return streamsMetrics;
				});
			}

			@Override
			public synchronized void streamsRemoved(String id, KafkaStreams streams) {
				// Close releases the meters registered for this streams instance.
				KafkaStreamsMetrics removed = this.metrics.remove(id);
				if (removed != null) {
					removed.close();
				}
			}

		};
	}

}
}
@Configuration
@@ -434,5 +474,41 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
.getBean(MeterRegistry.class);
return new KafkaStreamsBinderMetrics(meterRegistry);
}
@ConditionalOnClass(name = "org.springframework.kafka.core.MicrometerConsumerListener")
@ConditionalOnBean(MeterRegistry.class)
protected class KafkaMicrometer {

	/**
	 * Listener that binds a {@link KafkaStreamsMetrics} instance to the
	 * {@link MeterRegistry} for each {@link KafkaStreams} that is started,
	 * and closes it when the streams instance is removed. The registry is
	 * resolved from the "outerContext" bean (the enclosing application
	 * context) rather than the binder's own context.
	 * @param context the binder's application context.
	 * @return the listener.
	 */
	@Bean
	@ConditionalOnMissingBean(name = "binderStreamsListener")
	public StreamsListener binderStreamsListener(ConfigurableApplicationContext context) {
		MeterRegistry meterRegistry = context.getBean("outerContext", ApplicationContext.class)
				.getBean(MeterRegistry.class);
		return new StreamsListener() {

			// Guarded by the synchronized listener methods.
			private final Map<String, KafkaStreamsMetrics> metrics = new HashMap<>();

			@Override
			public synchronized void streamsAdded(String id, KafkaStreams kafkaStreams) {
				// computeIfAbsent replaces the containsKey/put/get triple lookup
				// and guarantees a single metrics instance per id.
				this.metrics.computeIfAbsent(id, key -> {
					List<Tag> streamsTags = new ArrayList<>();
					streamsTags.add(new ImmutableTag("spring.id", key));
					KafkaStreamsMetrics streamsMetrics = new KafkaStreamsMetrics(kafkaStreams, streamsTags);
					streamsMetrics.bindTo(meterRegistry);
					return streamsMetrics;
				});
			}

			@Override
			public synchronized void streamsRemoved(String id, KafkaStreams streams) {
				// Close releases the meters registered for this streams instance.
				KafkaStreamsMetrics removed = this.metrics.remove(id);
				if (removed != null) {
					removed.close();
				}
			}

		};
	}

}
}
}

View File

@@ -40,15 +40,21 @@ class StreamsBuilderFactoryManager implements SmartLifecycle {
private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
private final KafkaStreamsRegistry kafkaStreamsRegistry;
private final KafkaStreamsBinderMetrics kafkaStreamsBinderMetrics;
private final StreamsListener listener;
private volatile boolean running;
StreamsBuilderFactoryManager(KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
KafkaStreamsRegistry kafkaStreamsRegistry, KafkaStreamsBinderMetrics kafkaStreamsBinderMetrics) {
KafkaStreamsRegistry kafkaStreamsRegistry,
KafkaStreamsBinderMetrics kafkaStreamsBinderMetrics,
StreamsListener listener) {
this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
this.kafkaStreamsRegistry = kafkaStreamsRegistry;
this.kafkaStreamsBinderMetrics = kafkaStreamsBinderMetrics;
this.listener = listener;
}
@Override
@@ -70,9 +76,13 @@ class StreamsBuilderFactoryManager implements SmartLifecycle {
try {
Set<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans = this.kafkaStreamsBindingInformationCatalogue
.getStreamsBuilderFactoryBeans();
int n = 0;
for (StreamsBuilderFactoryBean streamsBuilderFactoryBean : streamsBuilderFactoryBeans) {
streamsBuilderFactoryBean.start();
this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
if (this.listener != null) {
this.listener.streamsAdded("streams." + n++, streamsBuilderFactoryBean.getKafkaStreams());
}
}
if (this.kafkaStreamsBinderMetrics != null) {
this.kafkaStreamsBinderMetrics.addMetrics(streamsBuilderFactoryBeans);
@@ -91,8 +101,12 @@ class StreamsBuilderFactoryManager implements SmartLifecycle {
try {
Set<StreamsBuilderFactoryBean> streamsBuilderFactoryBeans = this.kafkaStreamsBindingInformationCatalogue
.getStreamsBuilderFactoryBeans();
int n = 0;
for (StreamsBuilderFactoryBean streamsBuilderFactoryBean : streamsBuilderFactoryBeans) {
streamsBuilderFactoryBean.stop();
if (this.listener != null) {
this.listener.streamsRemoved("streams." + n++, streamsBuilderFactoryBean.getKafkaStreams());
}
}
}
catch (Exception ex) {

View File

@@ -0,0 +1,46 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
import org.apache.kafka.streams.KafkaStreams;
/**
* Temporary workaround until SK 2.5.3 is available.
*
* @author Gary Russell
* @since 3.0.6
*
*/
interface StreamsListener {
/**
* A new {@link KafkaStreams} was created.
* @param id the streams id (factory bean name).
* @param streams the streams.
*/
default void streamsAdded(String id, KafkaStreams streams) {
}
/**
* An existing {@link KafkaStreams} was removed.
* @param id the streams id (factory bean name).
* @param streams the streams.
*/
default void streamsRemoved(String id, KafkaStreams streams) {
}
}

View File

@@ -63,6 +63,7 @@ import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.MessageValues;
import org.springframework.cloud.stream.binder.kafka.config.ClientFactoryCustomizer;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties.StandardHeaders;
@@ -212,11 +213,13 @@ public class KafkaMessageChannelBinder extends
private final DlqPartitionFunction dlqPartitionFunction;
private final Map<ConsumerDestination, ContainerProperties.AckMode> ackModeInfo = new ConcurrentHashMap<>();
private ProducerListener<byte[], byte[]> producerListener;
private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
private final Map<ConsumerDestination, ContainerProperties.AckMode> ackModeInfo = new ConcurrentHashMap<>();
private ClientFactoryCustomizer clientFactoryCustomizer;
public KafkaMessageChannelBinder(
KafkaBinderConfigurationProperties configurationProperties,
@@ -245,12 +248,11 @@ public class KafkaMessageChannelBinder extends
super(headersToMap(configurationProperties), provisioningProvider,
containerCustomizer, sourceCustomizer);
this.configurationProperties = configurationProperties;
if (StringUtils.hasText(
configurationProperties.getTransaction().getTransactionIdPrefix())) {
String txId = configurationProperties.getTransaction().getTransactionIdPrefix();
if (StringUtils.hasText(txId)) {
this.transactionManager = new KafkaTransactionManager<>(getProducerFactory(
configurationProperties.getTransaction().getTransactionIdPrefix(),
new ExtendedProducerProperties<>(configurationProperties
.getTransaction().getProducer().getExtension())));
txId, new ExtendedProducerProperties<>(configurationProperties
.getTransaction().getProducer().getExtension()), txId + ".producer"));
this.transactionTemplate = new TransactionTemplate(this.transactionManager);
}
else {
@@ -291,6 +293,10 @@ public class KafkaMessageChannelBinder extends
this.producerListener = producerListener;
}
public void setClientFactoryCustomizer(ClientFactoryCustomizer customizer) {
this.clientFactoryCustomizer = customizer;
}
Map<String, TopicInformation> getTopicsInUse() {
return this.topicsInUse;
}
@@ -353,7 +359,7 @@ public class KafkaMessageChannelBinder extends
producerProperties.getExtension().getTransactionManager());
final ProducerFactory<byte[], byte[]> producerFB = transMan != null
? transMan.getProducerFactory()
: getProducerFactory(null, producerProperties);
: getProducerFactory(null, producerProperties, destination.getName() + ".producer");
Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(
producerProperties.getPartitionCount(), false, () -> {
Producer<byte[], byte[]> producer = producerFB.createProducer();
@@ -484,7 +490,7 @@ public class KafkaMessageChannelBinder extends
protected DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(
String transactionIdPrefix,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
ExtendedProducerProperties<KafkaProducerProperties> producerProperties, String beanName) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
@@ -533,6 +539,10 @@ public class KafkaMessageChannelBinder extends
if (kafkaProducerProperties.getCloseTimeout() > 0) {
producerFactory.setPhysicalCloseTimeout(kafkaProducerProperties.getCloseTimeout());
}
producerFactory.setBeanName(beanName);
if (this.clientFactoryCustomizer != null) {
this.clientFactoryCustomizer.configure(producerFactory);
}
return producerFactory;
}
@@ -559,7 +569,7 @@ public class KafkaMessageChannelBinder extends
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString()
: group;
final ConsumerFactory<?, ?> consumerFactory = createKafkaConsumerFactory(
anonymous, consumerGroup, extendedConsumerProperties);
anonymous, consumerGroup, extendedConsumerProperties, destination.getName() + ".consumer");
int partitionCount = extendedConsumerProperties.getInstanceCount()
* extendedConsumerProperties.getConcurrency();
@@ -907,7 +917,7 @@ public class KafkaMessageChannelBinder extends
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString()
: group;
final ConsumerFactory<?, ?> consumerFactory = createKafkaConsumerFactory(
anonymous, consumerGroup, extendedConsumerProperties);
anonymous, consumerGroup, extendedConsumerProperties, destination.getName() + ".polled.consumer");
String[] topics = extendedConsumerProperties.isMultiplex()
? StringUtils.commaDelimitedListToStringArray(destination.getName())
: new String[] { destination.getName() };
@@ -1077,7 +1087,8 @@ public class KafkaMessageChannelBinder extends
ProducerFactory<?, ?> producerFactory = transMan != null
? transMan.getProducerFactory()
: getProducerFactory(null,
new ExtendedProducerProperties<>(dlqProducerProperties));
new ExtendedProducerProperties<>(dlqProducerProperties),
destination.getName() + ".dlq.producer");
final KafkaTemplate<?, ?> kafkaTemplate = new KafkaTemplate<>(
producerFactory);
@@ -1298,8 +1309,8 @@ public class KafkaMessageChannelBinder extends
}
protected ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous,
String consumerGroup,
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
String consumerGroup, ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties,
String beanName) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class);
@@ -1332,7 +1343,12 @@ public class KafkaMessageChannelBinder extends
consumerProperties.getExtension().getStartOffset().name());
}
return new DefaultKafkaConsumerFactory<>(props);
DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(props);
factory.setBeanName(beanName);
if (this.clientFactoryCustomizer != null) {
this.clientFactoryCustomizer.configure(factory);
}
return factory;
}
private boolean isAutoCommitOnError(

View File

@@ -0,0 +1,37 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.config;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.ProducerFactory;
/**
* Called by the binder to customize the factories.
*
* @author Gary Russell
* @since 3.0.6
*
*/
public interface ClientFactoryCustomizer {
/**
* Customize the producer factory before the binder uses it.
* @param pf the producer factory.
*/
default void configure(ProducerFactory<?, ?> pf) {
}
/**
* Customize the consumer factory before the binder uses it.
* @param cf the consumer factory.
*/
default void configure(ConsumerFactory<?, ?> cf) {
}
}

View File

@@ -52,6 +52,12 @@ import org.springframework.context.annotation.Import;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.MicrometerConsumerListener;
import org.springframework.kafka.core.MicrometerProducerListener;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
@@ -111,7 +117,8 @@ public class KafkaBinderConfiguration {
@Nullable ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<?, ?>> messageHandlerCustomizer,
@Nullable ConsumerEndpointCustomizer<KafkaMessageDrivenChannelAdapter<?, ?>> consumerCustomizer,
ObjectProvider<KafkaBindingRebalanceListener> rebalanceListener,
ObjectProvider<DlqPartitionFunction> dlqPartitionFunction) {
ObjectProvider<DlqPartitionFunction> dlqPartitionFunction,
ObjectProvider<ClientFactoryCustomizer> clientFactoryCustomizer) {
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
configurationProperties, provisioningProvider,
@@ -122,6 +129,7 @@ public class KafkaBinderConfiguration {
.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
kafkaMessageChannelBinder.setProducerMessageHandlerCustomizer(messageHandlerCustomizer);
kafkaMessageChannelBinder.setConsumerEndpointCustomizer(consumerCustomizer);
kafkaMessageChannelBinder.setClientFactoryCustomizer(clientFactoryCustomizer.getIfUnique());
return kafkaMessageChannelBinder;
}
@@ -176,6 +184,39 @@ public class KafkaBinderConfiguration {
return new KafkaBinderMetrics(kafkaMessageChannelBinder,
configurationProperties, null, meterRegistry);
}
@ConditionalOnClass(name = "org.springframework.kafka.core.MicrometerConsumerListener")
@ConditionalOnBean(MeterRegistry.class)
protected class KafkaMicrometer {

	/**
	 * Customizer that attaches Micrometer producer/consumer listeners to the
	 * default Kafka client factories so client metrics flow into the registry.
	 * @param meterRegistry the registry the listeners report to.
	 * @return the customizer.
	 */
	@Bean
	@ConditionalOnMissingBean(name = "binderClientFactoryCustomizer")
	public ClientFactoryCustomizer binderClientFactoryCustomizer(MeterRegistry meterRegistry) {
		return new ClientFactoryCustomizer() {

			@Override
			public void configure(ProducerFactory<?, ?> pf) {
				// Listeners can only be attached to the default factory implementation.
				if (pf instanceof DefaultKafkaProducerFactory) {
					DefaultKafkaProducerFactory<?, ?> defaultPf = (DefaultKafkaProducerFactory<?, ?>) pf;
					defaultPf.addListener(new MicrometerProducerListener<>(meterRegistry));
				}
			}

			@Override
			public void configure(ConsumerFactory<?, ?> cf) {
				if (cf instanceof DefaultKafkaConsumerFactory) {
					DefaultKafkaConsumerFactory<?, ?> defaultCf = (DefaultKafkaConsumerFactory<?, ?>) cf;
					defaultCf.addListener(new MicrometerConsumerListener<>(meterRegistry));
				}
			}

		};
	}

}
}
@Configuration
@@ -195,6 +236,43 @@ public class KafkaBinderConfiguration {
return new KafkaBinderMetrics(kafkaMessageChannelBinder,
configurationProperties, null, meterRegistry);
}
@ConditionalOnClass(name = "org.springframework.kafka.core.MicrometerConsumerListener")
@ConditionalOnBean(MeterRegistry.class)
protected class KafkaMicrometer {

	/**
	 * Customizer that attaches Micrometer producer/consumer listeners to the
	 * default Kafka client factories. The registry is resolved from the
	 * "outerContext" bean (the enclosing application context).
	 * @param context the binder's application context.
	 * @return the customizer.
	 */
	@Bean
	@ConditionalOnMissingBean(name = "binderClientFactoryCustomizer")
	public ClientFactoryCustomizer binderClientFactoryCustomizer(ConfigurableApplicationContext context) {
		final MeterRegistry meterRegistry = context.getBean("outerContext", ApplicationContext.class)
				.getBean(MeterRegistry.class);
		return new ClientFactoryCustomizer() {

			@Override
			public void configure(ProducerFactory<?, ?> pf) {
				// Listeners can only be attached to the default factory implementation.
				if (pf instanceof DefaultKafkaProducerFactory) {
					DefaultKafkaProducerFactory<?, ?> defaultPf = (DefaultKafkaProducerFactory<?, ?>) pf;
					defaultPf.addListener(new MicrometerProducerListener<>(meterRegistry));
				}
			}

			@Override
			public void configure(ConsumerFactory<?, ?> cf) {
				if (cf instanceof DefaultKafkaConsumerFactory) {
					DefaultKafkaConsumerFactory<?, ?> defaultCf = (DefaultKafkaConsumerFactory<?, ?>) cf;
					defaultCf.addListener(new MicrometerConsumerListener<>(meterRegistry));
				}
			}

		};
	}

}
}
/**

View File

@@ -237,7 +237,7 @@ public class KafkaBinderUnitTests {
@Override
protected ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous,
String consumerGroup,
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties, String beanName) {
return new ConsumerFactory<byte[], byte[]>() {

View File

@@ -91,10 +91,10 @@ public class KafkaTransactionTests {
@Override
protected DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(
String transactionIdPrefix,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
ExtendedProducerProperties<KafkaProducerProperties> producerProperties, String beanName) {
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = spy(
super.getProducerFactory(transactionIdPrefix,
producerProperties));
producerProperties, beanName));
willReturn(mockProducer).given(producerFactory).createProducer("foo-");
return producerFactory;
}