JAVA-6390: Move kafka articles from libraries-data-3 to new module

apache-kafka
This commit is contained in:
sampadawagde
2021-08-15 17:09:04 +05:30
parent 6bf1b587bd
commit cf2e02defa
20 changed files with 128 additions and 148 deletions

View File

@@ -1,82 +0,0 @@
package com.baeldung.flink;
import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import com.baeldung.flink.operator.BackupAggregator;
import com.baeldung.flink.operator.InputMessageTimestampAssigner;
import com.baeldung.flink.operator.WordsCapitalizer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import static com.baeldung.flink.connector.Consumers.*;
import static com.baeldung.flink.connector.Producers.*;
public class FlinkDataPipeline {
public static void capitalize() throws Exception {
String inputTopic = "flink_input";
String outputTopic = "flink_output";
String consumerGroup = "baeldung";
String address = "localhost:9092";
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer011<String> flinkKafkaConsumer =
createStringConsumerForTopic(inputTopic, address, consumerGroup);
flinkKafkaConsumer.setStartFromEarliest();
DataStream<String> stringInputStream =
environment.addSource(flinkKafkaConsumer);
FlinkKafkaProducer011<String> flinkKafkaProducer =
createStringProducer(outputTopic, address);
stringInputStream
.map(new WordsCapitalizer())
.addSink(flinkKafkaProducer);
environment.execute();
}
public static void createBackup () throws Exception {
String inputTopic = "flink_input";
String outputTopic = "flink_output";
String consumerGroup = "baeldung";
String kafkaAddress = "localhost:9092";
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer =
createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
flinkKafkaConsumer.setStartFromEarliest();
flinkKafkaConsumer
.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());
FlinkKafkaProducer011<Backup> flinkKafkaProducer =
createBackupProducer(outputTopic, kafkaAddress);
DataStream<InputMessage> inputMessagesStream =
environment.addSource(flinkKafkaConsumer);
inputMessagesStream
.timeWindowAll(Time.hours(24))
.aggregate(new BackupAggregator())
.addSink(flinkKafkaProducer);
environment.execute();
}
public static void main(String[] args) throws Exception {
createBackup();
}
}

View File

@@ -1,32 +0,0 @@
package com.baeldung.flink.connector;
import com.baeldung.flink.model.InputMessage;
import com.baeldung.flink.schema.InputMessageDeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;
public class Consumers {
public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(
String topic, String kafkaAddress, String kafkaGroup ) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaAddress);
props.setProperty("group.id",kafkaGroup);
FlinkKafkaConsumer011<String> consumer =
new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(),props);
return consumer;
}
public static FlinkKafkaConsumer011<InputMessage> createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup ) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", kafkaAddress);
properties.setProperty("group.id",kafkaGroup);
FlinkKafkaConsumer011<InputMessage> consumer = new FlinkKafkaConsumer011<InputMessage>(
topic, new InputMessageDeserializationSchema(),properties);
return consumer;
}
}

View File

@@ -1,17 +0,0 @@
package com.baeldung.flink.connector;
import com.baeldung.flink.model.Backup;
import com.baeldung.flink.schema.BackupSerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
public class Producers {
public static FlinkKafkaProducer011<String> createStringProducer(String topic, String kafkaAddress) {
return new FlinkKafkaProducer011<>(kafkaAddress, topic, new SimpleStringSchema());
}
public static FlinkKafkaProducer011<Backup> createBackupProducer(String topic, String kafkaAddress) {
return new FlinkKafkaProducer011<Backup>(kafkaAddress, topic, new BackupSerializationSchema());
}
}

View File

@@ -1,27 +0,0 @@
package com.baeldung.flink.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;
public class Backup {
@JsonProperty("inputMessages")
List<InputMessage> inputMessages;
@JsonProperty("backupTimestamp")
LocalDateTime backupTimestamp;
@JsonProperty("uuid")
UUID uuid;
public Backup(List<InputMessage> inputMessages, LocalDateTime backupTimestamp) {
this.inputMessages = inputMessages;
this.backupTimestamp = backupTimestamp;
this.uuid = UUID.randomUUID();
}
public List<InputMessage> getInputMessages() {
return inputMessages;
}
}

View File

