diff --git a/apache-kafka/pom.xml b/apache-kafka/pom.xml index cda91ed92f..8003743f95 100644 --- a/apache-kafka/pom.xml +++ b/apache-kafka/pom.xml @@ -161,6 +161,12 @@ spark-cassandra-connector-java_2.11 ${com.datastax.spark.spark-cassandra-connector-java.version} + + org.projectlombok + lombok + ${lombok.version} + provided + @@ -175,6 +181,7 @@ 0.8.1-spark3.0-s_2.12 2.5.2 1.6.0-M1 + 1.18.20 \ No newline at end of file diff --git a/apache-kafka/src/main/java/com/baeldung/kafka/dto/MessageDto.java b/apache-kafka/src/main/java/com/baeldung/kafka/dto/MessageDto.java new file mode 100644 index 0000000000..7f9e206358 --- /dev/null +++ b/apache-kafka/src/main/java/com/baeldung/kafka/dto/MessageDto.java @@ -0,0 +1,15 @@ +package com.baeldung.kafka.dto; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +public class MessageDto { + private String message; + private String version; +} diff --git a/apache-kafka/src/main/java/com/baeldung/kafka/serdes/CustomDeserializer.java b/apache-kafka/src/main/java/com/baeldung/kafka/serdes/CustomDeserializer.java new file mode 100644 index 0000000000..ee6e79dcd1 --- /dev/null +++ b/apache-kafka/src/main/java/com/baeldung/kafka/serdes/CustomDeserializer.java @@ -0,0 +1,35 @@ +package com.baeldung.kafka.serdes; + +import com.baeldung.kafka.dto.MessageDto; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; + +public class CustomDeserializer implements Deserializer { + + private ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public void configure(Map configs, boolean isKey) { + } + + @Override + public MessageDto deserialize(String topic, byte[] data) { + try { + if (data == null){ + System.out.println("Null received at deserializing"); + return null; + } + System.out.println("Deserializing..."); + return objectMapper.readValue(new String(data, "UTF-8"), MessageDto.class); + } catch (Exception e) { + throw new SerializationException("Error when deserializing byte[] to MessageDto"); + } + } + + @Override + public void close() { + } +} diff --git a/apache-kafka/src/main/java/com/baeldung/kafka/serdes/CustomSerializer.java b/apache-kafka/src/main/java/com/baeldung/kafka/serdes/CustomSerializer.java new file mode 100644 index 0000000000..a414ad8e23 --- /dev/null +++ b/apache-kafka/src/main/java/com/baeldung/kafka/serdes/CustomSerializer.java @@ -0,0 +1,34 @@ +package com.baeldung.kafka.serdes; + +import com.baeldung.kafka.dto.MessageDto; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serializer; + +import java.util.Map; + +public class CustomSerializer implements Serializer { + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public void configure(Map configs, boolean isKey) { + } + + @Override + public byte[] serialize(String topic, MessageDto data) { + try { + if (data == null){ + System.out.println("Null received at serializing"); + return null; + } + System.out.println("Serializing..."); + return objectMapper.writeValueAsBytes(data); + } catch (Exception e) { + throw new SerializationException("Error when serializing MessageDto to byte[]"); + } + } + + @Override + public void close() { + } +} diff --git a/apache-kafka/src/test/java/com/baeldung/kafka/serdes/KafkaSerDesLiveTest.java b/apache-kafka/src/test/java/com/baeldung/kafka/serdes/KafkaSerDesLiveTest.java new file mode 100644 index 0000000000..67fff12f5e --- /dev/null +++ b/apache-kafka/src/test/java/com/baeldung/kafka/serdes/KafkaSerDesLiveTest.java @@ -0,0 +1,92 @@ +package com.baeldung.kafka.serdes; + +import com.baeldung.kafka.dto.MessageDto; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; + +public class KafkaSerDesLiveTest { + private static final String CONSUMER_APP_ID = "consumer_id"; + private static final String CONSUMER_GROUP_ID = "group_id"; + + @ClassRule + public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3")); + private final String TOPIC = "mytopic"; + + private static KafkaConsumer createKafkaConsumer() { + + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID); + props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.serdes.CustomDeserializer"); + + return new KafkaConsumer<>(props); + + } + + private static KafkaProducer createKafkaProducer() { + + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + props.put(ProducerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.serdes.CustomSerializer"); + + return new KafkaProducer(props); + + } + + @Before + public void setUp() { + } + + @Test + public void givenKafkaClientShouldSerializeAndDeserialize() throws InterruptedException { + + MessageDto msgProd = MessageDto.builder().message("test").version("1.0").build(); + + KafkaProducer producer = createKafkaProducer(); + producer.send(new ProducerRecord(TOPIC, "1", msgProd)); + System.out.println("Message sent " + msgProd); + producer.close(); + + Thread.sleep(2000); + + AtomicReference msgCons = new AtomicReference<>(); + + KafkaConsumer consumer = createKafkaConsumer(); + consumer.subscribe(Arrays.asList(TOPIC)); + + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + records.forEach(record -> { + msgCons.set(record.value()); + System.out.println("Message received " + record.value()); + }); + + consumer.close(); + + assertEquals(msgProd, msgCons.get()); + + } + +} + +