Outbox Message and Scheduler class implemented part - 5.

This commit is contained in:
Ali CANLI
2022-07-16 22:20:20 +03:00
parent 68e7f14dca
commit 2ff4b7b3d5
14 changed files with 409 additions and 201 deletions

View File

@@ -0,0 +1,115 @@
package com.food.order.system;
import com.food.order.system.outbox.OutboxStatus;
import com.food.order.system.payment.application.service.dto.PaymentRequest;
import com.food.order.system.payment.application.service.ports.input.message.listener.PaymentRequestMessageListener;
import com.food.order.system.payment.data.access.outobx.entity.OrderOutboxEntity;
import com.food.order.system.payment.data.access.outobx.repository.OrderOutboxJpaRepository;
import com.food.order.system.payment.service.domain.PaymentServiceApplication;
import com.food.order.system.valueobject.PaymentOrderStatus;
import com.food.order.system.valueobject.PaymentStatus;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.postgresql.util.PSQLException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.dao.DataAccessException;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static com.food.order.system.outbox.order.SagaConst.ORDER_PROCESSING_SAGA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Slf4j
@SpringBootTest(classes = {PaymentServiceApplication.class})
class PaymentRequestMessageListenerTest {
@Autowired
private PaymentRequestMessageListener paymentRequestMessageListener;
@Autowired
private OrderOutboxJpaRepository orderOutboxRepository;
private final static String CUSTOMER_ID = "d215b5f8-0249-4dc5-89a3-51fd148cfb41";
private final static BigDecimal PRICE = new BigDecimal("100");
@Test
void testDoublePayment() {
String sagaId = UUID.randomUUID().toString();
paymentRequestMessageListener.completePayment(getPaymentRequest(sagaId));
try {
paymentRequestMessageListener.completePayment(getPaymentRequest(sagaId));
} catch (DataAccessException e) {
log.error("DataAccessException occurred with sql state: {}",
((PSQLException) Objects.requireNonNull(e.getRootCause())).getSQLState());
}
assertOrderOutbox(sagaId);
}
@Test
void testDoublePaymentWithThreads() {
String sagaId = UUID.randomUUID().toString();
ExecutorService executor = null;
try {
executor = Executors.newFixedThreadPool(2);
List<Callable<Object>> tasks = new ArrayList<>();
tasks.add(Executors.callable(() -> {
try {
paymentRequestMessageListener.completePayment(getPaymentRequest(sagaId));
} catch (DataAccessException e) {
log.error("DataAccessException occurred for thread 1 with sql state: {}",
((PSQLException) Objects.requireNonNull(e.getRootCause())).getSQLState());
}
}));
tasks.add(Executors.callable(() -> {
try {
paymentRequestMessageListener.completePayment(getPaymentRequest(sagaId));
} catch (DataAccessException e) {
log.error("DataAccessException occurred for thread 2 with sql state: {}",
((PSQLException) Objects.requireNonNull(e.getRootCause())).getSQLState());
}
}));
executor.invokeAll(tasks);
assertOrderOutbox(sagaId);
} catch (InterruptedException e) {
log.error("Error calling complete payment!", e);
} finally {
if (executor != null) {
executor.shutdown();
}
}
}
private void assertOrderOutbox(String sagaId) {
Optional<OrderOutboxEntity> orderOutboxEntity = orderOutboxRepository
.findByTypeAndSagaIdAndPaymentStatusAndOutboxStatus(ORDER_PROCESSING_SAGA,
UUID.fromString(sagaId),
PaymentStatus.COMPLETED,
OutboxStatus.STARTED);
assertTrue(orderOutboxEntity.isPresent());
assertEquals(orderOutboxEntity.get().getSagaId().toString(), sagaId);
}
private PaymentRequest getPaymentRequest(String sagaId) {
return PaymentRequest.builder()
.id(UUID.randomUUID().toString())
.sagaId(sagaId)
.orderId(UUID.randomUUID().toString())
.status(PaymentOrderStatus.PENDING)
.customerId(CUSTOMER_ID)
.price(PRICE)
.createdAt(Instant.now())
.build();
}
}

