diff --git a/libraries/pom.xml b/libraries/pom.xml
index 047b56dd01..d129844543 100644
--- a/libraries/pom.xml
+++ b/libraries/pom.xml
@@ -204,7 +204,7 @@
org.apache.flink
- flink-test-utils_2.10
+ flink-test-utils_2.11
${flink.version}
test
diff --git a/libraries/src/main/java/com/baeldung/flink/FlinkDataPipeline.java b/libraries/src/main/java/com/baeldung/flink/FlinkDataPipeline.java
index d4e4cec60b..423637bf53 100644
--- a/libraries/src/main/java/com/baeldung/flink/FlinkDataPipeline.java
+++ b/libraries/src/main/java/com/baeldung/flink/FlinkDataPipeline.java
@@ -1,10 +1,11 @@
-package com.baeldung;
+package com.baeldung.flink;
-import com.baeldung.flink.BackupAggregator;
-import com.baeldung.flink.InputMessageTimestampAssigner;
-import com.baeldung.flink.WordsCapitalizer;
-import com.baeldung.model.Backup;
-import com.baeldung.model.InputMessage;
+
+import com.baeldung.flink.model.Backup;
+import com.baeldung.flink.model.InputMessage;
+import com.baeldung.flink.operator.BackupAggregator;
+import com.baeldung.flink.operator.InputMessageTimestampAssigner;
+import com.baeldung.flink.operator.WordsCapitalizer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -12,8 +13,8 @@ import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
-import static com.baeldung.flink.Consumers.*;
-import static com.baeldung.flink.Producers.*;
+import static com.baeldung.flink.connector.Consumers.*;
+import static com.baeldung.flink.connector.Producers.*;
public class FlinkDataPipeline {
diff --git a/libraries/src/main/java/com/baeldung/flink/connector/Consumers.java b/libraries/src/main/java/com/baeldung/flink/connector/Consumers.java
index e9aa78b677..514085f9c4 100644
--- a/libraries/src/main/java/com/baeldung/flink/connector/Consumers.java
+++ b/libraries/src/main/java/com/baeldung/flink/connector/Consumers.java
@@ -1,7 +1,7 @@
-package com.baeldung.flink;
+package com.baeldung.flink.connector;
-import com.baeldung.schema.InputMessageDeserializationSchema;
-import com.baeldung.model.InputMessage;
+import com.baeldung.flink.model.InputMessage;
+import com.baeldung.flink.schema.InputMessageDeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
diff --git a/libraries/src/main/java/com/baeldung/flink/connector/Producers.java b/libraries/src/main/java/com/baeldung/flink/connector/Producers.java
index 094b6ff211..8e6f3f8f37 100644
--- a/libraries/src/main/java/com/baeldung/flink/connector/Producers.java
+++ b/libraries/src/main/java/com/baeldung/flink/connector/Producers.java
@@ -1,7 +1,7 @@
-package com.baeldung.flink;
+package com.baeldung.flink.connector;
-import com.baeldung.schema.BackupSerializationSchema;
-import com.baeldung.model.Backup;
+import com.baeldung.flink.model.Backup;
+import com.baeldung.flink.schema.BackupSerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
@@ -12,6 +12,6 @@ public class Producers {
}
public static FlinkKafkaProducer011 createBackupProducer(String topic, String kafkaAddress) {
- return new FlinkKafkaProducer011<>(kafkaAddress, topic, new BackupSerializationSchema());
+ return new FlinkKafkaProducer011(kafkaAddress, topic, new BackupSerializationSchema());
}
}
diff --git a/libraries/src/main/java/com/baeldung/flink/model/Backup.java b/libraries/src/main/java/com/baeldung/flink/model/Backup.java
index 3a57d65d78..268ceec7f3 100644
--- a/libraries/src/main/java/com/baeldung/flink/model/Backup.java
+++ b/libraries/src/main/java/com/baeldung/flink/model/Backup.java
@@ -1,4 +1,4 @@
-package com.baeldung.model;
+package com.baeldung.flink.model;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -20,4 +20,8 @@ public class Backup {
this.backupTimestamp = backupTimestamp;
this.uuid = UUID.randomUUID();
}
+
+ public List getInputMessages() {
+ return inputMessages;
+ }
}
diff --git a/libraries/src/main/java/com/baeldung/flink/model/InputMessage.java b/libraries/src/main/java/com/baeldung/flink/model/InputMessage.java
index 57a85de81a..183fa69c11 100644
--- a/libraries/src/main/java/com/baeldung/flink/model/InputMessage.java
+++ b/libraries/src/main/java/com/baeldung/flink/model/InputMessage.java
@@ -1,6 +1,8 @@
-package com.baeldung.model;
+package com.baeldung.flink.model;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.google.common.base.Objects;
+
import java.time.LocalDateTime;
@JsonSerialize
@@ -10,6 +12,9 @@ public class InputMessage {
LocalDateTime sentAt;
String message;
+ public InputMessage() {
+ }
+
public String getSender() {
return sender;
}
@@ -41,4 +46,27 @@ public class InputMessage {
public void setMessage(String message) {
this.message = message;
}
+
+ public InputMessage(String sender, String recipient, LocalDateTime sentAt, String message) {
+ this.sender = sender;
+ this.recipient = recipient;
+ this.sentAt = sentAt;
+ this.message = message;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ InputMessage message1 = (InputMessage) o;
+ return Objects.equal(sender, message1.sender) &&
+ Objects.equal(recipient, message1.recipient) &&
+ Objects.equal(sentAt, message1.sentAt) &&
+ Objects.equal(message, message1.message);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(sender, recipient, sentAt, message);
+ }
}
diff --git a/libraries/src/main/java/com/baeldung/flink/operator/BackupAggregator.java b/libraries/src/main/java/com/baeldung/flink/operator/BackupAggregator.java
index 2bfbb1e270..c39b8413d1 100644
--- a/libraries/src/main/java/com/baeldung/flink/operator/BackupAggregator.java
+++ b/libraries/src/main/java/com/baeldung/flink/operator/BackupAggregator.java
@@ -1,7 +1,7 @@
-package com.baeldung.flink;
+package com.baeldung.flink.operator;
-import com.baeldung.model.Backup;
-import com.baeldung.model.InputMessage;
+import com.baeldung.flink.model.Backup;
+import com.baeldung.flink.model.InputMessage;
import org.apache.flink.api.common.functions.AggregateFunction;
import java.time.LocalDateTime;
diff --git a/libraries/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java b/libraries/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java
index 5d58cb36cc..05828d9588 100644
--- a/libraries/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java
+++ b/libraries/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java
@@ -1,6 +1,6 @@
-package com.baeldung.flink;
+package com.baeldung.flink.operator;
-import com.baeldung.model.InputMessage;
+import com.baeldung.flink.model.InputMessage;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
diff --git a/libraries/src/main/java/com/baeldung/flink/operator/WordsCapitalizer.java b/libraries/src/main/java/com/baeldung/flink/operator/WordsCapitalizer.java
index 49fffc292e..f9103d225c 100644
--- a/libraries/src/main/java/com/baeldung/flink/operator/WordsCapitalizer.java
+++ b/libraries/src/main/java/com/baeldung/flink/operator/WordsCapitalizer.java
@@ -1,4 +1,4 @@
-package com.baeldung.flink;
+package com.baeldung.flink.operator;
import org.apache.flink.api.common.functions.MapFunction;
diff --git a/libraries/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java b/libraries/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java
index fa525a5ee8..4db9556d8d 100644
--- a/libraries/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java
+++ b/libraries/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java
@@ -1,6 +1,6 @@
-package com.baeldung.schema;
+package com.baeldung.flink.schema;
-import com.baeldung.model.Backup;
+import com.baeldung.flink.model.Backup;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.api.common.serialization.SerializationSchema;
diff --git a/libraries/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java b/libraries/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java
index 99b6baa6f4..3c81b67ec1 100644
--- a/libraries/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java
+++ b/libraries/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java
@@ -1,6 +1,6 @@
-package com.baeldung.schema;
+package com.baeldung.flink.schema;
-import com.baeldung.model.InputMessage;
+import com.baeldung.flink.model.InputMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.api.common.serialization.DeserializationSchema;
diff --git a/libraries/src/test/java/com/baeldung/flink/BackupCreatorIntegrationTest.java b/libraries/src/test/java/com/baeldung/flink/BackupCreatorIntegrationTest.java
new file mode 100644
index 0000000000..ab7d119c16
--- /dev/null
+++ b/libraries/src/test/java/com/baeldung/flink/BackupCreatorIntegrationTest.java
@@ -0,0 +1,103 @@
+package com.baeldung.flink;
+
+import com.baeldung.flink.model.Backup;
+import com.baeldung.flink.model.InputMessage;
+import com.baeldung.flink.operator.BackupAggregator;
+import com.baeldung.flink.operator.InputMessageTimestampAssigner;
+import com.baeldung.flink.schema.BackupSerializationSchema;
+import com.baeldung.flink.schema.InputMessageDeserializationSchema;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.apache.commons.collections.ListUtils;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class BackupCreatorIntegrationTest {
+ public static ObjectMapper mapper;
+
+ @Before
+ public void setup() {
+ mapper = new ObjectMapper().registerModule(new JavaTimeModule());
+ }
+
+ @Test
+ public void givenProperJson_whenDeserializeIsInvoked_thenProperObjectIsReturned() throws IOException {
+ InputMessage message = new InputMessage("Me", "User", LocalDateTime.now(), "Test Message");
+ byte[] messageSerialized = mapper.writeValueAsBytes(message);
+ DeserializationSchema deserializationSchema = new InputMessageDeserializationSchema();
+ InputMessage messageDeserialized = deserializationSchema.deserialize(messageSerialized);
+
+ assertEquals(message, messageDeserialized);
+ }
+
+ @Test
+ public void givenMultipleInputMessagesFromDifferentDays_whenBackupCreatorIsUser_thenMessagesAreGroupedProperly() throws Exception {
+ LocalDateTime currentTime = LocalDateTime.now();
+ InputMessage message = new InputMessage("Me", "User", currentTime, "First TestMessage");
+ InputMessage secondMessage = new InputMessage("Me", "User", currentTime.plusHours(1), "First TestMessage");
+ InputMessage thirdMessage = new InputMessage("Me", "User", currentTime.plusHours(2), "First TestMessage");
+ InputMessage fourthMessage = new InputMessage("Me", "User", currentTime.plusHours(3), "First TestMessage");
+ InputMessage fifthMessage = new InputMessage("Me", "User", currentTime.plusHours(25), "First TestMessage");
+ InputMessage sixthMessage = new InputMessage("Me", "User", currentTime.plusHours(26), "First TestMessage");
+
+ List firstBackupMessages = Arrays.asList(message, secondMessage, thirdMessage, fourthMessage);
+ List secondBackupMessages = Arrays.asList(fifthMessage, sixthMessage);
+ List inputMessages = ListUtils.union(firstBackupMessages, secondBackupMessages);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.setParallelism(1);
+ DataStreamSource testDataSet = env.fromCollection(inputMessages);
+ CollectingSink sink = new CollectingSink();
+ testDataSet.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner())
+ .timeWindowAll(Time.hours(24))
+ .aggregate(new BackupAggregator())
+ .addSink(sink);
+
+ env.execute();
+
+ Awaitility.await().until(() -> sink.backups.size() == 2);
+ assertEquals(2, sink.backups.size());
+ assertEquals(firstBackupMessages, sink.backups.get(0).getInputMessages());
+ assertEquals(secondBackupMessages, sink.backups.get(1).getInputMessages());
+
+ }
+
+ @Test
+ public void givenProperBackupObject_whenSerializeIsInvoked_thenObjectIsProperlySerialized() throws IOException {
+ InputMessage message = new InputMessage("Me", "User", LocalDateTime.now(), "Test Message");
+ List messages = Arrays.asList(message);
+ Backup backup = new Backup(messages, LocalDateTime.now());
+ byte[] backupSerialized = mapper.writeValueAsBytes(backup);
+ SerializationSchema serializationSchema = new BackupSerializationSchema();
+ byte[] backupProcessed = serializationSchema.serialize(backup);
+
+ assertEquals(backupSerialized, backupProcessed);
+ }
+
+ private static class CollectingSink implements SinkFunction {
+
+ public static List backups = new ArrayList<>();
+
+ @Override
+ public synchronized void invoke(Backup value, Context context) throws Exception {
+ backups.add(value);
+ }
+ }
+}
diff --git a/libraries/src/test/java/com/baeldung/flink/WordCapitalizerIntegrationTest.java b/libraries/src/test/java/com/baeldung/flink/WordCapitalizerIntegrationTest.java
new file mode 100644
index 0000000000..8a98dae4b5
--- /dev/null
+++ b/libraries/src/test/java/com/baeldung/flink/WordCapitalizerIntegrationTest.java
@@ -0,0 +1,34 @@
+package com.baeldung.flink;
+
+import com.baeldung.flink.operator.WordsCapitalizer;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class WordCapitalizerIntegrationTest {
+
+ @Test
+ public void givenDataSet_whenExecuteWordCapitalizer_thenReturnCapitalizedWords() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ List data = Arrays.asList("dog", "cat", "wolf", "pig");
+
+ DataSet testDataSet = env.fromCollection(data);
+
+
+ List dataProcessed = testDataSet
+ .map(new WordsCapitalizer())
+ .collect();
+
+ List testDataCapitalized = data.stream()
+ .map(String::toUpperCase)
+ .collect(Collectors.toList());
+
+ Assert.assertEquals(testDataCapitalized, dataProcessed);
+ }
+
+}