diff --git a/spring-kafka/pom.xml b/spring-kafka/pom.xml
index 7b0bb0a8b7..c013be32e3 100644
--- a/spring-kafka/pom.xml
+++ b/spring-kafka/pom.xml
@@ -23,6 +23,16 @@
org.springframework.boot
spring-boot-starter-web
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+ 3.0.5
+
+
+ io.micrometer
+ micrometer-registry-prometheus
+ 1.10.5
+
org.springframework.kafka
spring-kafka
diff --git a/spring-kafka/src/main/java/com/baeldung/monitoring/LagAnalyzerApplication.java b/spring-kafka/src/main/java/com/baeldung/monitoring/LagAnalyzerApplication.java
index 9275151d50..d305af8c87 100644
--- a/spring-kafka/src/main/java/com/baeldung/monitoring/LagAnalyzerApplication.java
+++ b/spring-kafka/src/main/java/com/baeldung/monitoring/LagAnalyzerApplication.java
@@ -10,6 +10,7 @@ public class LagAnalyzerApplication {
public static void main(String[] args) {
SpringApplication.run(LagAnalyzerApplication.class, args);
- while (true) ;
+ while (true)
+ ;
}
}
diff --git a/spring-kafka/src/main/java/com/baeldung/monitoring/service/LagAnalyzerService.java b/spring-kafka/src/main/java/com/baeldung/monitoring/service/LagAnalyzerService.java
index b046f0b2c4..f8898b60d8 100644
--- a/spring-kafka/src/main/java/com/baeldung/monitoring/service/LagAnalyzerService.java
+++ b/spring-kafka/src/main/java/com/baeldung/monitoring/service/LagAnalyzerService.java
@@ -1,6 +1,7 @@
package com.baeldung.monitoring.service;
import com.baeldung.monitoring.util.MonitoringUtil;
+
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
@@ -27,36 +28,38 @@ public class LagAnalyzerService {
private final KafkaConsumer consumer;
@Autowired
- public LagAnalyzerService(
- @Value("${monitor.kafka.bootstrap.config}") String bootstrapServerConfig) {
+ public LagAnalyzerService(@Value("${monitor.kafka.bootstrap.config}") String bootstrapServerConfig) {
adminClient = getAdminClient(bootstrapServerConfig);
consumer = getKafkaConsumer(bootstrapServerConfig);
}
- public Map analyzeLag(
- String groupId)
- throws ExecutionException, InterruptedException {
+ public Map analyzeLag(String groupId)
+ throws ExecutionException, InterruptedException {
Map consumerGrpOffsets = getConsumerGrpOffsets(groupId);
Map producerOffsets = getProducerOffsets(consumerGrpOffsets);
Map lags = computeLags(consumerGrpOffsets, producerOffsets);
for (Map.Entry lagEntry : lags.entrySet()) {
- String topic = lagEntry.getKey().topic();
- int partition = lagEntry.getKey().partition();
+ String topic = lagEntry.getKey()
+ .topic();
+ int partition = lagEntry.getKey()
+ .partition();
Long lag = lagEntry.getValue();
- LOGGER.info("Time={} | Lag for topic = {}, partition = {} is {}",
- MonitoringUtil.time(),
- topic,
- partition,
- lag);
+ LOGGER.info("Time={} | Lag for topic = {}, partition = {}, groupId = {} is {}",
+ MonitoringUtil.time(),
+ topic,
+ partition,
+ groupId,
+ lag);
}
return lags;
}
public Map getConsumerGrpOffsets(String groupId)
- throws ExecutionException, InterruptedException {
+ throws ExecutionException, InterruptedException {
ListConsumerGroupOffsetsResult info = adminClient.listConsumerGroupOffsets(groupId);
- Map metadataMap
- = info.partitionsToOffsetAndMetadata().get();
+ Map metadataMap = info
+ .partitionsToOffsetAndMetadata()
+ .get();
Map groupOffset = new HashMap<>();
for (Map.Entry entry : metadataMap.entrySet()) {
TopicPartition key = entry.getKey();
@@ -66,8 +69,7 @@ public class LagAnalyzerService {
return groupOffset;
}
- private Map getProducerOffsets(
- Map consumerGrpOffset) {
+ private Map getProducerOffsets(Map consumerGrpOffset) {
List topicPartitions = new LinkedList<>();
for (Map.Entry entry : consumerGrpOffset.entrySet()) {
TopicPartition key = entry.getKey();
@@ -77,9 +79,9 @@ public class LagAnalyzerService {
}
public Map computeLags(
- Map consumerGrpOffsets,
- Map producerOffsets) {
- Map lags = new HashMap<>();
+ Map consumerGrpOffsets,
+ Map producerOffsets) {
+ Map lags = new HashMap<>();
for (Map.Entry entry : consumerGrpOffsets.entrySet()) {
Long producerOffset = producerOffsets.get(entry.getKey());
Long consumerOffset = consumerGrpOffsets.get(entry.getKey());
@@ -91,15 +93,24 @@ public class LagAnalyzerService {
private AdminClient getAdminClient(String bootstrapServerConfig) {
Properties config = new Properties();
- config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
+ config.put(
+ AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
+ bootstrapServerConfig);
return AdminClient.create(config);
}
- private KafkaConsumer getKafkaConsumer(String bootstrapServerConfig) {
+ private KafkaConsumer getKafkaConsumer(
+ String bootstrapServerConfig) {
Properties properties = new Properties();
- properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerConfig);
- properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ properties.setProperty(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ bootstrapServerConfig);
+ properties.setProperty(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ StringDeserializer.class.getName());
+ properties.setProperty(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ StringDeserializer.class.getName());
return new KafkaConsumer<>(properties);
}
}
diff --git a/spring-kafka/src/main/java/com/baeldung/monitoring/service/LiveLagAnalyzerService.java b/spring-kafka/src/main/java/com/baeldung/monitoring/service/LiveLagAnalyzerService.java
index a20b9e9a0c..7035bf73bd 100644
--- a/spring-kafka/src/main/java/com/baeldung/monitoring/service/LiveLagAnalyzerService.java
+++ b/spring-kafka/src/main/java/com/baeldung/monitoring/service/LiveLagAnalyzerService.java
@@ -15,8 +15,8 @@ public class LiveLagAnalyzerService {
@Autowired
public LiveLagAnalyzerService(
- LagAnalyzerService lagAnalyzerService,
- @Value(value = "${monitor.kafka.consumer.groupid}") String groupId) {
+ LagAnalyzerService lagAnalyzerService,
+ @Value(value = "${monitor.kafka.consumer.groupid}") String groupId) {
this.lagAnalyzerService = lagAnalyzerService;
this.groupId = groupId;
}
diff --git a/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/ConsumerSimulator.java b/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/ConsumerSimulator.java
index 2d376432e5..171c17a282 100644
--- a/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/ConsumerSimulator.java
+++ b/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/ConsumerSimulator.java
@@ -7,10 +7,9 @@ import org.springframework.stereotype.Service;
@Service
public class ConsumerSimulator {
- @KafkaListener(
- topics = "${monitor.topic.name}",
- containerFactory = "kafkaListenerContainerFactory",
- autoStartup = "${monitor.consumer.simulate}")
+ @KafkaListener(topics = "${monitor.topic.name}",
+ containerFactory = "kafkaListenerContainerFactory",
+ autoStartup = "${monitor.consumer.simulate}")
public void listenGroup(String message) throws InterruptedException {
Thread.sleep(10L);
}
diff --git a/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/KafkaConsumerConfig.java b/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/KafkaConsumerConfig.java
index a4a8847bcf..9d5160e713 100644
--- a/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/KafkaConsumerConfig.java
+++ b/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/KafkaConsumerConfig.java
@@ -2,53 +2,54 @@ package com.baeldung.monitoring.simulation;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.MicrometerConsumerListener;
+import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
+import io.micrometer.core.instrument.MeterRegistry;
+
@EnableKafka
-@Configuration
+@Component
public class KafkaConsumerConfig {
@Value(value = "${monitor.kafka.bootstrap.config}")
private String bootstrapAddress;
@Value(value = "${monitor.kafka.consumer.groupid}")
private String groupId;
- @Value(value = "${monitor.kafka.consumer.groupid.simulate}")
- private String simulateGroupId;
- @Value(value = "${monitor.producer.simulate}")
- private boolean enabled;
- public ConsumerFactory consumerFactory(String groupId) {
+ @Autowired
+ private MeterRegistry meterRegistry;
+
+ @Bean
+ public ConsumerFactory consumerFactory() {
Map props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
- if (enabled) {
- props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- } else {
- props.put(ConsumerConfig.GROUP_ID_CONFIG, simulateGroupId);
- }
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 0);
- return new DefaultKafkaConsumerFactory<>(props);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ DefaultKafkaConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory<>(props);
+ consumerFactory.addListener(new MicrometerConsumerListener<>(this.meterRegistry));
+ return consumerFactory;
}
@Bean
- public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
- ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
- if (enabled) {
- factory.setConsumerFactory(consumerFactory(groupId));
- } else {
- factory.setConsumerFactory(consumerFactory(simulateGroupId));
- }
- return factory;
+ public ConcurrentKafkaListenerContainerFactory
+ kafkaListenerContainerFactory(@Qualifier("consumerFactory") ConsumerFactory consumerFactory) {
+ ConcurrentKafkaListenerContainerFactory listenerContainerFactory =
+ new ConcurrentKafkaListenerContainerFactory<>();
+ listenerContainerFactory.setConsumerFactory(consumerFactory);
+ return listenerContainerFactory;
}
}
diff --git a/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/ProducerSimulator.java b/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/ProducerSimulator.java
index 30476ff7ec..ad4a006809 100644
--- a/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/ProducerSimulator.java
+++ b/spring-kafka/src/main/java/com/baeldung/monitoring/simulation/ProducerSimulator.java
@@ -23,10 +23,9 @@ public class ProducerSimulator {
private final boolean enabled;
@Autowired
- public ProducerSimulator(
- KafkaTemplate kafkaTemplate,
- @Value(value = "${monitor.topic.name}") String topicName,
- @Value(value = "${monitor.producer.simulate}") String enabled) {
+ public ProducerSimulator(KafkaTemplate kafkaTemplate,
+ @Value(value = "${monitor.topic.name}") String topicName,
+ @Value(value = "${monitor.producer.simulate}") String enabled) {
this.kafkaTemplate = kafkaTemplate;
this.topicName = topicName;
this.enabled = BooleanUtils.toBoolean(enabled);
@@ -37,7 +36,9 @@ public class ProducerSimulator {
if (enabled) {
if (endTime.after(new Date())) {
String message = "msg-" + time();
- SendResult result = kafkaTemplate.send(topicName, message).get();
+ SendResult result = kafkaTemplate
+ .send(topicName, message)
+ .get();
}
}
}
diff --git a/spring-kafka/src/main/resources/application.properties b/spring-kafka/src/main/resources/application.properties
index c57537e2af..1a639a43fc 100644
--- a/spring-kafka/src/main/resources/application.properties
+++ b/spring-kafka/src/main/resources/application.properties
@@ -1,3 +1,4 @@
+server.port=8081
spring.kafka.bootstrap-servers=localhost:9092
message.topic.name=baeldung
long.message.topic.name=longMessage
@@ -15,3 +16,12 @@ monitor.consumer.simulate=true
monitor.kafka.consumer.groupid.simulate=baeldungGrpSimulate
test.topic=testtopic1
+
+
+management.endpoints.web.base-path=/actuator
+management.endpoints.web.exposure.include=*
+management.endpoint.health.show-details=always
+management.endpoint.metrics.enabled=true
+management.endpoint.prometheus.enabled=true
+
+spring.jmx.enabled=false