View File

@@ -0,0 +1,58 @@
package com.food.order.system.payment.data.access.outobx.adapter;
import com.food.order.system.outbox.OutboxStatus;
import com.food.order.system.payment.application.service.outbox.model.OrderOutboxMessage;
import com.food.order.system.payment.application.service.ports.output.repository.OrderOutboxRepository;
import com.food.order.system.payment.data.access.outobx.exception.OrderOutboxNotFoundException;
import com.food.order.system.payment.data.access.outobx.mapper.OrderOutboxDataAccessMapper;
import com.food.order.system.payment.data.access.outobx.repository.OrderOutboxJpaRepository;
import com.food.order.system.valueobject.PaymentStatus;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
@Component
@RequiredArgsConstructor
public class OrderOutboxRepositoryImpl implements OrderOutboxRepository {
private final OrderOutboxJpaRepository orderOutboxJpaRepository;
private final OrderOutboxDataAccessMapper orderOutboxDataAccessMapper;
@Override
public OrderOutboxMessage save(OrderOutboxMessage orderPaymentOutboxMessage) {
return orderOutboxDataAccessMapper
.orderOutboxEntityToOrderOutboxMessage(orderOutboxJpaRepository
.save(orderOutboxDataAccessMapper
.orderOutboxMessageToOutboxEntity(orderPaymentOutboxMessage)));
}
@Override
public Optional<List<OrderOutboxMessage>> findByTypeAndOutboxStatus(String sagaType, OutboxStatus outboxStatus) {
return Optional.of(orderOutboxJpaRepository.findByTypeAndOutboxStatus(sagaType, outboxStatus)
.orElseThrow(() -> new OrderOutboxNotFoundException("Approval outbox object " +
"cannot be found for saga type " + sagaType))
.stream()
.map(orderOutboxDataAccessMapper::orderOutboxEntityToOrderOutboxMessage)
.collect(Collectors.toList()));
}
@Override
public Optional<OrderOutboxMessage> findByTypeAndSagaIdAndPaymentStatusAndOutboxStatus(String sagaType,
UUID sagaId,
PaymentStatus paymentStatus,
OutboxStatus outboxStatus) {
return orderOutboxJpaRepository.findByTypeAndSagaIdAndPaymentStatusAndOutboxStatus(sagaType, sagaId,
paymentStatus, outboxStatus)
.map(orderOutboxDataAccessMapper::orderOutboxEntityToOrderOutboxMessage);
}
@Override
public void deleteByTypeAndOutboxStatus(String sagaType, OutboxStatus outboxStatus) {
orderOutboxJpaRepository.deleteByTypeAndOutboxStatus(sagaType, outboxStatus);
}
}

View File

@@ -0,0 +1,49 @@
package com.food.order.system.payment.data.access.outobx.entity;
import com.food.order.system.outbox.OutboxStatus;
import com.food.order.system.valueobject.PaymentStatus;
import lombok.*;
import javax.persistence.*;
import java.time.ZonedDateTime;
import java.util.Objects;
import java.util.UUID;
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Table(name = "order_outbox")
@Entity
public class OrderOutboxEntity {
@Id
private UUID id;
private UUID sagaId;
private ZonedDateTime createdAt;
private ZonedDateTime processedAt;
private String type;
private String payload;
@Enumerated(EnumType.STRING)
private OutboxStatus outboxStatus;
@Enumerated(EnumType.STRING)
private PaymentStatus paymentStatus;
@Version
private int version;
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
OrderOutboxEntity that = (OrderOutboxEntity) o;
return id.equals(that.id);
}
@Override
public int hashCode() {
return Objects.hash(id);
}
}

View File

@@ -0,0 +1,8 @@
package com.food.order.system.payment.data.access.outobx.exception;
public class OrderOutboxNotFoundException extends RuntimeException {
public OrderOutboxNotFoundException(String message) {
super(message);
}
}

View File

