Moved Flink related code from libraries to libraries-data module. (#5253)

committed by Grzegorz Piwowarek
parent 33aa52ac94
commit 05e1700fb7
pom.xml
@@ -147,7 +147,6 @@
            <artifactId>jmapper-core</artifactId>
            <version>${jmapper.version}</version>
        </dependency>

        <!-- crunch project -->
        <dependency>
            <groupId>org.apache.crunch</groupId>
@@ -185,7 +184,72 @@
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>commons-logging</artifactId>
                    <groupId>commons-logging</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>commons-logging</artifactId>
                    <groupId>commons-logging</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>${assertj.version}</version>
        </dependency>
        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility</artifactId>
            <version>${awaitility.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility-proxy</artifactId>
            <version>${awaitility.version}</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
@@ -336,6 +400,10 @@
        <ignite.version>2.4.0</ignite.version>
        <gson.version>2.8.2</gson.version>
        <cache.version>1.1.0</cache.version>
        <flink.version>1.5.0</flink.version>
        <jackson.version>2.8.5</jackson.version>
        <awaitility.version>3.0.0</awaitility.version>
        <assertj.version>3.6.2</assertj.version>
        <hazelcast.version>3.8.4</hazelcast.version>
        <maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>
        <build-helper-maven-plugin.version>3.0.0</build-helper-maven-plugin.version>

com/baeldung/flink/FlinkDataPipeline.java (new file)
@@ -0,0 +1,82 @@
package com.baeldung.flink;

import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import com.baeldung.flink.operator.BackupAggregator;
import com.baeldung.flink.operator.InputMessageTimestampAssigner;
import com.baeldung.flink.operator.WordsCapitalizer;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import static com.baeldung.flink.connector.Consumers.*;
import static com.baeldung.flink.connector.Producers.*;

public class FlinkDataPipeline {

    public static void capitalize() throws Exception {
        String inputTopic = "flink_input";
        String outputTopic = "flink_output";
        String consumerGroup = "baeldung";
        String address = "localhost:9092";

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(inputTopic, address, consumerGroup);
        flinkKafkaConsumer.setStartFromEarliest();

        DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);

        FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(outputTopic, address);

        stringInputStream
                .map(new WordsCapitalizer())
                .addSink(flinkKafkaProducer);

        environment.execute();
    }

    public static void createBackup() throws Exception {
        String inputTopic = "flink_input";
        String outputTopic = "flink_output";
        String consumerGroup = "baeldung";
        String kafkaAddress = "localhost:9092";

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // Window messages by the time they were sent, not the time Flink processes them
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
        flinkKafkaConsumer.setStartFromEarliest();
        flinkKafkaConsumer.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());

        FlinkKafkaProducer011<Backup> flinkKafkaProducer = createBackupProducer(outputTopic, kafkaAddress);

        DataStream<InputMessage> inputMessagesStream = environment.addSource(flinkKafkaConsumer);

        inputMessagesStream
                .timeWindowAll(Time.hours(24))
                .aggregate(new BackupAggregator())
                .addSink(flinkKafkaProducer);

        environment.execute();
    }

    public static void main(String[] args) throws Exception {
        createBackup();
    }

}
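
Note: both pipelines assume a Kafka broker at localhost:9092 with the flink_input and flink_output topics available. For manual testing, a standalone producer along these lines can feed flink_input. This is a sketch, not part of the commit; it uses the plain kafka-clients API that the Flink connector already brings in, and the TestMessageProducer class name is made up for illustration.

package com.baeldung.flink;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

// Hypothetical helper, not part of this commit: pushes a test message into
// the flink_input topic so the pipelines above have data to consume.
public class TestMessageProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources flushes and closes the producer on exit
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("flink_input", "hello flink"));
        }
    }
}
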
com/baeldung/flink/LineSplitter.java (new file)
@@ -0,0 +1,18 @@
package com.baeldung.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.stream.Stream;

@SuppressWarnings("serial")
public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        String[] tokens = value.toLowerCase().split("\\W+");
        Stream.of(tokens)
                .filter(t -> t.length() > 0)
                .forEach(token -> out.collect(new Tuple2<>(token, 1)));
    }
}
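
To see what LineSplitter emits without running a job, flatMap can be driven directly with a collector backed by a list. A minimal sketch, assuming Flink's ListCollector utility from flink-core; the demo class itself is hypothetical and not part of this commit.

package com.baeldung.flink;

import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo, not part of this commit.
public class LineSplitterDemo {

    public static void main(String[] args) {
        List<Tuple2<String, Integer>> out = new ArrayList<>();
        // "Hello, World!" is lower-cased and split on non-word characters
        new LineSplitter().flatMap("Hello, World!", new ListCollector<>(out));
        System.out.println(out); // [(hello,1), (world,1)]
    }
}
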
com/baeldung/flink/WordCount.java (new file)
@@ -0,0 +1,18 @@
package com.baeldung.flink;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.List;

public class WordCount {

    public static DataSet<Tuple2<String, Integer>> startWordCount(ExecutionEnvironment env, List<String> lines) throws Exception {
        DataSet<String> text = env.fromCollection(lines);

        return text.flatMap(new LineSplitter())
                .groupBy(0)
                .aggregate(Aggregations.SUM, 1);
    }
}
com/baeldung/flink/connector/Consumers.java (new file)
@@ -0,0 +1,32 @@
package com.baeldung.flink.connector;

import com.baeldung.flink.model.InputMessage;
import com.baeldung.flink.schema.InputMessageDeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

public class Consumers {

    public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaAddress);
        props.setProperty("group.id", kafkaGroup);

        return new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);
    }

    public static FlinkKafkaConsumer011<InputMessage> createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaAddress);
        properties.setProperty("group.id", kafkaGroup);

        return new FlinkKafkaConsumer011<>(topic, new InputMessageDeserializationSchema(), properties);
    }
}
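
Only bootstrap.servers and group.id are set above; any other standard Kafka consumer property can go into the same Properties object before it is handed to the connector. A hedged sketch of such a variant follows; the class and method names are made up, and auto.offset.reset is a standard Kafka setting rather than something this commit configures.

package com.baeldung.flink.connector;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

// Hypothetical variant, not part of this commit: same factory shape as
// Consumers.createStringConsumerForTopic, with one extra Kafka setting.
public class TunedConsumers {

    public static FlinkKafkaConsumer011<String> createTunedStringConsumer(String topic, String kafkaAddress, String kafkaGroup) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaAddress);
        props.setProperty("group.id", kafkaGroup);
        // Standard Kafka consumer property; it applies when the Flink consumer
        // is not told where to start (setStartFromEarliest() would override it)
        props.setProperty("auto.offset.reset", "earliest");
        return new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);
    }
}
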
com/baeldung/flink/connector/Producers.java (new file)
@@ -0,0 +1,17 @@
package com.baeldung.flink.connector;

import com.baeldung.flink.model.Backup;
import com.baeldung.flink.schema.BackupSerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class Producers {

    public static FlinkKafkaProducer011<String> createStringProducer(String topic, String kafkaAddress) {
        return new FlinkKafkaProducer011<>(kafkaAddress, topic, new SimpleStringSchema());
    }

    public static FlinkKafkaProducer011<Backup> createBackupProducer(String topic, String kafkaAddress) {
        return new FlinkKafkaProducer011<>(kafkaAddress, topic, new BackupSerializationSchema());
    }
}
com/baeldung/flink/model/Backup.java (new file)
@@ -0,0 +1,27 @@
package com.baeldung.flink.model;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;

public class Backup {

    @JsonProperty("inputMessages")
    List<InputMessage> inputMessages;
    @JsonProperty("backupTimestamp")
    LocalDateTime backupTimestamp;
    @JsonProperty("uuid")
    UUID uuid;

    public Backup(List<InputMessage> inputMessages, LocalDateTime backupTimestamp) {
        this.inputMessages = inputMessages;
        this.backupTimestamp = backupTimestamp;
        this.uuid = UUID.randomUUID();
    }

    public List<InputMessage> getInputMessages() {
        return inputMessages;
    }
}
com/baeldung/flink/model/InputMessage.java (new file)
@@ -0,0 +1,71 @@
package com.baeldung.flink.model;

import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.base.Objects;

import java.time.LocalDateTime;

@JsonSerialize
public class InputMessage {

    String sender;
    String recipient;
    LocalDateTime sentAt;
    String message;

    public InputMessage() {
    }

    public InputMessage(String sender, String recipient, LocalDateTime sentAt, String message) {
        this.sender = sender;
        this.recipient = recipient;
        this.sentAt = sentAt;
        this.message = message;
    }

    public String getSender() {
        return sender;
    }

    public void setSender(String sender) {
        this.sender = sender;
    }

    public String getRecipient() {
        return recipient;
    }

    public void setRecipient(String recipient) {
        this.recipient = recipient;
    }

    public LocalDateTime getSentAt() {
        return sentAt;
    }

    public void setSentAt(LocalDateTime sentAt) {
        this.sentAt = sentAt;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        InputMessage other = (InputMessage) o;
        return Objects.equal(sender, other.sender)
                && Objects.equal(recipient, other.recipient)
                && Objects.equal(sentAt, other.sentAt)
                && Objects.equal(message, other.message);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(sender, recipient, sentAt, message);
    }
}
com/baeldung/flink/operator/BackupAggregator.java (new file)
@@ -0,0 +1,34 @@
package com.baeldung.flink.operator;

import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import org.apache.flink.api.common.functions.AggregateFunction;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

public class BackupAggregator implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {

    @Override
    public List<InputMessage> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<InputMessage> add(InputMessage inputMessage, List<InputMessage> inputMessages) {
        inputMessages.add(inputMessage);
        return inputMessages;
    }

    @Override
    public Backup getResult(List<InputMessage> inputMessages) {
        return new Backup(inputMessages, LocalDateTime.now());
    }

    @Override
    public List<InputMessage> merge(List<InputMessage> inputMessages, List<InputMessage> acc1) {
        inputMessages.addAll(acc1);
        return inputMessages;
    }
}
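
Flink drives this AggregateFunction once per 24-hour window: createAccumulator() when the window opens, add() once per element, getResult() when the window fires, and merge() when accumulators from parallel instances are combined. The contract can be checked in isolation, without a Flink job; a small sketch with a made-up demo class, not part of this commit:

package com.baeldung.flink.operator;

import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;

import java.time.LocalDateTime;
import java.util.List;

// Hypothetical walkthrough of the lifecycle Flink drives per window.
public class BackupAggregatorDemo {

    public static void main(String[] args) {
        BackupAggregator aggregator = new BackupAggregator();

        List<InputMessage> acc = aggregator.createAccumulator(); // empty window state
        acc = aggregator.add(new InputMessage("Me", "User", LocalDateTime.now(), "hi"), acc);
        acc = aggregator.add(new InputMessage("Me", "User", LocalDateTime.now(), "bye"), acc);

        Backup backup = aggregator.getResult(acc); // what the window emits when it fires
        System.out.println(backup.getInputMessages().size()); // 2
    }
}
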
com/baeldung/flink/operator/InputMessageTimestampAssigner.java (new file)
@@ -0,0 +1,23 @@
package com.baeldung.flink.operator;

import com.baeldung.flink.model.InputMessage;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;
import java.time.ZoneId;

public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWatermarks<InputMessage> {

    @Override
    public long extractTimestamp(InputMessage element, long previousElementTimestamp) {
        // Flink expects timestamps in epoch milliseconds
        ZoneId zoneId = ZoneId.systemDefault();
        return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000;
    }

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(InputMessage lastElement, long extractedTimestamp) {
        // Emit a punctuated watermark trailing each element's timestamp by 15 ms
        return new Watermark(extractedTimestamp - 15);
    }
}
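
Worked example of the arithmetic: toEpochSecond() truncates to whole seconds, so sub-second precision is dropped before the multiplication by 1000, and each watermark trails the element's timestamp by 15 ms. A sketch with a hypothetical demo class, not part of this commit; the printed value assumes the machine's default zone is UTC.

package com.baeldung.flink.operator;

import com.baeldung.flink.model.InputMessage;

import java.time.LocalDateTime;
import java.time.Month;

// Hypothetical check: with a UTC default zone, 2018-06-01T12:00:00 maps to
// epoch second 1527854400, hence epoch millisecond 1527854400000.
public class TimestampAssignerDemo {

    public static void main(String[] args) {
        InputMessageTimestampAssigner assigner = new InputMessageTimestampAssigner();
        InputMessage message = new InputMessage("Me", "User",
                LocalDateTime.of(2018, Month.JUNE, 1, 12, 0), "hi");

        long ts = assigner.extractTimestamp(message, 0L);
        System.out.println(ts); // 1527854400000 on a UTC machine
        // The watermark carries timestamp ts - 15, i.e. 1527854399985
        System.out.println(assigner.checkAndGetNextWatermark(message, ts).getTimestamp());
    }
}
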
com/baeldung/flink/operator/WordsCapitalizer.java (new file)
@@ -0,0 +1,11 @@
package com.baeldung.flink.operator;

import org.apache.flink.api.common.functions.MapFunction;

public class WordsCapitalizer implements MapFunction<String, String> {

    @Override
    public String map(String s) {
        return s.toUpperCase();
    }
}
com/baeldung/flink/schema/BackupSerializationSchema.java (new file)
@@ -0,0 +1,33 @@
package com.baeldung.flink.schema;

import com.baeldung.flink.model.Backup;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BackupSerializationSchema implements SerializationSchema<Backup> {

    static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());

    Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class);

    @Override
    public byte[] serialize(Backup backupMessage) {
        if (objectMapper == null) {
            // Recreate the mapper before configuring it; calling setVisibility
            // on a null reference would throw a NullPointerException
            objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
            objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
        }
        try {
            String json = objectMapper.writeValueAsString(backupMessage);
            return json.getBytes();
        } catch (com.fasterxml.jackson.core.JsonProcessingException e) {
            logger.error("Failed to serialize Backup to JSON", e);
        }
        return new byte[0];
    }
}
com/baeldung/flink/schema/InputMessageDeserializationSchema.java (new file)
@@ -0,0 +1,32 @@
package com.baeldung.flink.schema;

import com.baeldung.flink.model.InputMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

public class InputMessageDeserializationSchema implements DeserializationSchema<InputMessage> {

    static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());

    @Override
    public InputMessage deserialize(byte[] bytes) throws IOException {
        return objectMapper.readValue(bytes, InputMessage.class);
    }

    @Override
    public boolean isEndOfStream(InputMessage inputMessage) {
        return false;
    }

    @Override
    public TypeInformation<InputMessage> getProducedType() {
        return TypeInformation.of(InputMessage.class);
    }
}
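
The two schemas are mirror images over Jackson: a JavaTimeModule-aware ObjectMapper produces exactly the JSON that InputMessageDeserializationSchema expects on the wire, which is also what the first test in BackupCreatorIntegrationTest below verifies. A quick round trip outside Flink, with a made-up demo class that is not part of this commit:

package com.baeldung.flink.schema;

import com.baeldung.flink.model.InputMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import java.io.IOException;
import java.time.LocalDateTime;

// Hypothetical round trip: serialize with a plain mapper, decode with the schema.
public class SchemaRoundTripDemo {

    public static void main(String[] args) throws IOException {
        ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());
        InputMessage original = new InputMessage("Me", "User", LocalDateTime.now(), "hi");

        byte[] onTheWire = mapper.writeValueAsBytes(original);
        InputMessage decoded = new InputMessageDeserializationSchema().deserialize(onTheWire);

        System.out.println(original.equals(decoded)); // true
    }
}
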
com/baeldung/flink/BackupCreatorIntegrationTest.java (new file)
@@ -0,0 +1,103 @@
package com.baeldung.flink;

import com.baeldung.flink.model.Backup;
import com.baeldung.flink.model.InputMessage;
import com.baeldung.flink.operator.BackupAggregator;
import com.baeldung.flink.operator.InputMessageTimestampAssigner;
import com.baeldung.flink.schema.BackupSerializationSchema;
import com.baeldung.flink.schema.InputMessageDeserializationSchema;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.commons.collections.ListUtils;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

public class BackupCreatorIntegrationTest {

    public static ObjectMapper mapper;

    @Before
    public void setup() {
        mapper = new ObjectMapper().registerModule(new JavaTimeModule());
    }

    @Test
    public void givenProperJson_whenDeserializeIsInvoked_thenProperObjectIsReturned() throws IOException {
        InputMessage message = new InputMessage("Me", "User", LocalDateTime.now(), "Test Message");
        byte[] messageSerialized = mapper.writeValueAsBytes(message);
        DeserializationSchema<InputMessage> deserializationSchema = new InputMessageDeserializationSchema();
        InputMessage messageDeserialized = deserializationSchema.deserialize(messageSerialized);

        assertEquals(message, messageDeserialized);
    }

    @Test
    public void givenMultipleInputMessagesFromDifferentDays_whenBackupCreatorIsUsed_thenMessagesAreGroupedProperly() throws Exception {
        LocalDateTime currentTime = LocalDateTime.now();
        InputMessage message = new InputMessage("Me", "User", currentTime, "First TestMessage");
        InputMessage secondMessage = new InputMessage("Me", "User", currentTime.plusHours(1), "First TestMessage");
        InputMessage thirdMessage = new InputMessage("Me", "User", currentTime.plusHours(2), "First TestMessage");
        InputMessage fourthMessage = new InputMessage("Me", "User", currentTime.plusHours(3), "First TestMessage");
        InputMessage fifthMessage = new InputMessage("Me", "User", currentTime.plusHours(25), "First TestMessage");
        InputMessage sixthMessage = new InputMessage("Me", "User", currentTime.plusHours(26), "First TestMessage");

        List<InputMessage> firstBackupMessages = Arrays.asList(message, secondMessage, thirdMessage, fourthMessage);
        List<InputMessage> secondBackupMessages = Arrays.asList(fifthMessage, sixthMessage);
        List<InputMessage> inputMessages = ListUtils.union(firstBackupMessages, secondBackupMessages);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        DataStreamSource<InputMessage> testDataSet = env.fromCollection(inputMessages);
        CollectingSink sink = new CollectingSink();
        testDataSet.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner())
                .timeWindowAll(Time.hours(24))
                .aggregate(new BackupAggregator())
                .addSink(sink);

        env.execute();

        Awaitility.await().until(() -> sink.backups.size() == 2);
        assertEquals(2, sink.backups.size());
        assertEquals(firstBackupMessages, sink.backups.get(0).getInputMessages());
        assertEquals(secondBackupMessages, sink.backups.get(1).getInputMessages());
    }

    @Test
    public void givenProperBackupObject_whenSerializeIsInvoked_thenObjectIsProperlySerialized() throws IOException {
        InputMessage message = new InputMessage("Me", "User", LocalDateTime.now(), "Test Message");
        List<InputMessage> messages = Arrays.asList(message);
        Backup backup = new Backup(messages, LocalDateTime.now());
        byte[] backupSerialized = mapper.writeValueAsBytes(backup);
        SerializationSchema<Backup> serializationSchema = new BackupSerializationSchema();
        byte[] backupProcessed = serializationSchema.serialize(backup);

        // Compare array contents; assertEquals on byte arrays only compares references
        assertArrayEquals(backupSerialized, backupProcessed);
    }

    private static class CollectingSink implements SinkFunction<Backup> {

        public static List<Backup> backups = new ArrayList<>();

        @Override
        public synchronized void invoke(Backup value, Context context) throws Exception {
            backups.add(value);
        }
    }
}
com/baeldung/flink/WordCapitalizerIntegrationTest.java (new file)
@@ -0,0 +1,34 @@
package com.baeldung.flink;

import com.baeldung.flink.operator.WordsCapitalizer;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class WordCapitalizerIntegrationTest {

    @Test
    public void givenDataSet_whenExecuteWordCapitalizer_thenReturnCapitalizedWords() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        List<String> data = Arrays.asList("dog", "cat", "wolf", "pig");

        DataSet<String> testDataSet = env.fromCollection(data);

        List<String> dataProcessed = testDataSet
                .map(new WordsCapitalizer())
                .collect();

        List<String> testDataCapitalized = data.stream()
                .map(String::toUpperCase)
                .collect(Collectors.toList());

        Assert.assertEquals(testDataCapitalized, dataProcessed);
    }

}
com/baeldung/flink/WordCountIntegrationTest.java (new file)
@@ -0,0 +1,161 @@
package com.baeldung.flink;

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.junit.Test;

import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

public class WordCountIntegrationTest {

    private final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    @Test
    public void givenDataSet_whenExecuteWordCount_thenReturnWordCount() throws Exception {
        // given
        List<String> lines = Arrays.asList("This is a first sentence", "This is a second sentence with a one word");

        // when
        DataSet<Tuple2<String, Integer>> result = WordCount.startWordCount(env, lines);

        // then
        List<Tuple2<String, Integer>> collect = result.collect();
        assertThat(collect).containsExactlyInAnyOrder(
                new Tuple2<>("a", 3), new Tuple2<>("sentence", 2), new Tuple2<>("word", 1),
                new Tuple2<>("is", 2), new Tuple2<>("this", 2), new Tuple2<>("second", 1),
                new Tuple2<>("first", 1), new Tuple2<>("with", 1), new Tuple2<>("one", 1));
    }

    @Test
    public void givenListOfAmounts_whenUseMapReduce_thenSumAmountsThatAreOnlyAboveThreshold() throws Exception {
        // given
        DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);
        int threshold = 30;

        // when
        List<Integer> collect = amounts.filter(a -> a > threshold).reduce((integer, t1) -> integer + t1).collect();

        // then
        assertThat(collect.get(0)).isEqualTo(90);
    }

    @Test
    public void givenDataSetOfComplexObjects_whenMapToGetOneField_thenReturnedListHaveProperElements() throws Exception {
        // given
        DataSet<Person> personDataSource = env.fromCollection(Arrays.asList(new Person(23, "Tom"), new Person(75, "Michael")));

        // when
        List<Integer> ages = personDataSource.map(p -> p.age).collect();

        // then
        assertThat(ages).hasSize(2);
        assertThat(ages).contains(23, 75);
    }

    @Test
    public void givenDataSet_whenSortItByOneField_thenShouldReturnSortedDataSet() throws Exception {
        // given
        Tuple2<Integer, String> secondPerson = new Tuple2<>(4, "Tom");
        Tuple2<Integer, String> thirdPerson = new Tuple2<>(5, "Scott");
        Tuple2<Integer, String> fourthPerson = new Tuple2<>(200, "Michael");
        Tuple2<Integer, String> firstPerson = new Tuple2<>(1, "Jack");
        DataSet<Tuple2<Integer, String>> transactions = env.fromElements(fourthPerson, secondPerson, thirdPerson, firstPerson);

        // when
        List<Tuple2<Integer, String>> sorted = transactions.sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING).collect();

        // then
        assertThat(sorted).containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);
    }

    @Test
    public void giveTwoDataSets_whenJoinUsingId_thenProduceJoinedData() throws Exception {
        // given
        Tuple3<Integer, String, String> address = new Tuple3<>(1, "5th Avenue", "London");
        DataSet<Tuple3<Integer, String, String>> addresses = env.fromElements(address);

        Tuple2<Integer, String> firstTransaction = new Tuple2<>(1, "Transaction_1");
        DataSet<Tuple2<Integer, String>> transactions = env.fromElements(firstTransaction, new Tuple2<>(12, "Transaction_2"));

        // when
        List<Tuple2<Tuple2<Integer, String>, Tuple3<Integer, String, String>>> joined = transactions.join(addresses)
                .where(new IdKeySelectorTransaction())
                .equalTo(new IdKeySelectorAddress())
                .collect();

        // then
        assertThat(joined).hasSize(1);
        assertThat(joined).contains(new Tuple2<>(firstTransaction, address));
    }

    @Test
    public void givenStreamOfEvents_whenProcessEvents_thenShouldPrintResultsOnSinkOperation() throws Exception {
        // given
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.fromElements("This is a first sentence", "This is a second sentence with a one word");

        SingleOutputStreamOperator<String> upperCase = text.map(String::toUpperCase);

        upperCase.print();

        // when
        env.execute();
    }

    @Test
    public void givenStreamOfEvents_whenProcessEvents_thenShouldApplyWindowingOnTransformation() throws Exception {
        // given
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Tuple2<Integer, Long>> windowed = env
                .fromElements(
                        new Tuple2<>(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()),
                        new Tuple2<>(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond()))
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Integer, Long>>(Time.seconds(20)) {
                    @Override
                    public long extractTimestamp(Tuple2<Integer, Long> element) {
                        return element.f1 * 1000;
                    }
                });

        SingleOutputStreamOperator<Tuple2<Integer, Long>> reduced = windowed
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
                .maxBy(0, true);

        reduced.print();

        // when
        env.execute();
    }

    private static class IdKeySelectorTransaction implements KeySelector<Tuple2<Integer, String>, Integer> {
        @Override
        public Integer getKey(Tuple2<Integer, String> value) {
            return value.f0;
        }
    }

    private static class IdKeySelectorAddress implements KeySelector<Tuple3<Integer, String, String>, Integer> {
        @Override
        public Integer getKey(Tuple3<Integer, String, String> value) {
            return value.f0;
        }
    }

    private static class Person {
        private final int age;
        private final String name;

        private Person(int age, String name) {
            this.age = age;
            this.name = name;
        }
    }

}