diff --git a/apache-kafka/src/main/java/com/baeldung/flink/FlinkDataPipeline.java b/apache-kafka/src/main/java/com/baeldung/flink/FlinkDataPipeline.java new file mode 100644 index 0000000000..4502b628b2 --- /dev/null +++ b/apache-kafka/src/main/java/com/baeldung/flink/FlinkDataPipeline.java @@ -0,0 +1,70 @@ +package com.baeldung.flink; + +import com.baeldung.flink.model.Backup; +import com.baeldung.flink.model.InputMessage; +import com.baeldung.flink.operator.BackupAggregator; +import com.baeldung.flink.operator.InputMessageTimestampAssigner; +import com.baeldung.flink.operator.WordsCapitalizer; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; + +import static com.baeldung.flink.connector.Consumers.*; +import static com.baeldung.flink.connector.Producers.*; + +public class FlinkDataPipeline { + + public static void capitalize() throws Exception { + String inputTopic = "flink_input"; + String outputTopic = "flink_output"; + String consumerGroup = "baeldung"; + String address = "localhost:9092"; + + StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + + FlinkKafkaConsumer011 flinkKafkaConsumer = createStringConsumerForTopic(inputTopic, address, consumerGroup); + flinkKafkaConsumer.setStartFromEarliest(); + + DataStream stringInputStream = environment.addSource(flinkKafkaConsumer); + + FlinkKafkaProducer011 flinkKafkaProducer = createStringProducer(outputTopic, address); + + stringInputStream.map(new WordsCapitalizer()) + .addSink(flinkKafkaProducer); + + environment.execute(); + } + + public static void createBackup() throws Exception { + String inputTopic = "flink_input"; + String outputTopic = "flink_output"; + String consumerGroup = "baeldung"; + String kafkaAddress = "localhost:9092"; + + StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + + environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + FlinkKafkaConsumer011 flinkKafkaConsumer = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup); + flinkKafkaConsumer.setStartFromEarliest(); + + flinkKafkaConsumer.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner()); + FlinkKafkaProducer011 flinkKafkaProducer = createBackupProducer(outputTopic, kafkaAddress); + + DataStream inputMessagesStream = environment.addSource(flinkKafkaConsumer); + + inputMessagesStream.timeWindowAll(Time.hours(24)) + .aggregate(new BackupAggregator()) + .addSink(flinkKafkaProducer); + + environment.execute(); + } + + public static void main(String[] args) throws Exception { + createBackup(); + } + +} diff --git a/libraries-data-2/src/main/java/com/baeldung/flink/connector/Consumers.java b/apache-kafka/src/main/java/com/baeldung/flink/connector/Consumers.java similarity index 53% rename from libraries-data-2/src/main/java/com/baeldung/flink/connector/Consumers.java rename to apache-kafka/src/main/java/com/baeldung/flink/connector/Consumers.java index 514085f9c4..c72cb8a2d6 100644 --- a/libraries-data-2/src/main/java/com/baeldung/flink/connector/Consumers.java +++ b/apache-kafka/src/main/java/com/baeldung/flink/connector/Consumers.java @@ -9,23 +9,20 @@ import java.util.Properties; public class Consumers { -public static FlinkKafkaConsumer011 createStringConsumerForTopic( - String topic, String kafkaAddress, String kafkaGroup ) { - Properties props = new Properties(); - props.setProperty("bootstrap.servers", kafkaAddress); - props.setProperty("group.id",kafkaGroup); - FlinkKafkaConsumer011 consumer = - new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(),props); + public static FlinkKafkaConsumer011 createStringConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) { + Properties props = new Properties(); + props.setProperty("bootstrap.servers", kafkaAddress); + props.setProperty("group.id", kafkaGroup); + FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props); - return consumer; -} + return consumer; + } - public static FlinkKafkaConsumer011 createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup ) { + public static FlinkKafkaConsumer011 createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup) { Properties properties = new Properties(); properties.setProperty("bootstrap.servers", kafkaAddress); - properties.setProperty("group.id",kafkaGroup); - FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011( - topic, new InputMessageDeserializationSchema(),properties); + properties.setProperty("group.id", kafkaGroup); + FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011(topic, new InputMessageDeserializationSchema(), properties); return consumer; } diff --git a/libraries-data-2/src/main/java/com/baeldung/flink/connector/Producers.java b/apache-kafka/src/main/java/com/baeldung/flink/connector/Producers.java similarity index 100% rename from libraries-data-2/src/main/java/com/baeldung/flink/connector/Producers.java rename to apache-kafka/src/main/java/com/baeldung/flink/connector/Producers.java diff --git a/libraries-data-2/src/main/java/com/baeldung/flink/model/Backup.java b/apache-kafka/src/main/java/com/baeldung/flink/model/Backup.java similarity index 100% rename from libraries-data-2/src/main/java/com/baeldung/flink/model/Backup.java rename to apache-kafka/src/main/java/com/baeldung/flink/model/Backup.java diff --git a/libraries-data-2/src/main/java/com/baeldung/flink/model/InputMessage.java b/apache-kafka/src/main/java/com/baeldung/flink/model/InputMessage.java similarity index 80% rename from libraries-data-2/src/main/java/com/baeldung/flink/model/InputMessage.java rename to apache-kafka/src/main/java/com/baeldung/flink/model/InputMessage.java index b3f75256ae..9331811b91 100644 --- a/libraries-data-2/src/main/java/com/baeldung/flink/model/InputMessage.java +++ b/apache-kafka/src/main/java/com/baeldung/flink/model/InputMessage.java @@ -18,6 +18,7 @@ public class InputMessage { public String getSender() { return sender; } + public void setSender(String sender) { this.sender = sender; } @@ -55,13 +56,12 @@ public class InputMessage { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; InputMessage message1 = (InputMessage) o; - return Objects.equal(sender, message1.sender) && - Objects.equal(recipient, message1.recipient) && - Objects.equal(sentAt, message1.sentAt) && - Objects.equal(message, message1.message); + return Objects.equal(sender, message1.sender) && Objects.equal(recipient, message1.recipient) && Objects.equal(sentAt, message1.sentAt) && Objects.equal(message, message1.message); } @Override diff --git a/apache-kafka/src/main/java/com/baeldung/flink/operator/BackupAggregator.java b/apache-kafka/src/main/java/com/baeldung/flink/operator/BackupAggregator.java new file mode 100644 index 0000000000..bac1c8c705 --- /dev/null +++ b/apache-kafka/src/main/java/com/baeldung/flink/operator/BackupAggregator.java @@ -0,0 +1,34 @@ +package com.baeldung.flink.operator; + +import com.baeldung.flink.model.Backup; +import com.baeldung.flink.model.InputMessage; +import org.apache.flink.api.common.functions.AggregateFunction; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; + +public class BackupAggregator implements AggregateFunction, Backup> { + @Override + public List createAccumulator() { + return new ArrayList<>(); + } + + @Override + public List add(InputMessage inputMessage, List inputMessages) { + inputMessages.add(inputMessage); + return inputMessages; + } + + @Override + public Backup getResult(List inputMessages) { + Backup backup = new Backup(inputMessages, LocalDateTime.now()); + return backup; + } + + @Override + public List merge(List inputMessages, List acc1) { + inputMessages.addAll(acc1); + return inputMessages; + } +} diff --git a/libraries-data-2/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java b/apache-kafka/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java similarity index 88% rename from libraries-data-2/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java rename to apache-kafka/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java index 05828d9588..995fe41717 100644 --- a/libraries-data-2/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java +++ b/apache-kafka/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java @@ -12,7 +12,9 @@ public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWate @Override public long extractTimestamp(InputMessage element, long previousElementTimestamp) { ZoneId zoneId = ZoneId.systemDefault(); - return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000; + return element.getSentAt() + .atZone(zoneId) + .toEpochSecond() * 1000; } @Nullable diff --git a/libraries-data-2/src/main/java/com/baeldung/flink/operator/WordsCapitalizer.java b/apache-kafka/src/main/java/com/baeldung/flink/operator/WordsCapitalizer.java similarity index 100% rename from libraries-data-2/src/main/java/com/baeldung/flink/operator/WordsCapitalizer.java rename to apache-kafka/src/main/java/com/baeldung/flink/operator/WordsCapitalizer.java diff --git a/libraries-data-2/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java b/apache-kafka/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java similarity index 90% rename from libraries-data-2/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java rename to apache-kafka/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java index 967b266bb6..d4b7b0955a 100644 --- a/libraries-data-2/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java +++ b/apache-kafka/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java @@ -9,8 +9,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class BackupSerializationSchema - implements SerializationSchema { +public class BackupSerializationSchema implements SerializationSchema { static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); @@ -18,7 +17,7 @@ public class BackupSerializationSchema @Override public byte[] serialize(Backup backupMessage) { - if(objectMapper == null) { + if (objectMapper == null) { objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); } diff --git a/libraries-data-2/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java b/apache-kafka/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java similarity index 89% rename from libraries-data-2/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java rename to apache-kafka/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java index 9aaf8b9877..e521af7c2d 100644 --- a/libraries-data-2/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java +++ b/apache-kafka/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java @@ -8,12 +8,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import java.io.IOException; -public class InputMessageDeserializationSchema implements - DeserializationSchema { +public class InputMessageDeserializationSchema implements DeserializationSchema { static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); - @Override public InputMessage deserialize(byte[] bytes) throws IOException { diff --git a/libraries-data-2/src/main/java/com/baeldung/kafka/consumer/CountryPopulation.java b/apache-kafka/src/main/java/com/baeldung/kafka/consumer/CountryPopulation.java similarity index 100% rename from libraries-data-2/src/main/java/com/baeldung/kafka/consumer/CountryPopulation.java rename to apache-kafka/src/main/java/com/baeldung/kafka/consumer/CountryPopulation.java diff --git a/libraries-data-2/src/main/java/com/baeldung/kafka/consumer/CountryPopulationConsumer.java b/apache-kafka/src/main/java/com/baeldung/kafka/consumer/CountryPopulationConsumer.java similarity index 89% rename from libraries-data-2/src/main/java/com/baeldung/kafka/consumer/CountryPopulationConsumer.java rename to apache-kafka/src/main/java/com/baeldung/kafka/consumer/CountryPopulationConsumer.java index ba4dfe6f3b..a67d3a581c 100644 --- a/libraries-data-2/src/main/java/com/baeldung/kafka/consumer/CountryPopulationConsumer.java +++ b/apache-kafka/src/main/java/com/baeldung/kafka/consumer/CountryPopulationConsumer.java @@ -19,9 +19,7 @@ public class CountryPopulationConsumer { private java.util.function.Consumer exceptionConsumer; private java.util.function.Consumer countryPopulationConsumer; - public CountryPopulationConsumer( - Consumer consumer, java.util.function.Consumer exceptionConsumer, - java.util.function.Consumer countryPopulationConsumer) { + public CountryPopulationConsumer(Consumer consumer, java.util.function.Consumer exceptionConsumer, java.util.function.Consumer countryPopulationConsumer) { this.consumer = consumer; this.exceptionConsumer = exceptionConsumer; this.countryPopulationConsumer = countryPopulationConsumer; diff --git a/libraries-data-2/src/main/java/com/baeldung/kafka/producer/EvenOddPartitioner.java b/apache-kafka/src/main/java/com/baeldung/kafka/producer/EvenOddPartitioner.java similarity index 100% rename from libraries-data-2/src/main/java/com/baeldung/kafka/producer/EvenOddPartitioner.java rename to apache-kafka/src/main/java/com/baeldung/kafka/producer/EvenOddPartitioner.java diff --git a/libraries-data-2/src/main/java/com/baeldung/kafka/producer/KafkaProducer.java b/apache-kafka/src/main/java/com/baeldung/kafka/producer/KafkaProducer.java similarity index 95% rename from libraries-data-2/src/main/java/com/baeldung/kafka/producer/KafkaProducer.java rename to apache-kafka/src/main/java/com/baeldung/kafka/producer/KafkaProducer.java index 911c9ed3d7..fa508593e0 100644 --- a/libraries-data-2/src/main/java/com/baeldung/kafka/producer/KafkaProducer.java +++ b/apache-kafka/src/main/java/com/baeldung/kafka/producer/KafkaProducer.java @@ -15,8 +15,7 @@ public class KafkaProducer { } public Future send(String key, String value) { - ProducerRecord record = new ProducerRecord("topic_sports_news", - key, value); + ProducerRecord record = new ProducerRecord("topic_sports_news", key, value); return producer.send(record); } @@ -36,5 +35,4 @@ public class KafkaProducer { producer.commitTransaction(); } - } diff --git a/libraries-data-2/src/test/java/com/baeldung/flink/BackupCreatorIntegrationTest.java b/apache-kafka/src/test/java/com/baeldung/flink/BackupCreatorIntegrationTest.java similarity index 100% rename from libraries-data-2/src/test/java/com/baeldung/flink/BackupCreatorIntegrationTest.java rename to apache-kafka/src/test/java/com/baeldung/flink/BackupCreatorIntegrationTest.java diff --git a/libraries-data-2/src/test/java/com/baeldung/flink/WordCapitalizerIntegrationTest.java b/apache-kafka/src/test/java/com/baeldung/flink/WordCapitalizerIntegrationTest.java similarity index 100% rename from libraries-data-2/src/test/java/com/baeldung/flink/WordCapitalizerIntegrationTest.java rename to apache-kafka/src/test/java/com/baeldung/flink/WordCapitalizerIntegrationTest.java diff --git a/libraries-data-2/src/test/java/com/baeldung/kafka/consumer/CountryPopulationConsumerUnitTest.java b/apache-kafka/src/test/java/com/baeldung/kafka/consumer/CountryPopulationConsumerUnitTest.java similarity index 100% rename from libraries-data-2/src/test/java/com/baeldung/kafka/consumer/CountryPopulationConsumerUnitTest.java rename to apache-kafka/src/test/java/com/baeldung/kafka/consumer/CountryPopulationConsumerUnitTest.java diff --git a/libraries-data-2/src/test/java/com/baeldung/kafka/producer/KafkaProducerUnitTest.java b/apache-kafka/src/test/java/com/baeldung/kafka/producer/KafkaProducerUnitTest.java similarity index 100% rename from libraries-data-2/src/test/java/com/baeldung/kafka/producer/KafkaProducerUnitTest.java rename to apache-kafka/src/test/java/com/baeldung/kafka/producer/KafkaProducerUnitTest.java diff --git a/libraries-data-2/src/main/java/com/baeldung/flink/FlinkDataPipeline.java b/libraries-data-2/src/main/java/com/baeldung/flink/FlinkDataPipeline.java deleted file mode 100644 index d02b1bcb83..0000000000 --- a/libraries-data-2/src/main/java/com/baeldung/flink/FlinkDataPipeline.java +++ /dev/null @@ -1,82 +0,0 @@ -package com.baeldung.flink; - - -import com.baeldung.flink.model.Backup; -import com.baeldung.flink.model.InputMessage; -import com.baeldung.flink.operator.BackupAggregator; -import com.baeldung.flink.operator.InputMessageTimestampAssigner; -import com.baeldung.flink.operator.WordsCapitalizer; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; - -import static com.baeldung.flink.connector.Consumers.*; -import static com.baeldung.flink.connector.Producers.*; - -public class FlinkDataPipeline { - - public static void capitalize() throws Exception { - String inputTopic = "flink_input"; - String outputTopic = "flink_output"; - String consumerGroup = "baeldung"; - String address = "localhost:9092"; - - StreamExecutionEnvironment environment = - StreamExecutionEnvironment.getExecutionEnvironment(); - - FlinkKafkaConsumer011 flinkKafkaConsumer = - createStringConsumerForTopic(inputTopic, address, consumerGroup); - flinkKafkaConsumer.setStartFromEarliest(); - - DataStream stringInputStream = - environment.addSource(flinkKafkaConsumer); - - FlinkKafkaProducer011 flinkKafkaProducer = - createStringProducer(outputTopic, address); - - stringInputStream - .map(new WordsCapitalizer()) - .addSink(flinkKafkaProducer); - - environment.execute(); - } - -public static void createBackup () throws Exception { - String inputTopic = "flink_input"; - String outputTopic = "flink_output"; - String consumerGroup = "baeldung"; - String kafkaAddress = "localhost:9092"; - - StreamExecutionEnvironment environment = - StreamExecutionEnvironment.getExecutionEnvironment(); - - environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - - FlinkKafkaConsumer011 flinkKafkaConsumer = - createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup); - flinkKafkaConsumer.setStartFromEarliest(); - - flinkKafkaConsumer - .assignTimestampsAndWatermarks(new InputMessageTimestampAssigner()); - FlinkKafkaProducer011 flinkKafkaProducer = - createBackupProducer(outputTopic, kafkaAddress); - - DataStream inputMessagesStream = - environment.addSource(flinkKafkaConsumer); - - inputMessagesStream - .timeWindowAll(Time.hours(24)) - .aggregate(new BackupAggregator()) - .addSink(flinkKafkaProducer); - - environment.execute(); -} - - public static void main(String[] args) throws Exception { - createBackup(); - } - -} diff --git a/libraries-data-2/src/main/java/com/baeldung/flink/operator/BackupAggregator.java b/libraries-data-2/src/main/java/com/baeldung/flink/operator/BackupAggregator.java deleted file mode 100644 index c39b8413d1..0000000000 --- a/libraries-data-2/src/main/java/com/baeldung/flink/operator/BackupAggregator.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.baeldung.flink.operator; - -import com.baeldung.flink.model.Backup; -import com.baeldung.flink.model.InputMessage; -import org.apache.flink.api.common.functions.AggregateFunction; - -import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.List; - - public class BackupAggregator implements AggregateFunction, Backup> { - @Override - public List createAccumulator() { - return new ArrayList<>(); - } - - @Override - public List add(InputMessage inputMessage, List inputMessages) { - inputMessages.add(inputMessage); - return inputMessages; - } - - @Override - public Backup getResult(List inputMessages) { - Backup backup = new Backup(inputMessages, LocalDateTime.now()); - return backup; - } - - @Override - public List merge(List inputMessages, List acc1) { - inputMessages.addAll(acc1); - return inputMessages; - } - }