@@ -0,0 +1,37 @@
package com.food.order.system.payment.data.access.outobx.mapper;
import com.food.order.system.payment.application.service.outbox.model.OrderOutboxMessage;
import com.food.order.system.payment.data.access.outobx.entity.OrderOutboxEntity;
import org.springframework.stereotype.Component;
@Component
public class OrderOutboxDataAccessMapper {
public OrderOutboxEntity orderOutboxMessageToOutboxEntity(OrderOutboxMessage orderOutboxMessage) {
return OrderOutboxEntity.builder()
.id(orderOutboxMessage.getId())
.sagaId(orderOutboxMessage.getSagaId())
.createdAt(orderOutboxMessage.getCreatedAt())
.type(orderOutboxMessage.getType())
.payload(orderOutboxMessage.getPayload())
.outboxStatus(orderOutboxMessage.getOutboxStatus())
.paymentStatus(orderOutboxMessage.getPaymentStatus())
.version(orderOutboxMessage.getVersion())
.build();
}
public OrderOutboxMessage orderOutboxEntityToOrderOutboxMessage(OrderOutboxEntity paymentOutboxEntity) {
return OrderOutboxMessage.builder()
.id(paymentOutboxEntity.getId())
.sagaId(paymentOutboxEntity.getSagaId())
.createdAt(paymentOutboxEntity.getCreatedAt())
.type(paymentOutboxEntity.getType())
.payload(paymentOutboxEntity.getPayload())
.outboxStatus(paymentOutboxEntity.getOutboxStatus())
.paymentStatus(paymentOutboxEntity.getPaymentStatus())
.version(paymentOutboxEntity.getVersion())
.build();
}
}

View File

@@ -0,0 +1,26 @@
package com.food.order.system.payment.data.access.outobx.repository;
import com.food.order.system.outbox.OutboxStatus;
import com.food.order.system.payment.data.access.outobx.entity.OrderOutboxEntity;
import com.food.order.system.valueobject.PaymentStatus;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@Repository
public interface OrderOutboxJpaRepository extends JpaRepository<OrderOutboxEntity, UUID> {
Optional<List<OrderOutboxEntity>> findByTypeAndOutboxStatus(String type, OutboxStatus outboxStatus);
Optional<OrderOutboxEntity> findByTypeAndSagaIdAndPaymentStatusAndOutboxStatus(String type,
UUID sagaId,
PaymentStatus paymentStatus,
OutboxStatus outboxStatus);
void deleteByTypeAndOutboxStatus(String type, OutboxStatus outboxStatus);
}

View File