@@ -1,71 +0,0 @@
package com.baeldung.flink.model;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.base.Objects;
import java.time.LocalDateTime;
@JsonSerialize
public class InputMessage {
String sender;
String recipient;
LocalDateTime sentAt;
String message;
public InputMessage() {
}
public String getSender() {
return sender;
}
public void setSender(String sender) {
this.sender = sender;
}
public String getRecipient() {
return recipient;
}
public void setRecipient(String recipient) {
this.recipient = recipient;
}
public LocalDateTime getSentAt() {
return sentAt;
}
public void setSentAt(LocalDateTime sentAt) {
this.sentAt = sentAt;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public InputMessage(String sender, String recipient, LocalDateTime sentAt, String message) {
this.sender = sender;
this.recipient = recipient;
this.sentAt = sentAt;
this.message = message;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
InputMessage message1 = (InputMessage) o;
return Objects.equal(sender, message1.sender) &&
Objects.equal(recipient, message1.recipient) &&
Objects.equal(sentAt, message1.sentAt) &&
Objects.equal(message, message1.message);
}
@Override
public int hashCode() {
return Objects.hashCode(sender, recipient, sentAt, message);
}
}

View File

@@ -1,34 +0,0 @@
package com.baeldung.flink.operator;
import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import org.apache.flink.api.common.functions.AggregateFunction;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
public class BackupAggregator implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {
@Override
public List<InputMessage> createAccumulator() {
return new ArrayList<>();
}
@Override
public List<InputMessage> add(InputMessage inputMessage, List<InputMessage> inputMessages) {
inputMessages.add(inputMessage);
return inputMessages;
}
@Override
public Backup getResult(List<InputMessage> inputMessages) {
Backup backup = new Backup(inputMessages, LocalDateTime.now());
return backup;
}
@Override
public List<InputMessage> merge(List<InputMessage> inputMessages, List<InputMessage> acc1) {
inputMessages.addAll(acc1);
return inputMessages;
}
}

View File

@@ -1,23 +0,0 @@
package com.baeldung.flink.operator;
import com.baeldung.flink.model.InputMessage;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import javax.annotation.Nullable;
import java.time.ZoneId;
public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWatermarks<InputMessage> {
@Override
public long extractTimestamp(InputMessage element, long previousElementTimestamp) {
ZoneId zoneId = ZoneId.systemDefault();
return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000;
}
@Nullable
@Override
public Watermark checkAndGetNextWatermark(InputMessage lastElement, long extractedTimestamp) {
return new Watermark(extractedTimestamp - 15);
}
}

View File

@@ -1,11 +0,0 @@
package com.baeldung.flink.operator;
import org.apache.flink.api.common.functions.MapFunction;
public class WordsCapitalizer implements MapFunction<String, String> {
@Override
public String map(String s) {
return s.toUpperCase();
}
}

View File

@@ -1,33 +0,0 @@
package com.baeldung.flink.schema;
import com.baeldung.flink.model.Backup;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BackupSerializationSchema
implements SerializationSchema<Backup> {
static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class);
@Override
public byte[] serialize(Backup backupMessage) {
if(objectMapper == null) {
objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
}
try {
String json = objectMapper.writeValueAsString(backupMessage);
return json.getBytes();
} catch (com.fasterxml.jackson.core.JsonProcessingException e) {
logger.error("Failed to parse JSON", e);
}
return new byte[0];
}
}

View File

@@ -1,32 +0,0 @@
package com.baeldung.flink.schema;
import com.baeldung.flink.model.InputMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.IOException;
public class InputMessageDeserializationSchema implements
DeserializationSchema<InputMessage> {
static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
@Override
public InputMessage deserialize(byte[] bytes) throws IOException {
return objectMapper.readValue(bytes, InputMessage.class);
}
@Override
public boolean isEndOfStream(InputMessage inputMessage) {
return false;
}
@Override
public TypeInformation<InputMessage> getProducedType() {
return TypeInformation.of(InputMessage.class);
}
}

View File

@@ -1,28 +0,0 @@
package com.baeldung.kafka.consumer;
class CountryPopulation {
private String country;
private Integer population;
public CountryPopulation(String country, Integer population) {
this.country = country;
this.population = population;
}
public String getCountry() {
return country;
}
public void setCountry(String country) {
this.country = country;
}
public Integer getPopulation() {
return population;
}
public void setPopulation(Integer population) {
this.population = population;
}
}

View File

