diff --git a/ksqldb/pom.xml b/ksqldb/pom.xml
new file mode 100644
index 0000000000..13867b16e3
--- /dev/null
+++ b/ksqldb/pom.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>ksqldb-app</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>ksqldb</name>
+
+    <parent>
+        <groupId>com.baeldung</groupId>
+        <artifactId>parent-modules</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <repositories>
+        <repository>
+            <id>confluent</id>
+            <name>confluent-repo</name>
+            <url>http://packages.confluent.io/maven/</url>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.confluent.ksql</groupId>
+            <artifactId>ksqldb-api-client</artifactId>
+            <version>${ksqldb.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version><!-- presumably defined in the parent pom; verify -->
+        </dependency>
+
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <version>${awaitility.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>${assertj.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <properties>
+        <ksqldb.version>6.2.0</ksqldb.version>
+        <assertj.version>3.20.2</assertj.version>
+        <awaitility.version>4.1.0</awaitility.version>
+        <testcontainers.version>1.15.3</testcontainers.version>
+    </properties>
+
+</project>
diff --git a/ksqldb/src/main/java/com/baeldung/ksqldb/Alert.java b/ksqldb/src/main/java/com/baeldung/ksqldb/Alert.java
new file mode 100644
index 0000000000..badb00f114
--- /dev/null
+++ b/ksqldb/src/main/java/com/baeldung/ksqldb/Alert.java
@@ -0,0 +1,34 @@
+package com.baeldung.ksqldb;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Alert produced by the ksqlDB {@code alerts} table: the average reading of a
+ * sensor over one tumbling window. The JSON property names match the
+ * upper-cased column names returned by ksqlDB, hence the explicit mappings.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Alert {
+
+    @JsonProperty(value = "SENSOR_ID")
+    private String sensorId;
+
+    @JsonProperty(value = "START_PERIOD")
+    private String startPeriod;
+
+    @JsonProperty(value = "END_PERIOD")
+    private String endPeriod;
+
+    @JsonProperty(value = "AVERAGE_READING")
+    private double averageReading;
+
+}
diff --git a/ksqldb/src/main/java/com/baeldung/ksqldb/KsqlDBApplication.java b/ksqldb/src/main/java/com/baeldung/ksqldb/KsqlDBApplication.java
new file mode 100644
index 0000000000..35ad3ebbb0
--- /dev/null
+++ b/ksqldb/src/main/java/com/baeldung/ksqldb/KsqlDBApplication.java
@@ -0,0 +1,85 @@
+package com.baeldung.ksqldb;
+
+import io.confluent.ksql.api.client.Client;
+import io.confluent.ksql.api.client.ExecuteStatementResult;
+import io.confluent.ksql.api.client.KsqlObject;
+import io.confluent.ksql.api.client.Row;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.reactivestreams.Subscriber;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Facade over the ksqlDB Java client: creates the {@code readings} stream and
+ * the windowed {@code alerts} table, inserts sensor readings, and subscribes
+ * to the alerts push query. All operations are asynchronous.
+ */
+@AllArgsConstructor
+@Slf4j
+public class KsqlDBApplication {
+
+    private static final String CREATE_READINGS_STREAM = ""
+        + " CREATE STREAM readings (sensor_id VARCHAR KEY, timestamp VARCHAR, reading INT)"
+        + " WITH (KAFKA_TOPIC = 'readings',"
+        + " VALUE_FORMAT = 'JSON',"
+        + " TIMESTAMP = 'timestamp',"
+        + " TIMESTAMP_FORMAT = 'yyyy-MM-dd HH:mm:ss',"
+        + " PARTITIONS = 1);";
+
+    private static final String CREATE_ALERTS_TABLE = ""
+        + " CREATE TABLE alerts AS"
+        + " SELECT"
+        + " sensor_id,"
+        + " TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss', 'UTC') AS start_period,"
+        + " TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss', 'UTC') AS end_period,"
+        + " AVG(reading) AS average_reading"
+        + " FROM readings"
+        + " WINDOW TUMBLING (SIZE 30 MINUTES)"
+        + " GROUP BY sensor_id"
+        + " HAVING AVG(reading) > 25"
+        + " EMIT CHANGES;";
+
+    private static final String ALERTS_QUERY = "SELECT * FROM alerts EMIT CHANGES;";
+
+    private static final String READINGS_STREAM = "readings";
+
+    // Consume topics from the beginning so queries also see already-published data.
+    private static final Map<String, Object> PROPERTIES = Collections.singletonMap("auto.offset.reset", "earliest");
+
+    private final Client client;
+
+    /** Creates the {@code readings} stream; completes when the server acknowledges it. */
+    public CompletableFuture<ExecuteStatementResult> createReadingsStream() {
+        return client.executeStatement(CREATE_READINGS_STREAM, PROPERTIES);
+    }
+
+    /** Creates the materialized {@code alerts} table over the readings stream. */
+    public CompletableFuture<ExecuteStatementResult> createAlertsTable() {
+        return client.executeStatement(CREATE_ALERTS_TABLE, PROPERTIES);
+    }
+
+    /** Inserts all given rows into the readings stream; completes when every insert has. */
+    public CompletableFuture<Void> insert(Collection<KsqlObject> rows) {
+        return CompletableFuture.allOf(
+            rows.stream()
+                .map(row -> client.insertInto(READINGS_STREAM, row))
+                .toArray(CompletableFuture[]::new)
+        );
+    }
+
+    /** Subscribes the given subscriber to the alerts push query; logs on failure. */
+    public CompletableFuture<Void> subscribeOnAlerts(Subscriber<Row> subscriber) {
+        return client.streamQuery(ALERTS_QUERY, PROPERTIES)
+            .thenAccept(streamedQueryResult -> streamedQueryResult.subscribe(subscriber))
+            .whenComplete((result, ex) -> {
+                if (ex != null) {
+                    log.error("Alerts push query failed", ex);
+                }
+            });
+    }
+
+}
diff --git a/ksqldb/src/main/java/com/baeldung/ksqldb/Reading.java b/ksqldb/src/main/java/com/baeldung/ksqldb/Reading.java
new file mode 100644
index 0000000000..8964ff5801
--- /dev/null
+++ b/ksqldb/src/main/java/com/baeldung/ksqldb/Reading.java
@@ -0,0 +1,15 @@
+package com.baeldung.ksqldb;
+
+import lombok.Data;
+
+/**
+ * A single sensor reading as published to the {@code readings} Kafka topic.
+ * NOTE(review): the stream column is {@code sensor_id} while this field is
+ * named {@code id} — confirm the intended mapping before serializing with it.
+ */
+@Data
+public class Reading {
+    private String id;
+    private String timestamp;
+    private int reading;
+}
diff --git a/ksqldb/src/main/java/com/baeldung/ksqldb/RowSubscriber.java b/ksqldb/src/main/java/com/baeldung/ksqldb/RowSubscriber.java
new file mode 100644
index 0000000000..7d09583a39
--- /dev/null
+++ b/ksqldb/src/main/java/com/baeldung/ksqldb/RowSubscriber.java
@@ -0,0 +1,66 @@
+package com.baeldung.ksqldb;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.ksql.api.client.Row;
+import lombok.extern.slf4j.Slf4j;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Reactive Streams subscriber that maps each ksqlDB {@link Row} to an instance
+ * of {@code T} via Jackson and collects the results in {@link #consumedItems}.
+ * Requests rows one at a time; methods are synchronized because the client may
+ * deliver signals from a different thread.
+ */
+@Slf4j
+public class RowSubscriber<T> implements Subscriber<Row> {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private final Class<T> clazz;
+
+    private Subscription subscription;
+
+    public List<T> consumedItems = new ArrayList<>();
+
+    public RowSubscriber(Class<T> clazz) {
+        this.clazz = clazz;
+    }
+
+    @Override
+    public synchronized void onSubscribe(Subscription subscription) {
+        log.info("Subscriber is subscribed.");
+        this.subscription = subscription;
+        subscription.request(1);
+    }
+
+    @Override
+    public synchronized void onNext(Row row) {
+        String jsonString = row.asObject().toJsonString();
+        log.info("Row JSON: {}", jsonString);
+        try {
+            T item = OBJECT_MAPPER.readValue(jsonString, this.clazz);
+            log.info("Item: {}", item);
+            consumedItems.add(item);
+        } catch (JsonProcessingException e) {
+            log.error("Unable to parse json", e);
+        }
+
+        // Request the next row
+        subscription.request(1);
+    }
+
+    @Override
+    public synchronized void onError(Throwable t) {
+        log.error("Received an error", t);
+    }
+
+    @Override
+    public synchronized void onComplete() {
+        log.info("Query has ended.");
+    }
+}
diff --git a/ksqldb/src/test/java/com/baeldung/ksqldb/KsqlDBApplicationLiveTest.java b/ksqldb/src/test/java/com/baeldung/ksqldb/KsqlDBApplicationLiveTest.java
new file mode 100644
index 0000000000..f13f418048
--- /dev/null
+++ b/ksqldb/src/test/java/com/baeldung/ksqldb/KsqlDBApplicationLiveTest.java
@@ -0,0 +1,165 @@
+package com.baeldung.ksqldb;
+
+import io.confluent.ksql.api.client.Client;
+import io.confluent.ksql.api.client.ClientOptions;
+import io.confluent.ksql.api.client.KsqlObject;
+import io.confluent.ksql.api.client.QueryInfo;
+import io.confluent.ksql.api.client.QueryInfo.QueryType;
+import io.confluent.ksql.api.client.Row;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.awaitility.Awaitility.given;
+
+/**
+ * Live test: starts Kafka and ksqlDB via docker-compose, then verifies both
+ * the push query (subscription) and a pull query against the alerts table.
+ * Requires a local Docker daemon.
+ */
+@Testcontainers
+class KsqlDBApplicationLiveTest {
+
+    private static final File KSQLDB_COMPOSE_FILE = new File("src/test/resources/docker/docker-compose.yml");
+
+    private static final Map<String, Object> PROPERTIES = Collections.singletonMap("auto.offset.reset", "earliest");
+
+    private static final String KSQLDB_SERVER_HOST = "localhost";
+    private static final int KSQLDB_SERVER_PORT = 8088;
+
+    @Container
+    public static DockerComposeContainer dockerComposeContainer =
+        new DockerComposeContainer<>(KSQLDB_COMPOSE_FILE)
+            .withServices("zookeeper", "broker", "ksqldb-server")
+            .withExposedService("ksqldb-server", 8088,
+                Wait.forHealthcheck().withStartupTimeout(Duration.ofMinutes(5)))
+            .withLocalCompose(true);
+
+    private KsqlDBApplication ksqlDBApplication;
+
+    private Client client;
+
+    @BeforeEach
+    void setup() {
+        ClientOptions options = ClientOptions.create()
+            .setHost(KSQLDB_SERVER_HOST)
+            .setPort(KSQLDB_SERVER_PORT);
+        client = Client.create(options);
+
+        ksqlDBApplication = new KsqlDBApplication(client);
+    }
+
+    @AfterEach
+    void tearDown() {
+        deleteAlerts();
+    }
+
+    @Test
+    void givenSensorReadings_whenSubscribedToAlerts_thenAlertsAreConsumed() {
+        createAlertsMaterializedView();
+        RowSubscriber<Alert> alertSubscriber = new RowSubscriber<>(Alert.class);
+
+        CompletableFuture<Void> result = ksqlDBApplication.subscribeOnAlerts(alertSubscriber);
+        insertSampleData();
+
+        assertThat(result).isNotNull();
+        await().atMost(Duration.ofMinutes(3)).untilAsserted(() ->
+            assertThat(alertSubscriber.consumedItems)
+                .containsOnly(
+                    expectedAlert("sensor-1", "2021-08-01 09:30:00", "2021-08-01 10:00:00", 28.0),
+                    expectedAlert("sensor-2", "2021-08-01 10:00:00", "2021-08-01 10:30:00", 26.0)
+                )
+        );
+    }
+
+    @Test
+    void givenSensorReadings_whenPullQueryForRow_thenRowIsReturned() {
+        createAlertsMaterializedView();
+        insertSampleData();
+
+        String pullQuery = "SELECT * FROM alerts WHERE sensor_id = 'sensor-2';";
+
+        given().ignoreExceptions()
+            .await().atMost(Duration.ofMinutes(1))
+            .untilAsserted(() -> {
+                // it may be possible that the materialized view is not updated with sample data yet
+                // so ignore TimeoutException and try again
+                List<Row> rows = client.executeQuery(pullQuery, PROPERTIES)
+                    .get(10, TimeUnit.SECONDS);
+
+                assertThat(rows).hasSize(1);
+
+                Row row = rows.get(0);
+                assertThat(row.getString("SENSOR_ID")).isEqualTo("sensor-2");
+                assertThat(row.getString("START_PERIOD")).isEqualTo("2021-08-01 10:00:00");
+                assertThat(row.getString("END_PERIOD")).isEqualTo("2021-08-01 10:30:00");
+                assertThat(row.getDouble("AVERAGE_READING")).isEqualTo(26.0);
+            });
+    }
+
+    private void createAlertsMaterializedView() {
+        ksqlDBApplication.createReadingsStream().join();
+        ksqlDBApplication.createAlertsTable().join();
+    }
+
+    private void insertSampleData() {
+        ksqlDBApplication.insert(
+            Arrays.asList(
+                new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:00:00").put("reading", 22),
+                new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:10:00").put("reading", 20),
+                new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:20:00").put("reading", 20),
+
+                // these readings will exceed the alert threshold (sensor-1)
+                new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:30:00").put("reading", 24),
+                new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:40:00").put("reading", 30),
+                new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 09:50:00").put("reading", 30),
+
+                new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 10:00:00").put("reading", 24),
+
+                // these readings will exceed the alert threshold (sensor-2)
+                new KsqlObject().put("sensor_id", "sensor-2").put("timestamp", "2021-08-01 10:00:00").put("reading", 26),
+                new KsqlObject().put("sensor_id", "sensor-2").put("timestamp", "2021-08-01 10:10:00").put("reading", 26),
+                new KsqlObject().put("sensor_id", "sensor-2").put("timestamp", "2021-08-01 10:20:00").put("reading", 26),
+
+                new KsqlObject().put("sensor_id", "sensor-1").put("timestamp", "2021-08-01 10:30:00").put("reading", 24)
+            )
+        ).join();
+    }
+
+    private void deleteAlerts() {
+        client.listQueries()
+            .thenApply(queryInfos -> queryInfos.stream()
+                .filter(queryInfo -> queryInfo.getQueryType() == QueryType.PERSISTENT)
+                .map(QueryInfo::getId)
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("Persistent query not found")))
+            .thenCompose(id -> client.executeStatement("TERMINATE " + id + ";"))
+            .thenCompose(result -> client.executeStatement("DROP TABLE alerts DELETE TOPIC;"))
+            .thenCompose(result -> client.executeStatement("DROP STREAM readings DELETE TOPIC;"))
+            .join();
+    }
+
+    private Alert expectedAlert(String sensorId, String startPeriod, String endPeriod, double average) {
+        return Alert.builder()
+            .sensorId(sensorId)
+            .startPeriod(startPeriod)
+            .endPeriod(endPeriod)
+            .averageReading(average)
+            .build();
+    }
+}
diff --git a/ksqldb/src/test/resources/docker/docker-compose.yml b/ksqldb/src/test/resources/docker/docker-compose.yml
new file mode 100644
index 0000000000..c90fe85e45
--- /dev/null
+++ b/ksqldb/src/test/resources/docker/docker-compose.yml
@@ -0,0 +1,49 @@
+---
+version: '3'
+
+services:
+ zookeeper:
+ image: confluentinc/cp-zookeeper:6.2.0
+ hostname: zookeeper
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+
+ broker:
+ image: confluentinc/cp-kafka:6.2.0
+ hostname: broker
+ depends_on:
+ - zookeeper
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+ KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+ KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+
+ ksqldb-server:
+ image: confluentinc/ksqldb-server:0.19.0
+ hostname: ksqldb-server
+ depends_on:
+ - broker
+ ports:
+ - "8088:8088"
+ healthcheck:
+ test: curl -f http://ksqldb-server:8088/ || exit 1
+ environment:
+ KSQL_LISTENERS: http://0.0.0.0:8088
+ KSQL_BOOTSTRAP_SERVERS: broker:9092
+ KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
+ KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
+
+ ksqldb-cli:
+ image: confluentinc/ksqldb-cli:0.19.0
+ hostname: ksqldb-cli
+ depends_on:
+ - broker
+ - ksqldb-server
+ entrypoint: /bin/sh
+ tty: true
diff --git a/ksqldb/src/test/resources/log4j.properties b/ksqldb/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..31a98608fb
--- /dev/null
+++ b/ksqldb/src/test/resources/log4j.properties
@@ -0,0 +1,6 @@
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1} - %m%n
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index abdb851734..f56006713a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -472,6 +472,7 @@
         <module>jsoup</module>
         <module>jta</module>
+        <module>ksqldb</module>
         <module>kubernetes</module>
         <module>language-interop</module>
         <module>libraries-2</module>
@@ -940,6 +941,8 @@
         <module>jsoup</module>
         <module>jta</module>
+        <module>ksqldb</module>
+
         <module>libraries-2</module>
         <module>libraries-3</module>