@@ -13,6 +13,7 @@ import com.food.order.system.payment.service.domain.PaymentDomainService;
import com.food.order.system.payment.service.domain.entity.CreditEntry;
import com.food.order.system.payment.service.domain.entity.CreditHistory;
import com.food.order.system.payment.service.domain.entity.Payment;
import com.food.order.system.payment.service.domain.exception.PaymentNotFoundException;
import com.food.order.system.valueobject.CustomerId;
import com.food.order.system.valueobject.PaymentStatus;
import lombok.RequiredArgsConstructor;
@@ -62,10 +63,6 @@ public class PaymentRequestHelper {
paymentEvent.getPayment().getStatus(),
OutboxStatus.STARTED,
UUID.fromString(paymentRequest.getSagaId()));
}
private boolean publishIfOutboxMessageProcessedForPayment(PaymentRequest paymentRequest,
@@ -91,7 +88,7 @@ public class PaymentRequestHelper {
log.info("Received payment cancel event for id : {}", paymentRequest.getOrderId());
var payment = paymentRepository.findByOrderId
(UUID.fromString(paymentRequest.getOrderId())).orElseThrow(
() -> new PaymentApplicationServiceException("Payment not found"));
() -> new PaymentNotFoundException("Payment not found"));
var creditEntry = getCreditEntry(payment.getCustomerId());
var creditHistory = getCreditHistory(payment.getCustomerId());
List<String> failureMessage = new ArrayList<>();

View File

@@ -31,5 +31,10 @@
<groupId>com.food.order</groupId>
<artifactId>kafka-model</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -2,18 +2,24 @@ package com.food.order.system.payment.messaging.listener.kafka;
import com.food.order.system.kafka.consumer.KafkaConsumer;
import com.food.order.system.kafka.order.avro.model.PaymentRequestAvroModel;
import com.food.order.system.payment.application.service.exception.PaymentApplicationServiceException;
import com.food.order.system.payment.application.service.ports.input.message.listener.PaymentRequestMessageListener;
import com.food.order.system.payment.messaging.mapper.PaymentMessagingDataMapper;
import com.food.order.system.payment.service.domain.exception.PaymentNotFoundException;
import com.food.order.system.valueobject.PaymentOrderStatus;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.postgresql.util.PSQLState;
import org.springframework.dao.DataAccessException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.sql.SQLException;
import java.util.List;
import java.util.Objects;
@Component
@Slf4j
@@ -38,14 +44,33 @@ public class PaymentRequestKafkaListener implements KafkaConsumer<PaymentRequest
offsets.toString());
messages.forEach(paymentRequestAvroModel -> {
if (PaymentOrderStatus.PENDING.name().equalsIgnoreCase(paymentRequestAvroModel.getPaymentOrderStatus().name())) {
log.info("Processing payment for order id: {}", paymentRequestAvroModel.getOrderId());
paymentRequestMessageListener.completePayment(paymentMessagingDataMapper
.paymentRequestAvroModelToPaymentRequest(paymentRequestAvroModel));
} else if(PaymentOrderStatus.CANCELLED.name().equalsIgnoreCase(paymentRequestAvroModel.getPaymentOrderStatus().name())) {
log.info("Cancelling payment for order id: {}", paymentRequestAvroModel.getOrderId());
paymentRequestMessageListener.cancelPayment(paymentMessagingDataMapper
.paymentRequestAvroModelToPaymentRequest(paymentRequestAvroModel));
try {
if (Objects.equals(PaymentOrderStatus.PENDING.name(),
paymentRequestAvroModel.getPaymentOrderStatus().name())) {
log.info("Processing payment for order id: {}", paymentRequestAvroModel.getOrderId());
paymentRequestMessageListener.completePayment(paymentMessagingDataMapper
.paymentRequestAvroModelToPaymentRequest(paymentRequestAvroModel));
} else if(PaymentOrderStatus.CANCELLED.name().equals
(paymentRequestAvroModel.getPaymentOrderStatus().name())) {
log.info("Cancelling payment for order id: {}", paymentRequestAvroModel.getOrderId());
paymentRequestMessageListener.cancelPayment(paymentMessagingDataMapper
.paymentRequestAvroModelToPaymentRequest(paymentRequestAvroModel));
}
} catch (DataAccessException e) {
SQLException sqlException = (SQLException) e.getRootCause();
if (sqlException != null && sqlException.getSQLState() != null &&
PSQLState.UNIQUE_VIOLATION.getState().equals(sqlException.getSQLState())) {
//NO-OP for unique constraint exception
log.error("Caught unique constraint exception with sql state: {} " +
"in PaymentRequestKafkaListener for order id: {}",
sqlException.getSQLState(), paymentRequestAvroModel.getOrderId());
} else {
throw new PaymentApplicationServiceException("Throwing DataAccessException in" +
" PaymentRequestKafkaListener: " + e.getMessage(), e);
}
} catch (PaymentNotFoundException e) {
//NO-OP for PaymentNotFoundException
log.error("No payment found for order id: {}", paymentRequestAvroModel.getOrderId());
}
});

View File

@@ -4,9 +4,7 @@ import com.food.order.system.kafka.order.avro.model.PaymentRequestAvroModel;
import com.food.order.system.kafka.order.avro.model.PaymentResponseAvroModel;
import com.food.order.system.kafka.order.avro.model.PaymentStatus;
import com.food.order.system.payment.application.service.dto.PaymentRequest;
import com.food.order.system.payment.service.domain.event.PaymentCancelledEvent;
import com.food.order.system.payment.service.domain.event.PaymentCompletedEvent;
import com.food.order.system.payment.service.domain.event.PaymentFailedEvent;
import com.food.order.system.payment.application.service.outbox.model.OrderEventPayload;
import com.food.order.system.valueobject.PaymentOrderStatus;
import org.springframework.stereotype.Component;
@@ -15,47 +13,6 @@ import java.util.UUID;
@Component
public class PaymentMessagingDataMapper {
public PaymentResponseAvroModel paymentCompletedEventToPaymentResponseAvroModel(PaymentCompletedEvent paymentCompletedEvent) {
return PaymentResponseAvroModel.newBuilder()
.setId(UUID.randomUUID().toString())
.setSagaId("")
.setPaymentId(paymentCompletedEvent.getPayment().getId().getValue().toString())
.setCustomerId(paymentCompletedEvent.getPayment().getCustomerId().getValue().toString())
.setOrderId(paymentCompletedEvent.getPayment().getOrderId().getValue().toString())
.setPrice(paymentCompletedEvent.getPayment().getPrice().getAmount())
.setCreatedAt(paymentCompletedEvent.getPayment().getCreatedAt().toInstant())
.setPaymentStatus(PaymentStatus.valueOf(paymentCompletedEvent.getPayment().getStatus().name()))
.setFailureMessages(paymentCompletedEvent.getFailureMessages())
.build();
}
public PaymentResponseAvroModel paymentCancelEventToPaymentResponseAvroModel(PaymentCancelledEvent paymentCancelledEvent) {
return PaymentResponseAvroModel.newBuilder()
.setId(UUID.randomUUID().toString())
.setSagaId("")
.setPaymentId(paymentCancelledEvent.getPayment().getId().getValue().toString())
.setCustomerId(paymentCancelledEvent.getPayment().getCustomerId().getValue().toString())
.setOrderId(paymentCancelledEvent.getPayment().getOrderId().getValue().toString())
.setPrice(paymentCancelledEvent.getPayment().getPrice().getAmount())
.setCreatedAt(paymentCancelledEvent.getPayment().getCreatedAt().toInstant())
.setPaymentStatus(PaymentStatus.valueOf(paymentCancelledEvent.getPayment().getStatus().name()))
.setFailureMessages(paymentCancelledEvent.getFailureMessages())
.build();
}
public PaymentResponseAvroModel paymentFailedEventToPaymentResponseAvroModel(PaymentFailedEvent paymentFailedEvent) {
return PaymentResponseAvroModel.newBuilder()
.setId(UUID.randomUUID().toString())
.setSagaId("")
.setPaymentId(paymentFailedEvent.getPayment().getId().getValue().toString())
.setCustomerId(paymentFailedEvent.getPayment().getCustomerId().getValue().toString())
.setOrderId(paymentFailedEvent.getPayment().getOrderId().getValue().toString())
.setPrice(paymentFailedEvent.getPayment().getPrice().getAmount())
.setCreatedAt(paymentFailedEvent.getPayment().getCreatedAt().toInstant())
.setPaymentStatus(PaymentStatus.valueOf(paymentFailedEvent.getPayment().getStatus().name()))
.setFailureMessages(paymentFailedEvent.getFailureMessages())
.build();
}
public PaymentRequest paymentRequestAvroModelToPaymentRequest(PaymentRequestAvroModel paymentRequestAvroModel) {
return PaymentRequest.builder()
@@ -69,6 +26,19 @@ public class PaymentMessagingDataMapper {
.build();
}
public PaymentResponseAvroModel orderEventPayloadToPaymentResponseAvroModel(String sagaId,
OrderEventPayload orderEventPayload) {
return PaymentResponseAvroModel.newBuilder()
.setId(UUID.randomUUID().toString())
.setSagaId(sagaId)
.setPaymentId(orderEventPayload.getPaymentId())
.setCustomerId(orderEventPayload.getCustomerId())
.setOrderId(orderEventPayload.getOrderId())
.setPrice(orderEventPayload.getPrice())
.setCreatedAt(orderEventPayload.getCreatedAt().toInstant())//??
.setPaymentStatus(PaymentStatus.valueOf(orderEventPayload.getPaymentStatus()))
.setFailureMessages(orderEventPayload.getFailureMessages())
.build();
}
}

View File

@@ -1,47 +0,0 @@
package com.food.order.system.payment.messaging.publisher.kafka;
import com.food.order.system.event.publisher.DomainEventPublisher;
import com.food.order.system.kafka.order.avro.model.PaymentResponseAvroModel;
import com.food.order.system.kafka.producer.KafkaMessageHelper;
import com.food.order.system.kafka.producer.service.KafkaProducer;
import com.food.order.system.payment.application.service.config.PaymentServiceConfigData;
import com.food.order.system.payment.messaging.mapper.PaymentMessagingDataMapper;
import com.food.order.system.payment.service.domain.event.PaymentCancelledEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RequiredArgsConstructor
public class PaymentCancelledKafkaMessagePublisher implements DomainEventPublisher<PaymentCancelledEvent> {
private final PaymentMessagingDataMapper paymentDataMapper;
private final KafkaProducer<String , PaymentResponseAvroModel> kafkaProducer;
private final PaymentServiceConfigData paymentServiceConfigData;
private final KafkaMessageHelper kafkaMessageHelper;
@Override
public void publish(PaymentCancelledEvent event) {
log.info("Publishing payment cancelled event to kafka");
var orderId = event.getPayment().getOrderId().getValue().toString();
try {
var paymentResponseAvroModel =
paymentDataMapper.paymentCancelEventToPaymentResponseAvroModel(event);
kafkaProducer.send(paymentServiceConfigData.getPaymentResponseTopicName(),
orderId,
paymentResponseAvroModel,
kafkaMessageHelper.getKafkaCallBack(
paymentServiceConfigData.getPaymentResponseTopicName(),
paymentResponseAvroModel,
orderId,
"PaymentResponseAvroModel"));
log.info("Published payment cancelled event to kafka");
} catch (Exception e) {
log.error("Error while publishing payment cancelled event to kafka", e);
}
}
}

View File

@@ -1,48 +0,0 @@
package com.food.order.system.payment.messaging.publisher.kafka;
import com.food.order.system.event.publisher.DomainEventPublisher;
import com.food.order.system.kafka.order.avro.model.PaymentResponseAvroModel;
import com.food.order.system.kafka.producer.KafkaMessageHelper;
import com.food.order.system.kafka.producer.service.KafkaProducer;
import com.food.order.system.payment.application.service.config.PaymentServiceConfigData;
import com.food.order.system.payment.messaging.mapper.PaymentMessagingDataMapper;
import com.food.order.system.payment.service.domain.event.PaymentCompletedEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RequiredArgsConstructor
public class PaymentCompletedKafkaMessagePublisher implements DomainEventPublisher<PaymentCompletedEvent> {
private final PaymentMessagingDataMapper paymentDataMapper;
private final KafkaProducer<String , PaymentResponseAvroModel> kafkaProducer;
private final PaymentServiceConfigData paymentServiceConfigData;
private final KafkaMessageHelper kafkaMessageHelper;
@Override
public void publish(PaymentCompletedEvent event) {
log.info("Publishing payment completed event to kafka");
var orderId = event.getPayment().getOrderId().getValue().toString();
try {
var paymentResponseAvroModel =
paymentDataMapper.paymentCompletedEventToPaymentResponseAvroModel(event);
kafkaProducer.send(paymentServiceConfigData.getPaymentResponseTopicName(),
orderId,
paymentResponseAvroModel,
kafkaMessageHelper.getKafkaCallBack(
paymentServiceConfigData.getPaymentResponseTopicName(),
paymentResponseAvroModel,
orderId,
"PaymentResponseAvroModel"));
log.info("Published payment completed event to kafka");
} catch (Exception e) {
log.error("Error while publishing payment completed event to kafka", e);
}
}
}

View File

@@ -0,0 +1,61 @@
package com.food.order.system.payment.messaging.publisher.kafka;
import com.food.order.system.kafka.order.avro.model.PaymentResponseAvroModel;
import com.food.order.system.kafka.producer.KafkaMessageHelper;
import com.food.order.system.kafka.producer.service.KafkaProducer;
import com.food.order.system.outbox.OutboxStatus;
import com.food.order.system.payment.application.service.config.PaymentServiceConfigData;
import com.food.order.system.payment.application.service.outbox.model.OrderEventPayload;
import com.food.order.system.payment.application.service.outbox.model.OrderOutboxMessage;
import com.food.order.system.payment.application.service.ports.output.message.publisher.PaymentResponseMessagePublisher;
import com.food.order.system.payment.messaging.mapper.PaymentMessagingDataMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.function.BiConsumer;
@Component
@Slf4j
@RequiredArgsConstructor
public class PaymentEventKafkaPublisher implements PaymentResponseMessagePublisher {
private final PaymentMessagingDataMapper paymentMessagingDataMapper;
private final KafkaProducer<String , PaymentResponseAvroModel> kafkaProducer;
private final PaymentServiceConfigData paymentServiceConfigData;
private final KafkaMessageHelper kafkaMessageHelper;
@Override
public void publish(OrderOutboxMessage message,
BiConsumer<OrderOutboxMessage, OutboxStatus> outboxCallback) {
var payload =
kafkaMessageHelper.getOrderEventPayload(message.getPayload(), OrderEventPayload.class);
var sagaId = message.getSagaId().toString();
log.info("Publishing payment response for order id: {}", sagaId);
try {
var paymentResponseAvroModel =
paymentMessagingDataMapper.orderEventPayloadToPaymentResponseAvroModel(sagaId,payload);
kafkaProducer.send(paymentServiceConfigData.getPaymentResponseTopicName(),
sagaId, paymentResponseAvroModel,
kafkaMessageHelper.getKafkaCallback(
paymentServiceConfigData.getPaymentResponseTopicName(),
paymentResponseAvroModel,
message,
outboxCallback,
payload.getOrderId(),
"PaymentResponseAvroModel"
));
log.info("PaymentResponseAvroModel sent to kafka for order id: {} and saga id: {}",
paymentResponseAvroModel.getOrderId(), sagaId);
}
catch (Exception e) {
log.error("Error while publishing payment response for order id: {}", sagaId, e);
}
}
}

View File

@@ -1,48 +0,0 @@
package com.food.order.system.payment.messaging.publisher.kafka;
import com.food.order.system.event.publisher.DomainEventPublisher;
import com.food.order.system.kafka.order.avro.model.PaymentResponseAvroModel;
import com.food.order.system.kafka.producer.KafkaMessageHelper;
import com.food.order.system.kafka.producer.service.KafkaProducer;
import com.food.order.system.payment.application.service.config.PaymentServiceConfigData;
import com.food.order.system.payment.messaging.mapper.PaymentMessagingDataMapper;
import com.food.order.system.payment.service.domain.event.PaymentFailedEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RequiredArgsConstructor
public class PaymentFailedKafkaMessagePublisher implements DomainEventPublisher<PaymentFailedEvent> {
private final PaymentMessagingDataMapper paymentDataMapper;
private final KafkaProducer<String , PaymentResponseAvroModel> kafkaProducer;
private final PaymentServiceConfigData paymentServiceConfigData;
private final KafkaMessageHelper kafkaMessageHelper;
@Override
public void publish(PaymentFailedEvent event) {
log.info("Publishing payment failed event to kafka");
var orderId = event.getPayment().getOrderId().getValue().toString();
try {
var paymentResponseAvroModel =
paymentDataMapper.paymentFailedEventToPaymentResponseAvroModel(event);
kafkaProducer.send(paymentServiceConfigData.getPaymentResponseTopicName(),
orderId,
paymentResponseAvroModel,
kafkaMessageHelper.getKafkaCallBack(
paymentServiceConfigData.getPaymentResponseTopicName(),
paymentResponseAvroModel,
orderId,
"PaymentResponseAvroModel"));
log.info("Published payment failed event to kafka");
} catch (Exception e) {
log.error("Error while publishing payment failed event to kafka", e);
}
}
}