Outbox Message and Scheduler class implemented part - 3.

This commit is contained in:
Ali CANLI
2022-07-16 14:01:49 +03:00
parent 3affe7ccdd
commit 9ff94746f6
153 changed files with 961 additions and 525 deletions

View File

@@ -41,6 +41,10 @@
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
</dependency>
<dependency>
<groupId>com.food.order</groupId>
<artifactId>outbox</artifactId>
</dependency>
</dependencies>

View File

@@ -1,34 +1,59 @@
package com.food.order.system.kafka.producer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.food.order.system.domain.exception.OrderDomainException;
import com.food.order.system.outbox.OutboxStatus;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.function.BiConsumer;
@Component
@Slf4j
@RequiredArgsConstructor
public class KafkaMessageHelper {
public <T> ListenableFutureCallback<SendResult<String, T>> getKafkaCallBack
(String responseTopicName, T t,String orderId, String requestAvroModelName) {
return new ListenableFutureCallback<>() {
private final ObjectMapper objectMapper;
public <T, U> ListenableFutureCallback<SendResult<String, T>>
getKafkaCallback(String responseTopicName, T avroModel, U outboxMessage,
BiConsumer<U, OutboxStatus> outboxCallback,
String orderId, String avroModelName) {
return new ListenableFutureCallback<SendResult<String, T>>() {
@Override
public void onFailure(Throwable ex) {
log.error("Error while sending " + requestAvroModelName +
" to " + responseTopicName + " for orderId " + orderId, ex);
log.error("Error while sending {} with message: {} and outbox type: {} to topic {}",
avroModelName, avroModel.toString(), outboxMessage.getClass().getName(), responseTopicName, ex);
outboxCallback.accept(outboxMessage, OutboxStatus.FAILED);
}
@Override
public void onSuccess(SendResult<String, T> result) {
RecordMetadata recordMetadata = result.getRecordMetadata();
log.info("Received successful response from kafka for order id : {} Topic : {} Partition : {} , Offset : {} , Timestamp : {}",
RecordMetadata metadata = result.getRecordMetadata();
log.info("Received successful response from Kafka for order id: {}" +
" Topic: {} Partition: {} Offset: {} Timestamp: {}",
orderId,
recordMetadata.topic(),
recordMetadata.partition(),
recordMetadata.offset(),
recordMetadata.timestamp());
metadata.topic(),
metadata.partition(),
metadata.offset(),
metadata.timestamp());
outboxCallback.accept(outboxMessage, OutboxStatus.COMPLETED);
}
};
}
public <T> T getOrderEventPayload(String payload, Class<T> outputType) {
try {
return objectMapper.readValue(payload, outputType);
} catch (JsonProcessingException e) {
log.error("Could not read {} object!", outputType.getName(), e);
throw new OrderDomainException("Could not read " + outputType.getName() + " object!", e);
}
}
}