@@ -1,60 +0,0 @@
package com.baeldung.kafka.consumer;
import java.time.Duration;
import java.util.Collections;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CountryPopulationConsumer {
private static Logger logger = LoggerFactory.getLogger(CountryPopulationConsumer.class);
private Consumer<String, Integer> consumer;
private java.util.function.Consumer<Throwable> exceptionConsumer;
private java.util.function.Consumer<CountryPopulation> countryPopulationConsumer;
public CountryPopulationConsumer(
Consumer<String, Integer> consumer, java.util.function.Consumer<Throwable> exceptionConsumer,
java.util.function.Consumer<CountryPopulation> countryPopulationConsumer) {
this.consumer = consumer;
this.exceptionConsumer = exceptionConsumer;
this.countryPopulationConsumer = countryPopulationConsumer;
}
void startBySubscribing(String topic) {
consume(() -> consumer.subscribe(Collections.singleton(topic)));
}
void startByAssigning(String topic, int partition) {
consume(() -> consumer.assign(Collections.singleton(new TopicPartition(topic, partition))));
}
private void consume(Runnable beforePollingTask) {
try {
beforePollingTask.run();
while (true) {
ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofMillis(1000));
StreamSupport.stream(records.spliterator(), false)
.map(record -> new CountryPopulation(record.key(), record.value()))
.forEach(countryPopulationConsumer);
consumer.commitSync();
}
} catch (WakeupException e) {
logger.info("Shutting down...");
} catch (RuntimeException ex) {
exceptionConsumer.accept(ex);
} finally {
consumer.close();
}
}
public void stop() {
consumer.wakeup();
}
}

View File

@@ -1,17 +0,0 @@
package com.baeldung.kafka.producer;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
public class EvenOddPartitioner extends DefaultPartitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (((String) key).length() % 2 == 0) {
return 0;
}
return 1;
}
}

View File

@@ -1,40 +0,0 @@
package com.baeldung.kafka.producer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.concurrent.Future;
public class KafkaProducer {
private final Producer<String, String> producer;
public KafkaProducer(Producer<String, String> producer) {
this.producer = producer;
}
public Future<RecordMetadata> send(String key, String value) {
ProducerRecord record = new ProducerRecord("topic_sports_news",
key, value);
return producer.send(record);
}
public void flush() {
producer.flush();
}
public void beginTransaction() {
producer.beginTransaction();
}
public void initTransaction() {
producer.initTransactions();
}
public void commitTransaction() {
producer.commitTransaction();
}
}

View File

@@ -1,104 +0,0 @@
package com.baeldung.flink;
import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import com.baeldung.flink.operator.BackupAggregator;
import com.baeldung.flink.operator.InputMessageTimestampAssigner;
import com.baeldung.flink.schema.BackupSerializationSchema;
import com.baeldung.flink.schema.InputMessageDeserializationSchema;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.commons.collections.ListUtils;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
public class BackupCreatorIntegrationTest {
public static ObjectMapper mapper;
@Before
public void setup() {
mapper = new ObjectMapper().registerModule(new JavaTimeModule());
}
@Test
public void givenProperJson_whenDeserializeIsInvoked_thenProperObjectIsReturned() throws IOException {
InputMessage message = new InputMessage("Me", "User", LocalDateTime.now(), "Test Message");
byte[] messageSerialized = mapper.writeValueAsBytes(message);
DeserializationSchema<InputMessage> deserializationSchema = new InputMessageDeserializationSchema();
InputMessage messageDeserialized = deserializationSchema.deserialize(messageSerialized);
assertEquals(message, messageDeserialized);
}
@Test
public void givenMultipleInputMessagesFromDifferentDays_whenBackupCreatorIsUser_thenMessagesAreGroupedProperly() throws Exception {
LocalDateTime currentTime = LocalDateTime.now();
InputMessage message = new InputMessage("Me", "User", currentTime, "First TestMessage");
InputMessage secondMessage = new InputMessage("Me", "User", currentTime.plusHours(1), "First TestMessage");
InputMessage thirdMessage = new InputMessage("Me", "User", currentTime.plusHours(2), "First TestMessage");
InputMessage fourthMessage = new InputMessage("Me", "User", currentTime.plusHours(3), "First TestMessage");
InputMessage fifthMessage = new InputMessage("Me", "User", currentTime.plusHours(25), "First TestMessage");
InputMessage sixthMessage = new InputMessage("Me", "User", currentTime.plusHours(26), "First TestMessage");
List<InputMessage> firstBackupMessages = Arrays.asList(message, secondMessage, thirdMessage, fourthMessage);
List<InputMessage> secondBackupMessages = Arrays.asList(fifthMessage, sixthMessage);
List<InputMessage> inputMessages = ListUtils.union(firstBackupMessages, secondBackupMessages);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStreamSource<InputMessage> testDataSet = env.fromCollection(inputMessages);
CollectingSink sink = new CollectingSink();
testDataSet.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner())
.timeWindowAll(Time.hours(24))
.aggregate(new BackupAggregator())
.addSink(sink);
env.execute();
Awaitility.await().until(() -> sink.backups.size() == 2);
assertEquals(2, sink.backups.size());
assertEquals(firstBackupMessages, sink.backups.get(0).getInputMessages());
assertEquals(secondBackupMessages, sink.backups.get(1).getInputMessages());
}
@Test
public void givenProperBackupObject_whenSerializeIsInvoked_thenObjectIsProperlySerialized() throws IOException {
InputMessage message = new InputMessage("Me", "User", LocalDateTime.now(), "Test Message");
List<InputMessage> messages = Arrays.asList(message);
Backup backup = new Backup(messages, LocalDateTime.now());
byte[] backupSerialized = mapper.writeValueAsBytes(backup);
SerializationSchema<Backup> serializationSchema = new BackupSerializationSchema();
byte[] backupProcessed = serializationSchema.serialize(backup);
assertArrayEquals(backupSerialized, backupProcessed);
}
private static class CollectingSink implements SinkFunction<Backup> {
public static List<Backup> backups = new ArrayList<>();
@Override
public synchronized void invoke(Backup value, Context context) throws Exception {
backups.add(value);
}
}
}

