Add Payment-Service
This commit is contained in:
@@ -11,5 +11,54 @@
|
||||
|
||||
<artifactId>payment-container</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.food.order</groupId>
|
||||
<artifactId>payment-domain-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.food.order</groupId>
|
||||
<artifactId>payment-application-service</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.food.order</groupId>
|
||||
<artifactId>payment-dataaccess</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.food.order</groupId>
|
||||
<artifactId>payment-messaging</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter</artifactId>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-maven-plugin</artifactId>
|
||||
<configuration>
|
||||
<image>
|
||||
<name>${project.groupId}/payment.service:${project.version}</name>
|
||||
</image>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>install</phase>
|
||||
<goals>
|
||||
<goal>build-image</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,14 @@
|
||||
package com.food.order.system.payment.service.domain;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
public class BeanConfig {
|
||||
|
||||
@Bean
|
||||
public PaymentDomainService paymentDomainService() {
|
||||
return new PaymentDomainServiceImpl();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
package com.food.order.system.payment.service.domain;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.autoconfigure.domain.EntityScan;
|
||||
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
|
||||
|
||||
@EnableJpaRepositories(basePackages = "com.food.order.system.payment.data.access")
|
||||
@EntityScan(basePackages = "com.food.order.system.payment.data.access")
|
||||
@SpringBootApplication(scanBasePackages = "com.food.order")
|
||||
public class PaymentServiceApplication {
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(PaymentServiceApplication.class, args);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,64 @@
|
||||
server:
|
||||
port: 8182
|
||||
|
||||
logging:
|
||||
level:
|
||||
com.food.order.system: DEBUG
|
||||
|
||||
payment-service:
|
||||
payment-request-topic-name: payment-request-value
|
||||
payment-response-topic-name: payment-response-value
|
||||
|
||||
spring:
|
||||
jpa:
|
||||
open-in-view: false
|
||||
show-sql: true
|
||||
database-platform: org.hibernate.dialect.PostgreSQL9Dialect
|
||||
properties:
|
||||
hibernate:
|
||||
dialect: org.hibernate.dialect.PostgreSQL9Dialect
|
||||
datasource:
|
||||
url: jdbc:postgresql://localhost:5432/postgres?currentSchema=payment&binaryTransfer=true&reWriteBatchedInserts=true&stringtype=unspecified
|
||||
username: postgres
|
||||
password: postgres
|
||||
driver-class-name: org.postgresql.Driver
|
||||
platform: postgres
|
||||
schema: classpath:init-schema.sql
|
||||
data: classpath:init-data.sql
|
||||
initialization-mode: always
|
||||
|
||||
kafka-config:
|
||||
bootstrap-servers: localhost:19092, localhost:29092, localhost:39092
|
||||
schema-registry-url-key: schema.registry.url
|
||||
schema-registry-url: http://localhost:8081
|
||||
num-of-partitions: 3
|
||||
replication-factor: 3
|
||||
|
||||
kafka-producer-config:
|
||||
key-serializer-class: org.apache.kafka.common.serialization.StringSerializer
|
||||
value-serializer-class: io.confluent.kafka.serializers.KafkaAvroSerializer
|
||||
compression-type: snappy
|
||||
acks: all
|
||||
batch-size: 16384
|
||||
batch-size-boost-factor: 100
|
||||
linger-ms: 5
|
||||
request-timeout-ms: 60000
|
||||
retry-count: 5
|
||||
|
||||
kafka-consumer-config:
|
||||
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
||||
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
|
||||
payment-consumer-group-id: payment-topic-consumer
|
||||
auto-offset-reset: earliest
|
||||
specific-avro-reader-key: specific.avro.reader
|
||||
specific-avro-reader: true
|
||||
batch-listener: true
|
||||
auto-startup: true
|
||||
concurrency-level: 3
|
||||
session-timeout-ms: 10000
|
||||
heartbeat-interval-ms: 3000
|
||||
max-poll-interval-ms: 300000
|
||||
max-poll-records: 500
|
||||
max-partition-fetch-bytes-default: 1048576
|
||||
max-partition-fetch-bytes-boost-factor: 1
|
||||
poll-timeout-ms: 150
|
||||
@@ -0,0 +1,17 @@
|
||||
INSERT INTO payment.credit_entry(id, customer_id, total_credit_amount)
|
||||
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb21', 'd215b5f8-0249-4dc5-89a3-51fd148cfb41', 500.00);
|
||||
INSERT INTO payment.credit_history(id, customer_id, amount, type)
|
||||
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb23', 'd215b5f8-0249-4dc5-89a3-51fd148cfb41', 100.00, 'CREDIT');
|
||||
INSERT INTO payment.credit_history(id, customer_id, amount, type)
|
||||
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb24', 'd215b5f8-0249-4dc5-89a3-51fd148cfb41', 600.00, 'CREDIT');
|
||||
INSERT INTO payment.credit_history(id, customer_id, amount, type)
|
||||
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb25', 'd215b5f8-0249-4dc5-89a3-51fd148cfb41', 200.00, 'DEBIT');
|
||||
|
||||
|
||||
INSERT INTO payment.credit_entry(id, customer_id, total_credit_amount)
|
||||
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb22', 'd215b5f8-0249-4dc5-89a3-51fd148cfb43', 100.00);
|
||||
INSERT INTO payment.credit_history(id, customer_id, amount, type)
|
||||
VALUES ('d215b5f8-0249-4dc5-89a3-51fd148cfb26', 'd215b5f8-0249-4dc5-89a3-51fd148cfb43', 100.00, 'CREDIT');
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,47 @@
|
||||
DROP SCHEMA IF EXISTS payment CASCADE;
|
||||
|
||||
CREATE SCHEMA payment;
|
||||
|
||||
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
|
||||
|
||||
DROP TYPE IF EXISTS payment_status;
|
||||
|
||||
CREATE TYPE payment_status AS ENUM ('COMPLETED', 'CANCELLED', 'FAILED');
|
||||
|
||||
DROP TABLE IF EXISTS "payment".payments CASCADE;
|
||||
|
||||
CREATE TABLE "payment".payments
|
||||
(
|
||||
id uuid NOT NULL,
|
||||
customer_id uuid NOT NULL,
|
||||
order_id uuid NOT NULL,
|
||||
price numeric(10,2) NOT NULL,
|
||||
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
|
||||
status payment_status NOT NULL,
|
||||
CONSTRAINT payments_pkey PRIMARY KEY (id)
|
||||
);
|
||||
|
||||
DROP TABLE IF EXISTS "payment".credit_entry CASCADE;
|
||||
|
||||
CREATE TABLE "payment".credit_entry
|
||||
(
|
||||
id uuid NOT NULL,
|
||||
customer_id uuid NOT NULL,
|
||||
total_credit_amount numeric(10,2) NOT NULL,
|
||||
CONSTRAINT credit_entry_pkey PRIMARY KEY (id)
|
||||
);
|
||||
|
||||
DROP TYPE IF EXISTS transaction_type;
|
||||
|
||||
CREATE TYPE transaction_type AS ENUM ('DEBIT', 'CREDIT');
|
||||
|
||||
DROP TABLE IF EXISTS "payment".credit_history CASCADE;
|
||||
|
||||
CREATE TABLE "payment".credit_history
|
||||
(
|
||||
id uuid NOT NULL,
|
||||
customer_id uuid NOT NULL,
|
||||
amount numeric(10,2) NOT NULL,
|
||||
type transaction_type NOT NULL,
|
||||
CONSTRAINT credit_history_pkey PRIMARY KEY (id)
|
||||
);
|
||||
@@ -2,7 +2,6 @@ package com.food.order.system.payment.data.access.credithistory.adapter;
|
||||
|
||||
|
||||
import com.food.order.system.payment.application.service.ports.output.repository.CreditHistoryRepository;
|
||||
import com.food.order.system.payment.data.access.credithistory.entity.CreditHistoryEntity;
|
||||
import com.food.order.system.payment.data.access.credithistory.mapper.CreditHistoryDataAccessMapper;
|
||||
import com.food.order.system.payment.data.access.credithistory.repository.CreditHistoryJpaRepository;
|
||||
import com.food.order.system.payment.service.domain.entity.CreditHistory;
|
||||
@@ -11,7 +10,6 @@ import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Component
|
||||
public class CreditHistoryRepositoryImpl implements CreditHistoryRepository {
|
||||
@@ -33,12 +31,11 @@ public class CreditHistoryRepositoryImpl implements CreditHistoryRepository {
|
||||
|
||||
@Override
|
||||
public Optional<List<CreditHistory>> findByCustomerId(CustomerId customerId) {
|
||||
Optional<List<CreditHistoryEntity>> creditHistory =
|
||||
creditHistoryJpaRepository.findByCustomerId(customerId.getValue());
|
||||
return creditHistory
|
||||
return creditHistoryJpaRepository.findByCustomerId(customerId.getValue())
|
||||
.map(creditHistoryList ->
|
||||
creditHistoryList.stream()
|
||||
.map(creditHistoryDataAccessMapper::creditHistoryEntityToCreditHistory)
|
||||
.collect(Collectors.toList()));
|
||||
.toList());
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,5 +11,25 @@
|
||||
|
||||
<artifactId>payment-messaging</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.food.order</groupId>
|
||||
<artifactId>payment-application-service</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.food.order</groupId>
|
||||
<artifactId>kafka-producer</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.food.order</groupId>
|
||||
<artifactId>kafka-consumer</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.food.order</groupId>
|
||||
<artifactId>kafka-model</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
@@ -0,0 +1,53 @@
|
||||
package com.food.order.system.payment.messaging.listener.kafka;
|
||||
|
||||
import com.food.order.system.kafka.consumer.KafkaConsumer;
|
||||
import com.food.order.system.kafka.order.avro.model.PaymentRequestAvroModel;
|
||||
import com.food.order.system.payment.application.service.ports.input.message.listener.PaymentRequestMessageListener;
|
||||
import com.food.order.system.payment.messaging.mapper.PaymentMessagingDataMapper;
|
||||
import com.food.order.sysyem.valueobject.PaymentOrderStatus;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
import org.springframework.messaging.handler.annotation.Header;
|
||||
import org.springframework.messaging.handler.annotation.Payload;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class PaymentRequestKafkaListener implements KafkaConsumer<PaymentRequestAvroModel> {
|
||||
|
||||
private final PaymentMessagingDataMapper paymentMessagingDataMapper;
|
||||
private final PaymentRequestMessageListener paymentRequestMessageListener;
|
||||
|
||||
|
||||
@Override
|
||||
@KafkaListener(id = "${kafka-consumer-config.payment-consumer-group-id}",
|
||||
topics = "${payment-service.payment-request-topic-name}")
|
||||
public void receive(@Payload List<PaymentRequestAvroModel> messages,
|
||||
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<String> keys,
|
||||
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
|
||||
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
|
||||
log.info("{} number of payment requests received with keys:{}, partitions:{} and offsets: {}",
|
||||
messages.size(),
|
||||
keys.toString(),
|
||||
partitions.toString(),
|
||||
offsets.toString());
|
||||
|
||||
messages.forEach(paymentRequestAvroModel -> {
|
||||
if (PaymentOrderStatus.PENDING.name().equalsIgnoreCase(paymentRequestAvroModel.getPaymentOrderStatus().name())) {
|
||||
log.info("Processing payment for order id: {}", paymentRequestAvroModel.getOrderId());
|
||||
paymentRequestMessageListener.completePayment(paymentMessagingDataMapper
|
||||
.paymentRequestAvroModelToPaymentRequest(paymentRequestAvroModel));
|
||||
} else if(PaymentOrderStatus.CANCELLED.name().equalsIgnoreCase(paymentRequestAvroModel.getPaymentOrderStatus().name())) {
|
||||
log.info("Cancelling payment for order id: {}", paymentRequestAvroModel.getOrderId());
|
||||
paymentRequestMessageListener.cancelPayment(paymentMessagingDataMapper
|
||||
.paymentRequestAvroModelToPaymentRequest(paymentRequestAvroModel));
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,74 @@
|
||||
package com.food.order.system.payment.messaging.mapper;
|
||||
|
||||
import com.food.order.system.kafka.order.avro.model.PaymentRequestAvroModel;
|
||||
import com.food.order.system.kafka.order.avro.model.PaymentResponseAvroModel;
|
||||
import com.food.order.system.kafka.order.avro.model.PaymentStatus;
|
||||
import com.food.order.system.payment.application.service.dto.PaymentRequest;
|
||||
import com.food.order.system.payment.service.domain.event.PaymentCancelledEvent;
|
||||
import com.food.order.system.payment.service.domain.event.PaymentCompletedEvent;
|
||||
import com.food.order.system.payment.service.domain.event.PaymentFailedEvent;
|
||||
import com.food.order.sysyem.valueobject.PaymentOrderStatus;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
@Component
|
||||
public class PaymentMessagingDataMapper {
|
||||
|
||||
public PaymentResponseAvroModel paymentCompletedEventToPaymentResponseAvroModel(PaymentCompletedEvent paymentCompletedEvent) {
|
||||
return PaymentResponseAvroModel.newBuilder()
|
||||
.setId(UUID.randomUUID().toString())
|
||||
.setSagaId("")
|
||||
.setPaymentId(paymentCompletedEvent.getPayment().getId().getValue().toString())
|
||||
.setCustomerId(paymentCompletedEvent.getPayment().getCustomerId().getValue().toString())
|
||||
.setOrderId(paymentCompletedEvent.getPayment().getOrderId().getValue().toString())
|
||||
.setPrice(paymentCompletedEvent.getPayment().getPrice().getAmount())
|
||||
.setCreatedAt(paymentCompletedEvent.getPayment().getCreatedAt().toInstant())
|
||||
.setPaymentStatus(PaymentStatus.valueOf(paymentCompletedEvent.getPayment().getStatus().name()))
|
||||
.setFailureMessages(paymentCompletedEvent.getFailureMessages())
|
||||
.build();
|
||||
}
|
||||
|
||||
public PaymentResponseAvroModel paymentCancelEventToPaymentResponseAvroModel(PaymentCancelledEvent paymentCancelledEvent) {
|
||||
return PaymentResponseAvroModel.newBuilder()
|
||||
.setId(UUID.randomUUID().toString())
|
||||
.setSagaId("")
|
||||
.setPaymentId(paymentCancelledEvent.getPayment().getId().getValue().toString())
|
||||
.setCustomerId(paymentCancelledEvent.getPayment().getCustomerId().getValue().toString())
|
||||
.setOrderId(paymentCancelledEvent.getPayment().getOrderId().getValue().toString())
|
||||
.setPrice(paymentCancelledEvent.getPayment().getPrice().getAmount())
|
||||
.setCreatedAt(paymentCancelledEvent.getPayment().getCreatedAt().toInstant())
|
||||
.setPaymentStatus(PaymentStatus.valueOf(paymentCancelledEvent.getPayment().getStatus().name()))
|
||||
.setFailureMessages(paymentCancelledEvent.getFailureMessages())
|
||||
.build();
|
||||
}
|
||||
|
||||
public PaymentResponseAvroModel paymentFailedEventToPaymentResponseAvroModel(PaymentFailedEvent paymentFailedEvent) {
|
||||
return PaymentResponseAvroModel.newBuilder()
|
||||
.setId(UUID.randomUUID().toString())
|
||||
.setSagaId("")
|
||||
.setPaymentId(paymentFailedEvent.getPayment().getId().getValue().toString())
|
||||
.setCustomerId(paymentFailedEvent.getPayment().getCustomerId().getValue().toString())
|
||||
.setOrderId(paymentFailedEvent.getPayment().getOrderId().getValue().toString())
|
||||
.setPrice(paymentFailedEvent.getPayment().getPrice().getAmount())
|
||||
.setCreatedAt(paymentFailedEvent.getPayment().getCreatedAt().toInstant())
|
||||
.setPaymentStatus(PaymentStatus.valueOf(paymentFailedEvent.getPayment().getStatus().name()))
|
||||
.setFailureMessages(paymentFailedEvent.getFailureMessages())
|
||||
.build();
|
||||
}
|
||||
|
||||
public PaymentRequest paymentRequestAvroModelToPaymentRequest(PaymentRequestAvroModel paymentRequestAvroModel) {
|
||||
return PaymentRequest.builder()
|
||||
.id(paymentRequestAvroModel.getId())
|
||||
.sagaId(paymentRequestAvroModel.getSagaId())
|
||||
.customerId(paymentRequestAvroModel.getCustomerId())
|
||||
.orderId(paymentRequestAvroModel.getOrderId())
|
||||
.price(paymentRequestAvroModel.getPrice())
|
||||
.createdAt(paymentRequestAvroModel.getCreatedAt())
|
||||
.status(PaymentOrderStatus.valueOf(paymentRequestAvroModel.getPaymentOrderStatus().name()))
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
package com.food.order.system.payment.messaging.publisher.kafka;
|
||||
|
||||
import com.food.order.system.kafka.order.avro.model.PaymentResponseAvroModel;
|
||||
import com.food.order.system.kafka.producer.KafkaMessageHelper;
|
||||
import com.food.order.system.kafka.producer.service.KafkaProducer;
|
||||
import com.food.order.system.payment.application.service.config.PaymentServiceConfigData;
|
||||
import com.food.order.system.payment.application.service.ports.output.message.publisher.PaymentCancelledMessagePublisher;
|
||||
import com.food.order.system.payment.messaging.mapper.PaymentMessagingDataMapper;
|
||||
import com.food.order.system.payment.service.domain.event.PaymentCancelledEvent;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class PaymentCancelledKafkaMessagePublisher implements PaymentCancelledMessagePublisher {
|
||||
|
||||
private final PaymentMessagingDataMapper paymentDataMapper;
|
||||
private final KafkaProducer<String , PaymentResponseAvroModel> kafkaProducer;
|
||||
private final PaymentServiceConfigData paymentServiceConfigData;
|
||||
private final KafkaMessageHelper kafkaMessageHelper;
|
||||
|
||||
|
||||
@Override
|
||||
public void publish(PaymentCancelledEvent event) {
|
||||
log.info("Publishing payment cancelled event to kafka");
|
||||
var orderId = event.getPayment().getOrderId().getValue().toString();
|
||||
try {
|
||||
var paymentResponseAvroModel =
|
||||
paymentDataMapper.paymentCancelEventToPaymentResponseAvroModel(event);
|
||||
|
||||
kafkaProducer.send(paymentServiceConfigData.getPaymentResponseTopicName(),
|
||||
orderId,
|
||||
paymentResponseAvroModel,
|
||||
kafkaMessageHelper.getKafkaCallBack(
|
||||
paymentServiceConfigData.getPaymentResponseTopicName(),
|
||||
paymentResponseAvroModel,
|
||||
orderId,
|
||||
"PaymentResponseAvroModel"));
|
||||
|
||||
log.info("Published payment cancelled event to kafka");
|
||||
} catch (Exception e) {
|
||||
log.error("Error while publishing payment cancelled event to kafka", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
package com.food.order.system.payment.messaging.publisher.kafka;
|
||||
|
||||
import com.food.order.system.kafka.order.avro.model.PaymentResponseAvroModel;
|
||||
import com.food.order.system.kafka.producer.KafkaMessageHelper;
|
||||
import com.food.order.system.kafka.producer.service.KafkaProducer;
|
||||
import com.food.order.system.payment.application.service.config.PaymentServiceConfigData;
|
||||
import com.food.order.system.payment.application.service.ports.output.message.publisher.PaymentCompletedMessagePublisher;
|
||||
import com.food.order.system.payment.messaging.mapper.PaymentMessagingDataMapper;
|
||||
import com.food.order.system.payment.service.domain.event.PaymentCompletedEvent;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class PaymentCompletedKafkaMessagePublisher implements PaymentCompletedMessagePublisher {
|
||||
|
||||
private final PaymentMessagingDataMapper paymentDataMapper;
|
||||
private final KafkaProducer<String , PaymentResponseAvroModel> kafkaProducer;
|
||||
private final PaymentServiceConfigData paymentServiceConfigData;
|
||||
|
||||
private final KafkaMessageHelper kafkaMessageHelper;
|
||||
|
||||
|
||||
@Override
|
||||
public void publish(PaymentCompletedEvent event) {
|
||||
log.info("Publishing payment completed event to kafka");
|
||||
var orderId = event.getPayment().getOrderId().getValue().toString();
|
||||
try {
|
||||
var paymentResponseAvroModel =
|
||||
paymentDataMapper.paymentCompletedEventToPaymentResponseAvroModel(event);
|
||||
|
||||
kafkaProducer.send(paymentServiceConfigData.getPaymentResponseTopicName(),
|
||||
orderId,
|
||||
paymentResponseAvroModel,
|
||||
kafkaMessageHelper.getKafkaCallBack(
|
||||
paymentServiceConfigData.getPaymentResponseTopicName(),
|
||||
paymentResponseAvroModel,
|
||||
orderId,
|
||||
"PaymentResponseAvroModel"));
|
||||
|
||||
log.info("Published payment completed event to kafka");
|
||||
} catch (Exception e) {
|
||||
log.error("Error while publishing payment completed event to kafka", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
package com.food.order.system.payment.messaging.publisher.kafka;
|
||||
|
||||
import com.food.order.system.kafka.order.avro.model.PaymentResponseAvroModel;
|
||||
import com.food.order.system.kafka.producer.KafkaMessageHelper;
|
||||
import com.food.order.system.kafka.producer.service.KafkaProducer;
|
||||
import com.food.order.system.payment.application.service.config.PaymentServiceConfigData;
|
||||
import com.food.order.system.payment.application.service.ports.output.message.publisher.PaymentFailedMessagePublisher;
|
||||
import com.food.order.system.payment.messaging.mapper.PaymentMessagingDataMapper;
|
||||
import com.food.order.system.payment.service.domain.event.PaymentFailedEvent;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class PaymentFailedKafkaMessagePublisher implements PaymentFailedMessagePublisher {
|
||||
|
||||
private final PaymentMessagingDataMapper paymentDataMapper;
|
||||
private final KafkaProducer<String , PaymentResponseAvroModel> kafkaProducer;
|
||||
private final PaymentServiceConfigData paymentServiceConfigData;
|
||||
|
||||
private final KafkaMessageHelper kafkaMessageHelper;
|
||||
|
||||
|
||||
@Override
|
||||
public void publish(PaymentFailedEvent event) {
|
||||
log.info("Publishing payment failed event to kafka");
|
||||
var orderId = event.getPayment().getOrderId().getValue().toString();
|
||||
try {
|
||||
var paymentResponseAvroModel =
|
||||
paymentDataMapper.paymentFailedEventToPaymentResponseAvroModel(event);
|
||||
|
||||
kafkaProducer.send(paymentServiceConfigData.getPaymentResponseTopicName(),
|
||||
orderId,
|
||||
paymentResponseAvroModel,
|
||||
kafkaMessageHelper.getKafkaCallBack(
|
||||
paymentServiceConfigData.getPaymentResponseTopicName(),
|
||||
paymentResponseAvroModel,
|
||||
orderId,
|
||||
"PaymentResponseAvroModel"));
|
||||
|
||||
log.info("Published payment failed event to kafka");
|
||||
} catch (Exception e) {
|
||||
log.error("Error while publishing payment failed event to kafka", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user