diff --git a/influxdb/README.md b/influxdb/README.md new file mode 100644 index 0000000000..f2c421580e --- /dev/null +++ b/influxdb/README.md @@ -0,0 +1,17 @@ +## Influx SDK Tutorial Project + +### Relevant Article: +- [Introduction to using InfluxDB with Java](http://www.baeldung.com/using-influxdb-with-java/) + +### Overview +This Maven project contains the Java code for the article linked above. + +### Package Organization +Java classes for the intro tutorial are in the +org.baeldung.influxdb package. + + +### Running the tests +The test class expects an InfluxDB server to be available on localhost, at the default port of 8086 and with the default "admin" credentials. + +``` diff --git a/influxdb/pom.xml b/influxdb/pom.xml new file mode 100644 index 0000000000..05a8e00fa0 --- /dev/null +++ b/influxdb/pom.xml @@ -0,0 +1,44 @@ + + + 4.0.0 + influxdb + 0.1-SNAPSHOT + jar + influxdb + InfluxDB SDK Tutorial + + + com.baeldung + parent-modules + 1.0.0-SNAPSHOT + + + + + org.influxdb + influxdb-java + ${influxdb.sdk.version} + + + + org.projectlombok + lombok + + ${lombok.version} + provided + + + + + + + + 1.8 + UTF-8 + 2.8 + 1.16.18 + + + + diff --git a/influxdb/src/main/java/com/baeldung/influxdb/MemoryPoint.java b/influxdb/src/main/java/com/baeldung/influxdb/MemoryPoint.java new file mode 100644 index 0000000000..fb05a70867 --- /dev/null +++ b/influxdb/src/main/java/com/baeldung/influxdb/MemoryPoint.java @@ -0,0 +1,28 @@ +package com.baeldung.influxdb; + +import lombok.Data; +import org.influxdb.annotation.Column; +import org.influxdb.annotation.Measurement; + +import java.time.Instant; + +@Data +@Measurement(name = "memory") +public class MemoryPoint { + + @Column(name = "time") + private Instant time; + + @Column(name = "name") + private String name; + + @Column(name = "free") + private Long free; + + @Column(name = "used") + private Long used; + + @Column(name = "buffer") + private Long buffer; + +} diff --git a/influxdb/src/test/java/com/baeldung/influxdb/InfluxDBConnectionLiveTest.java b/influxdb/src/test/java/com/baeldung/influxdb/InfluxDBConnectionLiveTest.java new file mode 100644 index 0000000000..50d35b9b1c --- /dev/null +++ b/influxdb/src/test/java/com/baeldung/influxdb/InfluxDBConnectionLiveTest.java @@ -0,0 +1,179 @@ +package com.baeldung.influxdb; + +import lombok.extern.slf4j.Slf4j; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.InfluxDBIOException; +import org.influxdb.dto.*; +import org.influxdb.impl.InfluxDBResultMapper; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static junit.framework.TestCase.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@Slf4j +public class InfluxDBConnectionLiveTest { + + @Test + public void whenCorrectInfoDatabaseConnects() { + + InfluxDB connection = connectDatabase(); + assertTrue(pingServer(connection)); + } + + private InfluxDB connectDatabase() { + + // Connect to database assumed on localhost with default credentials. + return InfluxDBFactory.connect("http://127.0.0.1:8086", "admin", "admin"); + + } + + private boolean pingServer(InfluxDB influxDB) { + try { + // Ping and check for version string + Pong response = influxDB.ping(); + if (response.getVersion().equalsIgnoreCase("unknown")) { + log.error("Error pinging server."); + return false; + } else { + log.info("Database version: {}", response.getVersion()); + return true; + } + } catch (InfluxDBIOException idbo) { + log.error("Exception while pinging database: ", idbo); + return false; + } + } + + @Test + public void whenDatabaseCreatedDatabaseChecksOk() { + + InfluxDB connection = connectDatabase(); + + // Create "baeldung and check for it + connection.createDatabase("baeldung"); + assertTrue(connection.databaseExists("baeldung")); + + // Verify that nonsense databases are not there + assertFalse(connection.databaseExists("foobar")); + + // Drop "baeldung" and check again + connection.deleteDatabase("baeldung"); + assertFalse(connection.databaseExists("baeldung")); + } + + @Test + public void whenPointsWrittenPointsExists() throws Exception { + + InfluxDB connection = connectDatabase(); + + String dbName = "baeldung"; + connection.createDatabase(dbName); + + // Need a retention policy before we can proceed + connection.createRetentionPolicy("defaultPolicy", "baeldung", "30d", 1, true); + + // Since we are doing a batch thread, we need to set this as a default + connection.setRetentionPolicy("defaultPolicy"); + + // Enable batch mode + connection.enableBatch(10, 10, TimeUnit.MILLISECONDS); + + for (int i = 0; i < 10; i++) { + Point point = Point.measurement("memory") + .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS) + .addField("name", "server1") + .addField("free", 4743656L) + .addField("used", 1015096L) + .addField("buffer", 1010467L) + .build(); + + connection.write(dbName, "defaultPolicy", point); + Thread.sleep(2); + + } + + // Unfortunately, the sleep inside the loop doesn't always add enough time to insure + // that Influx's batch thread flushes all of the writes and this sometimes fails without + // another brief pause. + Thread.sleep(10); + + List memoryPointList = getPoints(connection, "Select * from memory", "baeldung"); + + assertEquals(10, memoryPointList.size()); + + // Turn off batch and clean up + connection.disableBatch(); + connection.deleteDatabase("baeldung"); + connection.close(); + + } + + private List getPoints(InfluxDB connection, String query, String databaseName) { + + // Run the query + Query queryObject = new Query(query, databaseName); + QueryResult queryResult = connection.query(queryObject); + + // Map it + InfluxDBResultMapper resultMapper = new InfluxDBResultMapper(); + return resultMapper.toPOJO(queryResult, MemoryPoint.class); + } + + + + @Test + public void whenBatchWrittenBatchExists() { + + InfluxDB connection = connectDatabase(); + + String dbName = "baeldung"; + connection.createDatabase(dbName); + + // Need a retention policy before we can proceed + // Since we are doing batches, we need not set it + connection.createRetentionPolicy("defaultPolicy", "baeldung", "30d", 1, true); + + + BatchPoints batchPoints = BatchPoints + .database(dbName) + .retentionPolicy("defaultPolicy") + .build(); + Point point1 = Point.measurement("memory") + .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS) + .addField("free", 4743656L) + .addField("used", 1015096L) + .addField("buffer", 1010467L) + .build(); + Point point2 = Point.measurement("memory") + .time(System.currentTimeMillis() - 100, TimeUnit.MILLISECONDS) + .addField("free", 4743696L) + .addField("used", 1016096L) + .addField("buffer", 1008467L) + .build(); + batchPoints.point(point1); + batchPoints.point(point2); + connection.write(batchPoints); + + List memoryPointList = getPoints(connection, "Select * from memory", "baeldung"); + + assertEquals(2, memoryPointList.size()); + assertTrue(4743696L == memoryPointList.get(0).getFree()); + + + memoryPointList = getPoints(connection, "Select * from memory order by time desc", "baeldung"); + + assertEquals(2, memoryPointList.size()); + assertTrue(4743656L == memoryPointList.get(0).getFree()); + + // Clean up database + connection.deleteDatabase("baeldung"); + connection.close(); + } + +} diff --git a/influxdb/src/test/resources/logback.xml b/influxdb/src/test/resources/logback.xml new file mode 100644 index 0000000000..f8ebaf1ebd --- /dev/null +++ b/influxdb/src/test/resources/logback.xml @@ -0,0 +1,13 @@ + + + + + web - %date [%thread] %-5level %logger{36} - %message%n + + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index 5eb1bb257d..ee7d40203d 100644 --- a/pom.xml +++ b/pom.xml @@ -80,6 +80,7 @@ image-processing immutables + influxdb jackson