View File

@@ -1,34 +0,0 @@
package com.baeldung.flink;
import com.baeldung.flink.operator.WordsCapitalizer;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class WordCapitalizerIntegrationTest {
@Test
public void givenDataSet_whenExecuteWordCapitalizer_thenReturnCapitalizedWords() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
List<String> data = Arrays.asList("dog", "cat", "wolf", "pig");
DataSet<String> testDataSet = env.fromCollection(data);
List<String> dataProcessed = testDataSet
.map(new WordsCapitalizer())
.collect();
List<String> testDataCapitalized = data.stream()
.map(String::toUpperCase)
.collect(Collectors.toList());
Assert.assertEquals(testDataCapitalized, dataProcessed);
}
}

View File

@@ -1,100 +0,0 @@
package com.baeldung.kafka.consumer;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class CountryPopulationConsumerUnitTest {
private static final String TOPIC = "topic";
private static final int PARTITION = 0;
private CountryPopulationConsumer countryPopulationConsumer;
private List<CountryPopulation> updates;
private Throwable pollException;
private MockConsumer<String, Integer> consumer;
@BeforeEach
void setUp() {
consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
updates = new ArrayList<>();
countryPopulationConsumer = new CountryPopulationConsumer(consumer, ex -> this.pollException = ex, updates::add);
}
@Test
void whenStartingByAssigningTopicPartition_thenExpectUpdatesAreConsumedCorrectly() {
// GIVEN
consumer.schedulePollTask(() -> consumer.addRecord(record(TOPIC, PARTITION, "Romania", 19_410_000)));
consumer.schedulePollTask(() -> countryPopulationConsumer.stop());
HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
startOffsets.put(tp, 0L);
consumer.updateBeginningOffsets(startOffsets);
// WHEN
countryPopulationConsumer.startByAssigning(TOPIC, PARTITION);
// THEN
assertThat(updates).hasSize(1);
assertThat(consumer.closed()).isTrue();
}
@Test
void whenStartingBySubscribingToTopic_thenExpectUpdatesAreConsumedCorrectly() {
// GIVEN
consumer.schedulePollTask(() -> {
consumer.rebalance(Collections.singletonList(new TopicPartition(TOPIC, 0)));
consumer.addRecord(record(TOPIC, PARTITION, "Romania", 19_410_000));
});
consumer.schedulePollTask(() -> countryPopulationConsumer.stop());
HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
startOffsets.put(tp, 0L);
consumer.updateBeginningOffsets(startOffsets);
// WHEN
countryPopulationConsumer.startBySubscribing(TOPIC);
// THEN
assertThat(updates).hasSize(1);
assertThat(consumer.closed()).isTrue();
}
@Test
void whenStartingBySubscribingToTopicAndExceptionOccurs_thenExpectExceptionIsHandledCorrectly() {
// GIVEN
consumer.schedulePollTask(() -> consumer.setPollException(new KafkaException("poll exception")));
consumer.schedulePollTask(() -> countryPopulationConsumer.stop());
HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
TopicPartition tp = new TopicPartition(TOPIC, 0);
startOffsets.put(tp, 0L);
consumer.updateBeginningOffsets(startOffsets);
// WHEN
countryPopulationConsumer.startBySubscribing(TOPIC);
// THEN
assertThat(pollException).isInstanceOf(KafkaException.class).hasMessage("poll exception");
assertThat(consumer.closed()).isTrue();
}
private ConsumerRecord<String, Integer> record(String topic, int partition, String country, int population) {
return new ConsumerRecord<>(topic, partition, 0, country, population);
}
}

