From 3b17fed06c73021a586b092dfad25be23ea1bc5b Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 14 Apr 2022 08:48:48 +0300 Subject: [PATCH] feature: add kafka configuration --- monitoring/prometheus.yml | 17 ++++++ pom.xml | 8 +++ .../configuration/KafkaConfigProperties.java | 26 +++++++++ .../configuration/KafkaConsumerConfig.java | 56 +++++++++++++++++++ .../configuration/KafkaProducerConfig.java | 48 ++++++++++++++++ .../KafkaTopicConfiguration.java | 46 +++++++++++++++ src/main/resources/application.properties | 14 +++++ 7 files changed, 215 insertions(+) create mode 100644 monitoring/prometheus.yml create mode 100644 src/main/java/com/eventsourcing/configuration/KafkaConfigProperties.java create mode 100644 src/main/java/com/eventsourcing/configuration/KafkaConsumerConfig.java create mode 100644 src/main/java/com/eventsourcing/configuration/KafkaProducerConfig.java create mode 100644 src/main/java/com/eventsourcing/configuration/KafkaTopicConfiguration.java diff --git a/monitoring/prometheus.yml b/monitoring/prometheus.yml new file mode 100644 index 0000000..45907bc --- /dev/null +++ b/monitoring/prometheus.yml @@ -0,0 +1,17 @@ +global: + scrape_interval: 10s + evaluation_interval: 10s + +scrape_configs: + - job_name: 'prometheus' + static_configs: + - targets: [ 'localhost:9090' ] + + - job_name: 'system' + static_configs: + - targets: [ 'host.docker.internal:9101' ] + + - job_name: 'microservice' + metrics_path: '/actuator/prometheus' + static_configs: + - targets: [ 'host.docker.internal:8006' ] \ No newline at end of file diff --git a/pom.xml b/pom.xml index c988535..ed323ee 100644 --- a/pom.xml +++ b/pom.xml @@ -17,6 +17,14 @@ 17 + + org.springframework.boot + spring-boot-starter-validation + + + org.springframework.kafka + spring-kafka + org.springframework.boot spring-boot-starter-actuator diff --git a/src/main/java/com/eventsourcing/configuration/KafkaConfigProperties.java b/src/main/java/com/eventsourcing/configuration/KafkaConfigProperties.java new 
file mode 100644 index 0000000..142ff09 --- /dev/null +++ b/src/main/java/com/eventsourcing/configuration/KafkaConfigProperties.java @@ -0,0 +1,26 @@ +package com.eventsourcing.configuration; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.RequiredArgsConstructor; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ConfigurationProperties(prefix = "kafka") +@Data +@AllArgsConstructor +@RequiredArgsConstructor +public class KafkaConfigProperties { + private String acks = "0"; + private String compressionType = "snappy"; + private int retries = 3; + private int deliveryTimeoutMs = 120000; + private int maxRequestSize = 1048576; + private int requestTimeoutMs = 30000; + private String orderMongoProjectionGroupId = "projection-group"; + private String enableAutoCommit = "false"; + private int consumerParallelism = 16; + private int eventsTopicPartitions = 6; + private int eventsTopicReplicationFactor = 1; +} diff --git a/src/main/java/com/eventsourcing/configuration/KafkaConsumerConfig.java b/src/main/java/com/eventsourcing/configuration/KafkaConsumerConfig.java new file mode 100644 index 0000000..d0bb369 --- /dev/null +++ b/src/main/java/com/eventsourcing/configuration/KafkaConsumerConfig.java @@ -0,0 +1,56 @@ +package com.eventsourcing.configuration; + +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import 
org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.util.backoff.FixedBackOff; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +@RequiredArgsConstructor +@EnableKafka +public class KafkaConsumerConfig { + + @Value(value = "${kafka.bootstrapServers:localhost:9093}") + private String bootstrapServers; + + private final KafkaConfigProperties kafkaConfigProperties; + + @Bean + public ConcurrentKafkaListenerContainerFactory<String, byte[]> + kafkaListenerContainerFactory(ConsumerFactory<String, byte[]> consumerFactory) { + ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + factory.setConcurrency(Runtime.getRuntime().availableProcessors()); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); + factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L))); + return factory; + } + + @Bean + public ConsumerFactory<String, byte[]> consumerFactory() { + return new DefaultKafkaConsumerFactory<>(consumerProps()); + } + + private Map<String, Object> consumerProps() { + final Map<String, Object> consumerProps = new HashMap<>(); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfigProperties.getOrderMongoProjectionGroupId()); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfigProperties.getEnableAutoCommit()); + return consumerProps; + } +} diff --git a/src/main/java/com/eventsourcing/configuration/KafkaProducerConfig.java 
b/src/main/java/com/eventsourcing/configuration/KafkaProducerConfig.java new file mode 100644 index 0000000..d1e3eac --- /dev/null +++ b/src/main/java/com/eventsourcing/configuration/KafkaProducerConfig.java @@ -0,0 +1,48 @@ +package com.eventsourcing.configuration; + +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; + +import java.util.HashMap; +import java.util.Map; + +@Configuration +@RequiredArgsConstructor +public class KafkaProducerConfig { + @Value(value = "${kafka.bootstrapServers:localhost:9093}") + private String bootstrapServers; + + private final KafkaConfigProperties kafkaConfigProperties; + + private Map<String, Object> senderProps() { + Map<String, Object> producerProps = new HashMap<>(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + producerProps.put(ProducerConfig.ACKS_CONFIG, kafkaConfigProperties.getAcks()); +// producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, kafkaConfigProperties.getCompressionType()); + producerProps.put(ProducerConfig.RETRIES_CONFIG, kafkaConfigProperties.getRetries()); + producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaConfigProperties.getDeliveryTimeoutMs()); + producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaConfigProperties.getMaxRequestSize()); + 
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaConfigProperties.getRequestTimeoutMs()); + return producerProps; + } + + @Bean + public ProducerFactory<String, byte[]> producerFactory() { + return new DefaultKafkaProducerFactory<>(senderProps()); + } + + @Bean + public KafkaTemplate<String, byte[]> kafkaTemplate(ProducerFactory<String, byte[]> producerFactory) { + return new KafkaTemplate<>(producerFactory); + } +} \ No newline at end of file diff --git a/src/main/java/com/eventsourcing/configuration/KafkaTopicConfiguration.java b/src/main/java/com/eventsourcing/configuration/KafkaTopicConfiguration.java new file mode 100644 index 0000000..84d98ab --- /dev/null +++ b/src/main/java/com/eventsourcing/configuration/KafkaTopicConfiguration.java @@ -0,0 +1,46 @@ +package com.eventsourcing.configuration; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaAdmin; + +import java.util.HashMap; +import java.util.Map; + + +@Configuration +@Slf4j +@RequiredArgsConstructor +public class KafkaTopicConfiguration { + + @Value(value = "${kafka.bootstrapServers:localhost:9093}") + private String bootstrapServers; + + @Value(value = "${order.kafka.topics.bank-account-event-store:bank-account-event-store}") + private String bankAccountTopicName; + + @Bean + public KafkaAdmin kafkaAdmin() { + final Map<String, Object> configs = new HashMap<>(); + configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + return new KafkaAdmin(configs); + } + + @Bean + public NewTopic bankAccountEventStoreTopicInitializer(KafkaAdmin kafkaAdmin) { + try { + final var topic = new NewTopic(bankAccountTopicName, 3, (short) 1); + kafkaAdmin.createOrModifyTopics(topic); + 
log.info("(bankAccountEventStoreTopicInitializer) topic: {}", topic); + return topic; + } catch (Exception e) { + log.error(e.getMessage(), e); + return null; + } + } +} \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 23a8695..60a3792 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -2,6 +2,8 @@ spring.application.name=microservice server.port=8006 server.tomcat.accesslog.enabled=true +logging.pattern.console=%clr(%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(line:%L) %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx} + spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.PostgreSQLDialect spring.jpa.hibernate.ddl-auto=none @@ -14,3 +16,15 @@ spring.jpa.generate-ddl=false spring.datasource.hikari.minimum-idle=10 spring.datasource.hikari.maximum-pool-size=30 + + +spring.kafka.bootstrap-servers=localhost:9093 +spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.StringOrBytesSerializer +spring.kafka.consumer.group-id=microservice + +logging.level.org.apache.kafka=warn + +order.kafka.topics.bank-account-event-store=bank-account-event-store +order.kafka.groupId=es_microservice +order.kafka.default-concurrency=10 \ No newline at end of file