From 0abace19ebff27014c176e0947221ea103c59c33 Mon Sep 17 00:00:00 2001
From: Haroon Khan
Date: Sat, 14 Aug 2021 08:38:04 +0100
Subject: [PATCH] [BAEL-5012] Intro to ksqlDB (#11113)

* [BAEL-5012] Intro to ksqlDB

* [BAEL-5012] Fix POM file and code cleanup

* [BAEL-5012] Code cleanup
---
 ksqldb/pom.xml                                |  74 ++++++++
 .../main/java/com/baeldung/ksqldb/Alert.java  |  29 ++++
 .../baeldung/ksqldb/KsqlDBApplication.java    |  75 ++++++++
 .../java/com/baeldung/ksqldb/Reading.java     |  10 ++
 .../com/baeldung/ksqldb/RowSubscriber.java    |  60 +++++++
 .../ksqldb/KsqlDBApplicationLiveTest.java     | 160 ++++++++++++++++++
 .../test/resources/docker/docker-compose.yml  |  49 ++++++
 ksqldb/src/test/resources/log4j.properties    |   6 +
 pom.xml                                       |   3 +
 9 files changed, 466 insertions(+)
 create mode 100644 ksqldb/pom.xml
 create mode 100644 ksqldb/src/main/java/com/baeldung/ksqldb/Alert.java
 create mode 100644 ksqldb/src/main/java/com/baeldung/ksqldb/KsqlDBApplication.java
 create mode 100644 ksqldb/src/main/java/com/baeldung/ksqldb/Reading.java
 create mode 100644 ksqldb/src/main/java/com/baeldung/ksqldb/RowSubscriber.java
 create mode 100644 ksqldb/src/test/java/com/baeldung/ksqldb/KsqlDBApplicationLiveTest.java
 create mode 100644 ksqldb/src/test/resources/docker/docker-compose.yml
 create mode 100644 ksqldb/src/test/resources/log4j.properties

diff --git a/ksqldb/pom.xml b/ksqldb/pom.xml
new file mode 100644
index 0000000000..13867b16e3
--- /dev/null
+++ b/ksqldb/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>ksqldb-app</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>ksqldb</name>
+
+    <parent>
+        <groupId>com.baeldung</groupId>
+        <artifactId>parent-modules</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <repositories>
+        <repository>
+            <id>confluent</id>
+            <name>confluent-repo</name>
+            <url>http://packages.confluent.io/maven/</url>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.confluent.ksql</groupId>
+            <artifactId>ksqldb-api-client</artifactId>
+            <version>${ksqldb.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <version>${awaitility.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>${assertj.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <properties>
+        <ksqldb.version>6.2.0</ksqldb.version>
+        <assertj.version>3.20.2</assertj.version>
+        <awaitility.version>4.1.0</awaitility.version>
+        <testcontainers.version>1.15.3</testcontainers.version>
+    </properties>
+
+</project>

diff --git a/ksqldb/src/main/java/com/baeldung/ksqldb/Alert.java b/ksqldb/src/main/java/com/baeldung/ksqldb/Alert.java
new file mode 100644
index 0000000000..badb00f114
--- /dev/null
+++ b/ksqldb/src/main/java/com/baeldung/ksqldb/Alert.java
@@ -0,0 +1,29 @@
+package com.baeldung.ksqldb;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Alert {
+
+    @JsonProperty(value = "SENSOR_ID")
+    private String sensorId;
+
+    @JsonProperty(value = "START_PERIOD")
+    private String startPeriod;
+
+    @JsonProperty(value = "END_PERIOD")
+    private String endPeriod;
+
+    @JsonProperty(value = "AVERAGE_READING")
+    private double averageReading;
+
+}

diff --git a/ksqldb/src/main/java/com/baeldung/ksqldb/KsqlDBApplication.java b/ksqldb/src/main/java/com/baeldung/ksqldb/KsqlDBApplication.java
new file mode 100644
index 0000000000..35ad3ebbb0
--- /dev/null
+++ b/ksqldb/src/main/java/com/baeldung/ksqldb/KsqlDBApplication.java
@@ -0,0 +1,75 @@
+package com.baeldung.ksqldb;
+
+import io.confluent.ksql.api.client.Client;
+import io.confluent.ksql.api.client.ExecuteStatementResult;
+import io.confluent.ksql.api.client.KsqlObject;
+import io.confluent.ksql.api.client.Row;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.reactivestreams.Subscriber;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+@AllArgsConstructor
+@Slf4j
+public class KsqlDBApplication {
+
+    private static final String CREATE_READINGS_STREAM = ""
+        + "  CREATE STREAM readings (sensor_id VARCHAR KEY, timestamp VARCHAR, reading INT)"
+        + "  WITH (KAFKA_TOPIC = 'readings',"
+        + "        VALUE_FORMAT = 'JSON',"
+        + "        TIMESTAMP = 'timestamp',"
+        + "        TIMESTAMP_FORMAT = 'yyyy-MM-dd HH:mm:ss',"
+        + "        PARTITIONS = 1);";
+
+    private static final String CREATE_ALERTS_TABLE = ""
+        + "  CREATE TABLE alerts AS"
+        + "    SELECT"
+        + "      sensor_id,"
+        + "      TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss', 'UTC') AS start_period,"
+        + "      TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss', 'UTC') AS end_period,"
+        + "      AVG(reading) AS average_reading"
+        + "    FROM readings"
+        + "    WINDOW TUMBLING (SIZE 30 MINUTES)"
+        + "    GROUP BY sensor_id"
+        + "    HAVING AVG(reading) > 25"
+        + "    EMIT CHANGES;";
+
+    private static final String ALERTS_QUERY = "SELECT * FROM alerts EMIT CHANGES;";
+
+    private static final String READINGS_STREAM = "readings";
+
+    private static final Map<String, Object> PROPERTIES = Collections.singletonMap("auto.offset.reset", "earliest");
+
+    private final Client client;
+
+    public CompletableFuture<ExecuteStatementResult> createReadingsStream() {
+        return client.executeStatement(CREATE_READINGS_STREAM, PROPERTIES);
+    }
+
+    public CompletableFuture<ExecuteStatementResult> createAlertsTable() {
+        return client.executeStatement(CREATE_ALERTS_TABLE, PROPERTIES);
+    }
+
+    public CompletableFuture<Void> insert(Collection<KsqlObject> rows) {
+        return CompletableFuture.allOf(
+          rows.stream()
+            .map(row -> client.insertInto(READINGS_STREAM, row))
+            .toArray(CompletableFuture[]::new)
+        );
+    }
+
+    public CompletableFuture<Void> subscribeOnAlerts(Subscriber<Row> subscriber) {
+        return client.streamQuery(ALERTS_QUERY, PROPERTIES)
+          .thenAccept(streamedQueryResult -> streamedQueryResult.subscribe(subscriber))
+          .whenComplete((result, ex) -> {
+              if (ex != null) {
+                  log.error("Alerts push query failed", ex);
+              }
+          });
+    }
+
+}

diff --git a/ksqldb/src/main/java/com/baeldung/ksqldb/Reading.java b/ksqldb/src/main/java/com/baeldung/ksqldb/Reading.java
new file mode 100644
index 0000000000..8964ff5801
--- /dev/null
+++ b/ksqldb/src/main/java/com/baeldung/ksqldb/Reading.java
@@ -0,0 +1,10 @@
+package com.baeldung.ksqldb;
+
+import lombok.Data;
+
+@Data
+public class Reading {
+    private String id;
+    private String timestamp;
+    private int reading;
+}

diff --git a/ksqldb/src/main/java/com/baeldung/ksqldb/RowSubscriber.java b/ksqldb/src/main/java/com/baeldung/ksqldb/RowSubscriber.java
new file mode 100644
index 0000000000..7d09583a39
--- /dev/null
+++ b/ksqldb/src/main/java/com/baeldung/ksqldb/RowSubscriber.java
@@ -0,0 +1,60 @@
+package com.baeldung.ksqldb;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.ksql.api.client.Row;
+import lombok.extern.slf4j.Slf4j;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+public class RowSubscriber<T> implements Subscriber<Row> {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private final Class<T> clazz;
+
+    private Subscription subscription;
+
+    public List<T> consumedItems = new ArrayList<>();
+
+    public RowSubscriber(Class<T> clazz) {
+        this.clazz = clazz;
+    }
+
+    @Override
+    public synchronized void onSubscribe(Subscription subscription) {
+        log.info("Subscriber is subscribed.");
+        this.subscription = subscription;
+        subscription.request(1);
+    }
+
+    @Override
+    public synchronized void onNext(Row row) {
+        String jsonString = row.asObject().toJsonString();
+        log.info("Row JSON: {}", jsonString);
+        try {
+            T item = OBJECT_MAPPER.readValue(jsonString, this.clazz);
+            log.info("Item: {}", item);
+            consumedItems.add(item);
+        } catch (JsonProcessingException e) {
+            log.error("Unable to parse json", e);
+        }
+
+        // Request the next row
+        subscription.request(1);
+    }
+
+    @Override
+    public synchronized void onError(Throwable t) {
+        log.error("Received an error", t);
+    }
+
+    @Override
+    public synchronized void onComplete() {
+        log.info("Query has ended.");
+    }
+}

diff --git a/ksqldb/src/test/java/com/baeldung/ksqldb/KsqlDBApplicationLiveTest.java b/ksqldb/src/test/java/com/baeldung/ksqldb/KsqlDBApplicationLiveTest.java
new file mode 100644
index 0000000000..f13f418048
--- /dev/null
+++ b/ksqldb/src/test/java/com/baeldung/ksqldb/KsqlDBApplicationLiveTest.java
@@ -0,0 +1,160 @@
+package com.baeldung.ksqldb;
+
+import io.confluent.ksql.api.client.Client;
+import io.confluent.ksql.api.client.ClientOptions;
+import io.confluent.ksql.api.client.KsqlObject;
+import io.confluent.ksql.api.client.QueryInfo;
+import io.confluent.ksql.api.client.QueryInfo.QueryType;
+import io.confluent.ksql.api.client.Row;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.awaitility.Awaitility.given;
+
+@Testcontainers
+class KsqlDBApplicationLiveTest {
+
+    private static final File KSQLDB_COMPOSE_FILE = new File("src/test/resources/docker/docker-compose.yml");
+
+    private static final Map<String, Object> PROPERTIES = Collections.singletonMap("auto.offset.reset", "earliest");
+
+    private static final String KSQLDB_SERVER_HOST = "localhost";
+    private static final int KSQLDB_SERVER_PORT = 8088;
+
+    @Container
+    public static DockerComposeContainer<?> dockerComposeContainer =
+      new DockerComposeContainer<>(KSQLDB_COMPOSE_FILE)
+        .withServices("zookeeper", "broker", "ksqldb-server")
+        .withExposedService("ksqldb-server", 8088,
+          Wait.forHealthcheck().withStartupTimeout(Duration.ofMinutes(5)))
+        .withLocalCompose(true);
+
+    private KsqlDBApplication ksqlDBApplication;
+
+    private Client client;
+
+    @BeforeEach
+    void setup() {
+        ClientOptions options = ClientOptions.create()
+          .setHost(KSQLDB_SERVER_HOST)
+          .setPort(KSQLDB_SERVER_PORT);
+        client = Client.create(options);
+
+        ksqlDBApplication = new KsqlDBApplication(client);
+    }
+
+    @AfterEach
+    void tearDown() {
+        deleteAlerts();
+    }
+
+    @Test
+    void givenSensorReadings_whenSubscribedToAlerts_thenAlertsAreConsumed() {
+        createAlertsMaterializedView();
+        RowSubscriber<Alert> alertSubscriber = new RowSubscriber<>(Alert.class);
+
+        CompletableFuture<Void> result =
+          ksqlDBApplication.subscribeOnAlerts(alertSubscriber);
+        insertSampleData();
+
+        assertThat(result).isNotNull();
+        await().atMost(Duration.ofMinutes(3)).untilAsserted(() ->
+          assertThat(alertSubscriber.consumedItems)
+            .containsOnly(
+              expectedAlert("sensor-1", "2021-08-01 09:30:00", "2021-08-01 10:00:00", 28.0),
+              expectedAlert("sensor-2", "2021-08-01 10:00:00", "2021-08-01 10:30:00", 26.0)
+            )
+        );
+    }
+
+    @Test
+    void givenSensorReadings_whenPullQueryForRow_thenRowIsReturned() {
+        createAlertsMaterializedView();
+        insertSampleData();
+
+        String pullQuery = "SELECT * FROM alerts WHERE sensor_id = 'sensor-2';";
+
+        given().ignoreExceptions()
+          .await().atMost(Duration.ofMinutes(1))
+          .untilAsserted(() -> {
+              // it may be possible that the materialized view is not updated with sample data yet,
+              // so ignore TimeoutException and try again
+              List<Row> rows = client.executeQuery(pullQuery, PROPERTIES)
+                .get(10, TimeUnit.SECONDS);
+
+              assertThat(rows).hasSize(1);
+
+              Row row = rows.get(0);
+              assertThat(row.getString("SENSOR_ID")).isEqualTo("sensor-2");
+              assertThat(row.getString("START_PERIOD")).isEqualTo("2021-08-01 10:00:00");
+              assertThat(row.getString("END_PERIOD")).isEqualTo("2021-08-01 10:30:00");
+              assertThat(row.getDouble("AVERAGE_READING")).isEqualTo(26.0);
+          });
+    }
+
+    private void createAlertsMaterializedView() {
+        ksqlDBApplication.createReadingsStream().join();
+        ksqlDBApplication.createAlertsTable().join();
+    }
+
+    private void insertSampleData() {
+        ksqlDBApplication.insert(
+          Arrays.asList(
+            new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:00:00").put("reading", 22),
+            new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:10:00").put("reading", 20),
+            new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:20:00").put("reading", 20),
+
+            // these readings will exceed the alert threshold (sensor-1)
+            new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:30:00").put("reading", 24),
+            new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:40:00").put("reading", 30),
+            new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:50:00").put("reading", 30),
+
+            new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 10:00:00").put("reading", 24),
+
+            // these readings will exceed the alert threshold (sensor-2)
+            new KsqlObject().put("sensor_id", "sensor-2").put("timestamp", "2021-08-01 10:00:00").put("reading", 26),
+            new KsqlObject().put("sensor_id", "sensor-2").put("timestamp", "2021-08-01 10:10:00").put("reading", 26),
+            new KsqlObject().put("sensor_id", "sensor-2").put("timestamp", "2021-08-01 10:20:00").put("reading", 26),
+
+            new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 10:30:00").put("reading", 24)
+          )
+        ).join();
+    }
+
+    private void deleteAlerts() {
+        client.listQueries()
+          .thenApply(queryInfos -> queryInfos.stream()
+            .filter(queryInfo -> queryInfo.getQueryType() == QueryType.PERSISTENT)
+            .map(QueryInfo::getId)
+            .findFirst()
+            .orElseThrow(() -> new RuntimeException("Persistent query not found")))
+          .thenCompose(id -> client.executeStatement("TERMINATE " + id + ";"))
+          .thenCompose(result -> client.executeStatement("DROP TABLE alerts DELETE TOPIC;"))
+          .thenCompose(result -> client.executeStatement("DROP STREAM readings DELETE TOPIC;"))
+          .join();
+    }
+
+    private Alert expectedAlert(String sensorId, String startPeriod, String endPeriod, double average) {
+        return Alert.builder()
+          .sensorId(sensorId)
+          .startPeriod(startPeriod)
+          .endPeriod(endPeriod)
+          .averageReading(average)
+          .build();
+    }
+}

diff --git a/ksqldb/src/test/resources/docker/docker-compose.yml b/ksqldb/src/test/resources/docker/docker-compose.yml
new file mode 100644
index 0000000000..c90fe85e45
--- /dev/null
+++ b/ksqldb/src/test/resources/docker/docker-compose.yml
@@ -0,0 +1,49 @@
+---
+version: '3'
+
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:6.2.0
+    hostname: zookeeper
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+
+  broker:
+    image: confluentinc/cp-kafka:6.2.0
+    hostname: broker
+    depends_on:
+      - zookeeper
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+
+  ksqldb-server:
+    image: confluentinc/ksqldb-server:0.19.0
+    hostname: ksqldb-server
+    depends_on:
+      - broker
+    ports:
+      - "8088:8088"
+    healthcheck:
+      test: curl -f http://ksqldb-server:8088/ || exit 1
+    environment:
+      KSQL_LISTENERS: http://0.0.0.0:8088
+      KSQL_BOOTSTRAP_SERVERS: broker:9092
+      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
+      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
+
+  ksqldb-cli:
+    image: confluentinc/ksqldb-cli:0.19.0
+    hostname: ksqldb-cli
+    depends_on:
+      - broker
+      - ksqldb-server
+    entrypoint: /bin/sh
+    tty: true

diff --git a/ksqldb/src/test/resources/log4j.properties b/ksqldb/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..31a98608fb
--- /dev/null
+++ b/ksqldb/src/test/resources/log4j.properties
@@ -0,0 +1,6 @@
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1} - %m%n
\ No newline at end of file

diff --git a/pom.xml b/pom.xml
index abdb851734..f56006713a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -472,6 +472,7 @@
         <module>jsoup</module>
         <module>jta</module>
         <module>kubernetes</module>
+        <module>ksqldb</module>
         <module>language-interop</module>
         <module>libraries-2</module>
@@ -940,6 +941,8 @@
                 <module>jsoup</module>
                 <module>jta</module>
+                <module>ksqldb</module>
+                <module>libraries-2</module>
                 <module>libraries-3</module>
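
A minimal usage sketch, not part of the patch, showing how the classes above could be wired together outside the Testcontainers-based live test. The KsqlDBApplicationRunner class name and the hard-coded localhost:8088 endpoint are assumptions (taken from the docker-compose setup); the method calls themselves use only the API introduced in the patch.

package com.baeldung.ksqldb;

import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.KsqlObject;

import java.util.Collections;

// Hypothetical runner class (not included in the patch) illustrating the intended workflow.
public class KsqlDBApplicationRunner {

    public static void main(String[] args) {
        // Connect to the ksqlDB server started by the docker-compose file above.
        ClientOptions options = ClientOptions.create()
          .setHost("localhost")
          .setPort(8088);
        Client client = Client.create(options);

        KsqlDBApplication app = new KsqlDBApplication(client);

        // Create the readings stream and the alerts materialized view, then subscribe to alert rows.
        app.createReadingsStream().join();
        app.createAlertsTable().join();
        app.subscribeOnAlerts(new RowSubscriber<>(Alert.class)).join();

        // Insert one sample reading; matching windows are pushed to the subscriber as they exceed the threshold.
        app.insert(Collections.singletonList(
          new KsqlObject()
            .put("sensor_id", "sensor-1")
            .put("timestamp", "2021-08-01 09:40:00")
            .put("reading", 30)))
          .join();

        client.close();
    }
}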