View File

@@ -1,116 +0,0 @@
package com.baeldung.kafka.producer;
import com.baeldung.kafka.producer.EvenOddPartitioner;
import com.baeldung.kafka.producer.KafkaProducer;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static java.util.Collections.emptySet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
class KafkaProducerUnitTest {
private final String TOPIC_NAME = "topic_sports_news";
private KafkaProducer kafkaProducer;
private MockProducer<String, String> mockProducer;
private void buildMockProducer(boolean autoComplete) {
this.mockProducer = new MockProducer<>(autoComplete, new StringSerializer(), new StringSerializer());
}
@Test
void givenKeyValue_whenSend_thenVerifyHistory() throws ExecutionException, InterruptedException {
buildMockProducer(true);
//when
kafkaProducer = new KafkaProducer(mockProducer);
Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send("data", "{\"site\" : \"baeldung\"}");
//then
assertTrue(mockProducer.history().size() == 1);
assertTrue(mockProducer.history().get(0).key().equalsIgnoreCase("data"));
assertTrue(recordMetadataFuture.get().partition() == 0);
}
@Test
void givenKeyValue_whenSend_thenSendOnlyAfterFlush() {
buildMockProducer(false);
//when
kafkaProducer = new KafkaProducer(mockProducer);
Future<RecordMetadata> record = kafkaProducer.send("data", "{\"site\" : \"baeldung\"}");
assertFalse(record.isDone());
//then
kafkaProducer.flush();
assertTrue(record.isDone());
}
@Test
void givenKeyValue_whenSend_thenReturnException() {
buildMockProducer(false);
//when
kafkaProducer = new KafkaProducer(mockProducer);
Future<RecordMetadata> record = kafkaProducer.send("site", "{\"site\" : \"baeldung\"}");
RuntimeException e = new RuntimeException();
mockProducer.errorNext(e);
//then
try {
record.get();
} catch (ExecutionException | InterruptedException ex) {
assertEquals(e, ex.getCause());
}
assertTrue(record.isDone());
}
@Test
void givenKeyValue_whenSendWithTxn_thenSendOnlyOnTxnCommit() {
buildMockProducer(true);
//when
kafkaProducer = new KafkaProducer(mockProducer);
kafkaProducer.initTransaction();
kafkaProducer.beginTransaction();
Future<RecordMetadata> record = kafkaProducer.send("data", "{\"site\" : \"baeldung\"}");
//then
assertTrue(mockProducer.history().isEmpty());
kafkaProducer.commitTransaction();
assertTrue(mockProducer.history().size() == 1);
}
@Test
void givenKeyValue_whenSendWithPartitioning_thenVerifyPartitionNumber() throws ExecutionException, InterruptedException {
PartitionInfo partitionInfo0 = new PartitionInfo(TOPIC_NAME, 0, null, null, null);
PartitionInfo partitionInfo1 = new PartitionInfo(TOPIC_NAME, 1, null, null, null);
List<PartitionInfo> list = new ArrayList<>();
list.add(partitionInfo0);
list.add(partitionInfo1);
Cluster cluster = new Cluster("kafkab", new ArrayList<Node>(), list, emptySet(), emptySet());
this.mockProducer = new MockProducer<>(cluster, true, new EvenOddPartitioner(), new StringSerializer(), new StringSerializer());
//when
kafkaProducer = new KafkaProducer(mockProducer);
Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send("partition", "{\"site\" : \"baeldung\"}");
//then
assertTrue(recordMetadataFuture.get().partition() == 1);
}
}