This is an automated email from the ASF dual-hosted git repository. alexott pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 94300b3 [ZEPPELIN-4602] Added initial version of InfluxDB interpreter 94300b3 is described below commit 94300b329bec22ed514f79f2a72de5105bdb4a28 Author: Robert Hajek <robert.ha...@gmail.com> AuthorDate: Mon Apr 20 15:09:10 2020 +0200 [ZEPPELIN-4602] Added initial version of InfluxDB interpreter ### What is this PR for? The goal is to add support for querying InfluxDB 2.x using Flux language in Zeppelin notebook. InfluxDB 2.0 (beta) docs https://v2.docs.influxdata.com/v2.0/ Flux language docs: https://docs.influxdata.com/flux/ ### What type of PR is it? Feature ### What is the Jira issue? * Open an issue on Jira https://issues.apache.org/jira/browse/ZEPPELIN-4602 ### How should this be tested? * First time? Setup Travis CI as described on https://zeppelin.apache.org/contribution/contributions.html#continuous-integration * Strongly recommended: add automated unit tests for any new or changed behavior * Outline any manual steps to test the PR here. 
### Screenshots (if appropriate) linked in docs/interpreter/influxdb.md Author: Robert Hajek <robert.ha...@gmail.com> Closes #3640 from rhajek/master and squashes the following commits: 206848edc [Robert Hajek] [ZEPPELIN-4602] updated influxdb client libraries to 1.7.0, added support for InfluxDB 1.8, interpreter code cleanup 34cd9ed20 [Robert Hajek] [ZEPPELIN-4602] BaseZeppelinContext replaced 8955205c1 [Robert Hajek] [ZEPPELIN-4602] Updated README.md, removed specific maven-checkstyle-plugin configuration 04b7e28d8 [Robert Hajek] [ZEPPELIN-4602] Added licences b6b9a4848 [Robert Hajek] [ZEPPELIN-4602] Added initial version of InfluxDB interpreter --- .../themes/zeppelin/img/docs-img/influxdb1.png | Bin 0 -> 159175 bytes .../themes/zeppelin/img/docs-img/influxdb2.png | Bin 0 -> 76681 bytes docs/interpreter/influxdb.md | 111 +++++++ influxdb/README.md | 18 ++ influxdb/pom.xml | 79 +++++ .../zeppelin/influxdb/InfluxDBInterpreter.java | 203 +++++++++++++ .../src/main/resources/interpreter-setting.json | 42 +++ .../zeppelin/influxdb/InfluxDBInterpeterTest.java | 327 +++++++++++++++++++++ pom.xml | 1 + zeppelin-distribution/src/bin_license/LICENSE | 3 + .../src/bin_license/licenses/LICENSE-influxdb | 21 ++ 11 files changed, 805 insertions(+) diff --git a/docs/assets/themes/zeppelin/img/docs-img/influxdb1.png b/docs/assets/themes/zeppelin/img/docs-img/influxdb1.png new file mode 100644 index 0000000..6731c56 Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/influxdb1.png differ diff --git a/docs/assets/themes/zeppelin/img/docs-img/influxdb2.png b/docs/assets/themes/zeppelin/img/docs-img/influxdb2.png new file mode 100644 index 0000000..71104ae Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/influxdb2.png differ diff --git a/docs/interpreter/influxdb.md b/docs/interpreter/influxdb.md new file mode 100644 index 0000000..64dd084 --- /dev/null +++ b/docs/interpreter/influxdb.md @@ -0,0 +1,111 @@ +--- +layout: page +title: "InfluxDB 
Interpreter for Apache Zeppelin" +description: "InfluxDB is an open-source time series database designed to handle high write and query loads." +group: interpreter +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +{% include JB/setup %} + +# InfluxDB Interpreter for Apache Zeppelin + +<div id="toc"></div> + +## Overview +[InfluxDB](https://v2.docs.influxdata.com/v2.0/) is an open-source time series database (TSDB) developed by InfluxData. It is written in Go and optimized for fast, high-availability storage and retrieval of time series data in fields such as operations monitoring, application metrics, Internet of Things sensor data, and real-time analytics. +This interpreter allows to perform queries in [Flux Language](https://v2.docs.influxdata.com/v2.0/reference/flux/) in Zeppelin Notebook. 
+ +### Notes +* This interpreter is compatible with InfluxDB 1.8+ and InfluxDB 2.0+ (v2 API, Flux language) +* Code complete and syntax highlighting is not supported for now + +### Example notebook + + + +### Configuration +<table class="table-configuration"> + <tr> + <th>Property</th> + <th>Default</th> + <th>Value</th> + </tr> + <tr> + <td>influxdb.url</td> + <td>http://localhost:9999</td> + <td>InfluxDB API connection url</td> + </tr> + <tr> + <td>influxdb.org</td> + <td>my-org</td> + <td>organization name, Organizations are supported in InfluxDB 2.0+, use "-" as org for InfluxDB 1.8</td> + </tr> + <tr> + <td>influxdb.token</td> + <td>my-token</td> + <td>authorization token for InfluxDB API, token are supported in InfluxDB 2.0+, for InfluxDB 1.8 use 'username:password' as a token.</td> + </tr> + <tr> + <td>influxdb.logLevel</td> + <td>NONE</td> + <td>InfluxDB client library verbosity level (for debugging purpose)</td> + </tr> +</table> + +#### Example configuration + + + +## Overview + + +## How to use +Basically, you can use + +``` +%influxdb +from(bucket: "my-bucket") + |> range(start: -1h) + |> filter(fn: (r) => r._measurement == "cpu") + |> filter(fn: (r) => r.cpu == "cpu-total") + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") +``` +In this example we use data collected by `[[inputs.cpu]]` [Telegraf](https://github.com/influxdata/telegraf) input plugin. + +The result of Flux command can contain more one or more tables. In the case of multiple tables, each +table is rendered as a separate %table structure. This example uses `pivot` +function to collect values from multiple tables into single table. 
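The per-table rendering described above can be sketched in plain Java without the InfluxDB client. This is only an illustrative sketch under stated assumptions: `FluxRow`, `TableRenderSketch`, `render`, `row`, and `demo` are hypothetical names invented for the example and are not part of the interpreter or the client library, and the sample values are made up. It starts a new `%table` header whenever the table index of a record changes, which roughly mirrors the approach taken in `InfluxDBInterpreter.internalInterpret`. The escaping of tabs and newlines that the interpreter performs is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Illustrative sketch only: FluxRow is a hypothetical stand-in for a Flux result record.
class TableRenderSketch {

  /** Hypothetical record: the index of the table it belongs to plus its column values. */
  static final class FluxRow {
    final int table;
    final Map<String, Object> values;

    FluxRow(int table, Map<String, Object> values) {
      this.table = table;
      this.values = values;
    }
  }

  /** Emits a new "%table" header whenever the table index changes, then one row per record. */
  static String render(List<FluxRow> rows) {
    StringBuilder out = new StringBuilder();
    int currentTable = -1;
    for (FluxRow row : rows) {
      if (row.table != currentTable) {
        currentTable = row.table;
        // Column names of the first record in a table become the header line.
        StringJoiner header = new StringJoiner("\t");
        row.values.keySet().forEach(header::add);
        out.append("\n%table ").append(header).append("\n");
      }
      // Null values are rendered as empty cells.
      StringJoiner cells = new StringJoiner("\t");
      for (Object value : row.values.values()) {
        cells.add(value == null ? "" : value.toString());
      }
      out.append(cells).append("\n");
    }
    return out.toString();
  }

  /** Builds a made-up two-column record for the demo. */
  static FluxRow row(int table, String cpu, double value) {
    Map<String, Object> values = new LinkedHashMap<>();
    values.put("cpu", cpu);
    values.put("_value", value);
    return new FluxRow(table, values);
  }

  /** Renders two records that belong to two different tables. */
  static String demo() {
    List<FluxRow> rows = new ArrayList<>();
    rows.add(row(0, "cpu-total", 12.5));
    rows.add(row(1, "cpu0", 23.5));
    return render(rows);
  }

  public static void main(String[] args) {
    System.out.print(demo());
  }
}
```

Running `main` prints two separate `%table` sections, one per table index. A query that ends with `pivot`, as in the example above, can reduce the number of tables and so the number of sections rendered.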
+ +## How to run InfluxDB 2.0 using docker +```bash +docker pull quay.io/influxdb/influxdb:nightly +docker run --name influxdb -p 9999:9999 quay.io/influxdb/influxdb:nightly + +## Post onBoarding request, to setup initial user (my-user@my-password), org (my-org) and bucketSetup (my-bucket)" +curl -i -X POST http://localhost:9999/api/v2/setup -H 'accept: application/json' \ + -d '{ + "username": "my-user", + "password": "my-password", + "org": "my-org", + "bucket": "my-bucket", + "token": "my-token" + }' + +``` + + + + + diff --git a/influxdb/README.md b/influxdb/README.md new file mode 100644 index 0000000..06ba34f --- /dev/null +++ b/influxdb/README.md @@ -0,0 +1,18 @@ +InfluxDB 2.0 interpreter for Apache Zeppelin +============================================ + +## Description: + +Provide InfluxDB Interpreter for Zeppelin. + +## Build + +``` +mvn -pl influxdb -am -DskipTests package +``` + +## Test + +``` +mvn -pl influxdb -am -Dtest='org.apache.zeppelin.influxdb.*' -DfailIfNoTests=false test +``` diff --git a/influxdb/pom.xml b/influxdb/pom.xml new file mode 100644 index 0000000..8ab2dd9 --- /dev/null +++ b/influxdb/pom.xml @@ -0,0 +1,79 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>zeppelin-interpreter-parent</artifactId> + <groupId>org.apache.zeppelin</groupId> + <version>0.9.0-SNAPSHOT</version> + <relativePath>../zeppelin-interpreter-parent/pom.xml</relativePath> + </parent> + + <artifactId>zeppelin-influxdb</artifactId> + <packaging>jar</packaging> + <version>0.9.0-SNAPSHOT</version> + <name>Zeppelin: InfluxDB interpreter</name> + <description>InfluxDB 2.0 timeseries database support</description> + + <properties> + <interpreter.name>influxdb</interpreter.name> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <influxdb.client.version>1.7.0</influxdb.client.version> + <dependency.okhttp3.version>3.13.1</dependency.okhttp3.version> + <dependency.gson.version>2.8.5</dependency.gson.version> + </properties> + + <dependencies> + <dependency> + <groupId>com.influxdb</groupId> + <artifactId>influxdb-client-java</artifactId> + <version>${influxdb.client.version}</version> + </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>${dependency.gson.version}</version> + </dependency> + <dependency> + <groupId>com.squareup.okhttp3</groupId> + <artifactId>mockwebserver</artifactId> + <version>${dependency.okhttp3.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-enforcer-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-shade-plugin</artifactId> + </plugin> 
+ </plugins> + </build> +</project> diff --git a/influxdb/src/main/java/org/apache/zeppelin/influxdb/InfluxDBInterpreter.java b/influxdb/src/main/java/org/apache/zeppelin/influxdb/InfluxDBInterpreter.java new file mode 100644 index 0000000..1bbcfcd --- /dev/null +++ b/influxdb/src/main/java/org/apache/zeppelin/influxdb/InfluxDBInterpreter.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.zeppelin.influxdb; + +import java.util.Properties; +import java.util.StringJoiner; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import com.influxdb.LogLevel; +import com.influxdb.client.InfluxDBClient; +import com.influxdb.client.InfluxDBClientFactory; +import com.influxdb.client.InfluxDBClientOptions; +import com.influxdb.client.QueryApi; +import org.apache.zeppelin.interpreter.AbstractInterpreter; +import org.apache.zeppelin.interpreter.ZeppelinContext; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterResult; + +/** + * <a href="https://v2.docs.influxdata.com/v2.0/">InfluxDB 2.0</a> interpreter for Zeppelin. 
+ * It uses /v2/query API, query is written in Flux Language. + * <p> + * How to use: <br/> + * <pre> + * {@code + * %influxdb + * from(bucket: "my-bucket") + * |> range(start: -5m) + * |> filter(fn: (r) => r._measurement == "cpu") + * |> filter(fn: (r) => r._field == "usage_user") + * |> filter(fn: (r) => r.cpu == "cpu-total") + * } + * </pre> + * </p> + */ +public class InfluxDBInterpreter extends AbstractInterpreter { + + private static final String INFLUXDB_API_URL_PROPERTY = "influxdb.url"; + private static final String INFLUXDB_TOKEN_PROPERTY = "influxdb.token"; + private static final String INFLUXDB_ORG_PROPERTY = "influxdb.org"; + private static final String INFLUXDB_LOGLEVEL_PROPERTY = "influxdb.logLevel"; + + private static final String TABLE_MAGIC_TAG = "%table "; + private static final String WHITESPACE = " "; + private static final String NEWLINE = "\n"; + private static final String TAB = "\t"; + private static final String EMPTY_COLUMN_VALUE = ""; + + private volatile InfluxDBClient client; + private volatile QueryApi queryApi; + + public InfluxDBInterpreter(Properties properties) { + super(properties); + } + + @Override + public ZeppelinContext getZeppelinContext() { + return null; + } + + @Override + protected InterpreterResult internalInterpret(String query, InterpreterContext context) + throws InterpreterException { + + logger.debug("Run Flux command '{}'", query); + query = query.trim(); + + QueryApi queryService = getInfluxDBClient(context); + + final int[] actualIndex = {-1}; + + AtomicReference<InterpreterResult> resultRef = new AtomicReference<>(); + CountDownLatch countDownLatch = new CountDownLatch(1); + + StringBuilder result = new StringBuilder(); + queryService.query( + query, + + //process record + (cancellable, fluxRecord) -> { + + Integer tableIndex = fluxRecord.getTable(); + if (actualIndex[0] != tableIndex) { + result.append(NEWLINE); + result.append(TABLE_MAGIC_TAG); + actualIndex[0] = tableIndex; + + //add column names to table 
header + StringJoiner joiner = new StringJoiner(TAB); + fluxRecord.getValues().keySet().forEach(c -> joiner.add(replaceReservedChars(c))); + result.append(joiner.toString()); + result.append(NEWLINE); + } + + StringJoiner rowsJoiner = new StringJoiner(TAB); + for (Object value : fluxRecord.getValues().values()) { + if (value == null) { + value = EMPTY_COLUMN_VALUE; + } + rowsJoiner.add(replaceReservedChars(value.toString())); + } + result.append(rowsJoiner.toString()); + result.append(NEWLINE); + }, + + throwable -> { + + logger.error(throwable.getMessage(), throwable); + resultRef.set(new InterpreterResult(InterpreterResult.Code.ERROR, + throwable.getMessage())); + + countDownLatch.countDown(); + + }, () -> { + //on complete + InterpreterResult intpResult = new InterpreterResult(InterpreterResult.Code.SUCCESS); + intpResult.add(result.toString()); + resultRef.set(intpResult); + countDownLatch.countDown(); + } + ); + try { + countDownLatch.await(); + } catch (InterruptedException e) { + throw new InterpreterException(e); + } + + return resultRef.get(); + } + + + private QueryApi getInfluxDBClient(InterpreterContext context) { + if (queryApi == null) { + queryApi = this.client.getQueryApi(); + } + return queryApi; + } + + + @Override + public void open() throws InterpreterException { + + if (this.client == null) { + InfluxDBClientOptions opt = InfluxDBClientOptions.builder() + .url(getProperty(INFLUXDB_API_URL_PROPERTY)) + .authenticateToken(getProperty(INFLUXDB_TOKEN_PROPERTY).toCharArray()) + .logLevel(LogLevel.valueOf( + getProperty(INFLUXDB_LOGLEVEL_PROPERTY, LogLevel.NONE.toString()))) + .org(getProperty(INFLUXDB_ORG_PROPERTY)) + .build(); + + this.client = InfluxDBClientFactory.create(opt); + } + } + + @Override + public void close() throws InterpreterException { + if (this.client != null) { + this.client.close(); + this.client = null; + } + } + + @Override + public void cancel(InterpreterContext context) throws InterpreterException { + + } + + @Override + 
public FormType getFormType() throws InterpreterException { + return FormType.SIMPLE; + } + + @Override + public int getProgress(InterpreterContext context) throws InterpreterException { + return 0; + } + + /** + * For %table response replace Tab and Newline. + */ + private String replaceReservedChars(String str) { + if (str == null) { + return EMPTY_COLUMN_VALUE; + } + return str.replace(TAB, WHITESPACE).replace(NEWLINE, WHITESPACE); + } + +} diff --git a/influxdb/src/main/resources/interpreter-setting.json b/influxdb/src/main/resources/interpreter-setting.json new file mode 100644 index 0000000..363b9ef --- /dev/null +++ b/influxdb/src/main/resources/interpreter-setting.json @@ -0,0 +1,42 @@ +[ + { + "group": "influxdb", + "name": "influxdb", + "className": "org.apache.zeppelin.influxdb.InfluxDBInterpreter", + "properties": { + "influxdb.url": { + "envName": null, + "propertyName": "influxdb.url", + "defaultValue": "http://localhost:9999", + "description": "The URL for InfluxDB 2.X API", + "type": "string" + }, + "influxdb.token": { + "envName": null, + "propertyName": "influxdb.token", + "defaultValue": "my-token", + "description": "InfluxDB auth token", + "type": "password" + }, + "influxdb.org": { + "envName": null, + "propertyName": "influxdb.org", + "defaultValue": "my-org", + "description": "InfluxDB org name", + "type": "string" + }, + "influxdb.logLevel": { + "envName": null, + "propertyName": "influxdb.logLevel", + "defaultValue": "NONE", + "description": "InfluxDB http client library verbosity level (NONE, BASIC, HEADERS, BODY)", + "type": "string" + } + }, + "editor": { + "language": "sql", + "editOnDblClick": false, + "completionSupport": false + } + } +] diff --git a/influxdb/src/test/java/org/apache/zeppelin/influxdb/InfluxDBInterpeterTest.java b/influxdb/src/test/java/org/apache/zeppelin/influxdb/InfluxDBInterpeterTest.java new file mode 100644 index 0000000..3ed6385 --- /dev/null +++ 
b/influxdb/src/test/java/org/apache/zeppelin/influxdb/InfluxDBInterpeterTest.java @@ -0,0 +1,327 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.zeppelin.influxdb; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import javax.annotation.Nonnull; + +import com.influxdb.LogLevel; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResultMessage; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.junit.After; +import org.junit.Assert; +import static org.junit.Assert.assertEquals; +import org.junit.Before; +import org.junit.Test; + + +public class InfluxDBInterpeterTest { + + Properties properties; + + static final String SINGLE_TABLE_RESPONSE = + "#datatype,string,long,dateTime:RFC3339,double,string\n" + + "#group,false,false,false,false,true\n" + + "#default,_result,,,,\n" + + ",result,table,_time,_value,_field\n" + + ",,0,2020-01-24T10:23:56Z,12.114014251781473,usage_user\n" + 
+ ",,0,2020-01-24T10:23:57Z,12.048493938257717,usage_user\n" + + ",,0,2020-01-24T10:24:06Z,12.715678919729932,usage_user\n" + + ",,0,2020-01-24T10:24:07Z,11.876484560570072,usage_user\n" + + ",,0,2020-01-24T10:24:16Z,10.044977511244378,usage_user\n" + + ",,0,2020-01-24T10:24:17Z,10.594702648675662,usage_user\n" + + ",,0,2020-01-24T10:24:26Z,12.092034512942353,usage_user\n" + + ",,0,2020-01-24T10:24:27Z,12.131065532766383,usage_user\n" + + ",,0,2020-01-24T10:24:36Z,14.332125452955141,usage_user\n" + + ",,0,2020-01-24T10:24:37Z,15.153788447111777,usage_user"; + + static final String MULTI_TABLE_RESPONSE = + "#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339," + + "string,string,string,string,double,dateTime:RFC3339\n" + + "#group,false,false,true,true,true,true,true,true,false,false\n" + + "#default,_result,,,,,,,,,\n" + + ",result,table,_start,_stop,_field,_measurement,cpu,host,_value,_time\n" + + ",,0,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu," + + "cpu-total,macek.local,12.381414297598637,2020-01-24T09:28:00Z\n" + + ",,0,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu," + + "cpu-total,macek.local,18.870254041431455,2020-01-24T09:29:00Z\n" + + ",,0,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu," + + "cpu-total,macek.local,26.64080311971415,2020-01-24T09:30:00Z\n" + + ",,0,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu," + + "cpu-total,macek.local,11.644120979499911,2020-01-24T09:31:00Z\n" + + ",,0,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu," + + "cpu-total,macek.local,16.046354351571846,2020-01-24T09:32:00Z\n" + + ",,1,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu0," + + "macek.local,23.525686625686625,2020-01-24T09:28:00Z\n" + + ",,1,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu0," + + "macek.local,31.582258129037516,2020-01-24T09:29:00Z\n" + + 
",,1,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu0," + + "macek.local,39.20349852756812,2020-01-24T09:30:00Z\n" + + ",,1,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu0," + + "macek.local,23.533275499942164,2020-01-24T09:31:00Z\n" + + ",,1,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu0," + + "macek.local,19.11247206247206,2020-01-24T09:32:00Z\n" + + ",,2,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu1," + + "macek.local,3.775801800801801,2020-01-24T09:28:00Z\n" + + ",,2,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu1," + + "macek.local,8.776226876226875,2020-01-24T09:29:00Z\n" + + ",,2,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu1," + + "macek.local,16.15592568092568,2020-01-24T09:30:00Z\n" + + ",,2,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu1," + + "macek.local,3.466367149700483,2020-01-24T09:31:00Z\n" + + ",,2,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu1," + + "macek.local,10.123511023511023,2020-01-24T09:32:00Z\n" + + ",,3,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu2," + + "macek.local,23.186861861861857,2020-01-24T09:28:00Z\n" + + ",,3,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu2," + + "macek.local,30.502449226101927,2020-01-24T09:29:00Z\n" + + ",,3,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu2," + + "macek.local,37.800263500263505,2020-01-24T09:30:00Z\n" + + ",,3,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu2," + + "macek.local,21.04487655320989,2020-01-24T09:31:00Z\n" + + ",,3,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu2," + + "macek.local,23.40988960155627,2020-01-24T09:32:00Z\n" + + 
",,4,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu3," + + "macek.local,3.7013513513513514,2020-01-24T09:28:00Z\n" + + ",,4,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu3," + + "macek.local,8.669684156858507,2020-01-24T09:29:00Z\n" + + ",,4,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu3," + + "macek.local,16.4761093606771,2020-01-24T09:30:00Z\n" + + ",,4,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu3," + + "macek.local,3.416193908762379,2020-01-24T09:31:00Z\n" + + ",,4,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu3," + + "macek.local,10.391479708146376,2020-01-24T09:32:00Z\n" + + ",,5,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu4," + + "macek.local,20.520504495504497,2020-01-24T09:28:00Z\n" + + ",,5,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu4," + + "macek.local,28.435828535828534,2020-01-24T09:29:00Z\n" + + ",,5,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu4," + + "macek.local,35.76454396684968,2020-01-24T09:30:00Z\n" + + ",,5,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu4," + + "macek.local,18.94977031643698,2020-01-24T09:31:00Z\n" + + ",,5,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu4," + + "macek.local,22.81423008923009,2020-01-24T09:32:00Z\n" + + ",,6,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu5," + + "macek.local,3.4502771752771753,2020-01-24T09:28:00Z\n" + + ",,6,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu5," + + "macek.local,8.617365310885685,2020-01-24T09:29:00Z\n" + + ",,6,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu5," + + "macek.local,16.5813353653174,2020-01-24T09:30:00Z\n" + + 
",,6,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu5," + + "macek.local,3.341634649967983,2020-01-24T09:31:00Z\n" + + ",,6,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu5," + + "macek.local,10.489286880953548,2020-01-24T09:32:00Z\n" + + ",,7,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu6," + + "macek.local,17.42073857073857,2020-01-24T09:28:00Z\n" + + ",,7,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu6," + + "macek.local,25.555054526024517,2020-01-24T09:29:00Z\n" + + ",,7,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu6," + + "macek.local,34.19774496441163,2020-01-24T09:30:00Z\n" + + ",,7,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu6," + + "macek.local,15.985298393631725,2020-01-24T09:31:00Z\n" + + ",,7,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu6," + + "macek.local,21.359203467536798,2020-01-24T09:32:00Z\n" + + ",,8,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu7," + + "macek.local,3.4507517507517504,2020-01-24T09:28:00Z\n" + + ",,8,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu7," + + "macek.local,8.817554700888033,2020-01-24T09:29:00Z\n" + + ",,8,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu7," + + "macek.local,16.957243048909714,2020-01-24T09:30:00Z\n" + + ",,8,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu7," + + "macek.local,3.408601950268617,2020-01-24T09:31:00Z\n" + + ",,8,2020-01-24T09:27:44.8452185Z,2020-01-24T10:27:44.8452185Z,usage_user,cpu,cpu7," + + "macek.local,10.672760839427506,2020-01-24T09:32:00Z"; + + protected MockWebServer mockServer; + + /** + * Start Mock server. 
+ * + * @return the mock server URL + */ + @Nonnull + protected String startMockServer() { + + mockServer = new MockWebServer(); + try { + mockServer.start(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return mockServer.url("/").url().toString(); + } + + + @Nonnull + protected MockResponse createResponse(final String data) { + return createResponse(data, "text/csv", true); + } + + @Nonnull + protected MockResponse createResponse(final String data, final String + contentType, final boolean chunked) { + + MockResponse response = new MockResponse() + .setHeader("Content-Type", contentType + "; charset=utf-8") + .setHeader("Date", "Tue, 26 Jun 2018 13:15:01 GMT"); + + if (chunked) { + response.setChunkedBody(data, data.length()); + } else { + response.setBody(data); + } + + return response; + } + + + @Before + public void before() throws InterpreterException { + //properties for local influxdb2 server + properties = new Properties(); + //properties.setProperty("influxdb.url", "http://localhost:9999"); + properties.setProperty("influxdb.url", startMockServer()); + + properties.setProperty("influxdb.token", "my-token"); + properties.setProperty("influxdb.org", "my-org"); + properties.setProperty("influxdb.logLevel", LogLevel.BODY.toString()); + } + + @After + public void after() throws IOException { + if (mockServer != null) { + mockServer.shutdown(); + } + } + + @Test + public void testSigleTable() throws InterpreterException { + + InfluxDBInterpreter t = new InfluxDBInterpreter(properties); + t.open(); + + //just for testing with real influxdb (not used in mock) + String flux = "from(bucket: \"my-bucket\")\n" + + " |> range(start:-1m)\n" + + " |> filter(fn: (r) => r._measurement == \"cpu\")\n" + + " |> filter(fn: (r) => r._field == \"usage_user\")\n" + + " |> filter(fn: (r) => r.cpu == \"cpu-total\")\n" + + " |> limit(n:5, offset: 0)" + + " |> keep(columns: [\"_field\", \"_value\", \"_time\"])"; + + InterpreterContext context = 
InterpreterContext.builder()
+        .setAuthenticationInfo(new AuthenticationInfo("testUser"))
+        .build();
+
+    mockServer.enqueue(createResponse(SINGLE_TABLE_RESPONSE));
+
+    InterpreterResult interpreterResult = t.interpret(flux, context);
+
+    // if prefix not found return ERROR and Prefix not found.
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
+
+    List<InterpreterResultMessage> message = interpreterResult.message();
+    Assert.assertEquals(1, message.size());
+    Assert.assertEquals(InterpreterResult.Type.TABLE, message.get(0).getType());
+    Assert.assertEquals("result\ttable\t_time\t_value\t_field\n" +
+        "_result\t0\t2020-01-24T10:23:56Z\t12.114014251781473\tusage_user\n" +
+        "_result\t0\t2020-01-24T10:23:57Z\t12.048493938257717\tusage_user\n" +
+        "_result\t0\t2020-01-24T10:24:06Z\t12.715678919729932\tusage_user\n" +
+        "_result\t0\t2020-01-24T10:24:07Z\t11.876484560570072\tusage_user\n" +
+        "_result\t0\t2020-01-24T10:24:16Z\t10.044977511244378\tusage_user\n" +
+        "_result\t0\t2020-01-24T10:24:17Z\t10.594702648675662\tusage_user\n" +
+        "_result\t0\t2020-01-24T10:24:26Z\t12.092034512942353\tusage_user\n" +
+        "_result\t0\t2020-01-24T10:24:27Z\t12.131065532766383\tusage_user\n" +
+        "_result\t0\t2020-01-24T10:24:36Z\t14.332125452955141\tusage_user\n" +
+        "_result\t0\t2020-01-24T10:24:37Z\t15.153788447111777\tusage_user\n",
+      message.get(0).getData());
+
+    t.close();
+  }
+
+  @Test
+  public void testMultiTable() throws InterpreterException {
+
+    InfluxDBInterpreter t = new InfluxDBInterpreter(properties);
+    t.open();
+
+    //just for testing with real influxdb (not used in mock)
+    String flux = "from(bucket: \"my-bucket\")\n" +
+        " |> range(start: -1h)\n" +
+        " |> filter(fn: (r) => r._measurement == \"cpu\")\n" +
+        " |> filter(fn: (r) => r._field == \"usage_user\")\n" +
+        " |> aggregateWindow(every: 1m, fn: mean)\n" +
+        " |> limit(n:5, offset: 0)";
+
+    InterpreterContext context = InterpreterContext.builder()
+        .setAuthenticationInfo(new AuthenticationInfo("testUser"))
+        .build();
+
+    mockServer.enqueue(createResponse(MULTI_TABLE_RESPONSE));
+    InterpreterResult interpreterResult = t.interpret(flux, context);
+
+    // if prefix not found return ERROR and Prefix not found.
+    if (InterpreterResult.Code.ERROR.equals(interpreterResult.code())) {
+      Assert.fail(interpreterResult.toString());
+    }
+
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
+    List<InterpreterResultMessage> message = interpreterResult.message();
+
+    Assert.assertEquals(9, message.size());
+
+    message.forEach(m -> Assert.assertEquals(InterpreterResult.Type.TABLE, m.getType()));
+
+    Assert.assertEquals(
+        "result\ttable\t_start\t_stop\t_field\t_measurement\tcpu\thost\t_value\t_time\n" +
+        "_result\t0\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+        "\tcpu\tcpu-total\tmacek.local\t12.381414297598637\t2020-01-24T09:28:00Z\n" +
+        "_result\t0\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+        "\tcpu\tcpu-total\tmacek.local\t18.870254041431455\t2020-01-24T09:29:00Z\n" +
+        "_result\t0\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+        "\tcpu\tcpu-total\tmacek.local\t26.64080311971415\t2020-01-24T09:30:00Z\n" +
+        "_result\t0\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+        "\tcpu\tcpu-total\tmacek.local\t11.644120979499911\t2020-01-24T09:31:00Z\n" +
+        "_result\t0\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+        "\tcpu\tcpu-total\tmacek.local\t16.046354351571846\t2020-01-24T09:32:00Z\n",
+      message.get(0).getData());
+
+    Assert.assertEquals("result\ttable\t_start\t_stop\t_field\t_measurement\tcpu\thost\t_value" +
+        "\t_time\n" +
+        "_result\t8\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+        "\tcpu\tcpu7\tmacek.local\t3.4507517507517504\t2020-01-24T09:28:00Z\n" +
+        "_result\t8\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+        "\tcpu\tcpu7\tmacek.local\t8.817554700888033\t2020-01-24T09:29:00Z\n" +
+        "_result\t8\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+        "\tcpu\tcpu7\tmacek.local\t16.957243048909714\t2020-01-24T09:30:00Z\n" +
+        "_result\t8\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+        "\tcpu\tcpu7\tmacek.local\t3.408601950268617\t2020-01-24T09:31:00Z\n" +
+        "_result\t8\t2020-01-24T09:27:44.845218500Z\t2020-01-24T10:27:44.845218500Z\tusage_user" +
+        "\tcpu\tcpu7\tmacek.local\t10.672760839427506\t2020-01-24T09:32:00Z\n",
+      message.get(8).getData());
+
+    t.close();
+  }
+
+}
diff --git a/pom.xml b/pom.xml
index 8c358f3..6e6e232 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@
     <module>file</module>
     <module>flink</module>
     <module>ignite</module>
+    <module>influxdb</module>
     <module>kylin</module>
     <module>python</module>
     <module>lens</module>
diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE
index 386398e..5a448ac 100644
--- a/zeppelin-distribution/src/bin_license/LICENSE
+++ b/zeppelin-distribution/src/bin_license/LICENSE
@@ -136,6 +136,7 @@ The following components are provided under Apache License.
     (Apache 2.0) Google Guice - Core Library (com.google.inject:guice:3.0 - http://code.google.com/p/google-guice/guice/)
     (Apache 2.0) OkHttp (com.squareup.okhttp:okhttp:2.5.0 - https://github.com/square/okhttp/okhttp)
     (Apache 2.0) Okio (com.squareup.okio:okio:1.6.0 - https://github.com/square/okio/okio)
+    (Apache 2.0) OkHttp mockwebserver (com.squareup.okhttp3:mockwebserver:3.13.1) - https://github.com/square/okhttp/blob/master/LICENSE.txt
     (Apache 2.0) config (com.typesafe:config:1.2.1 - https://github.com/typesafehub/config)
     (Apache 2.0) akka-actor (com.typesafe.akka:akka-actor_2.10:2.3.7 - http://akka.io/)
     (Apache 2.0) akka-remote (com.typesafe.akka:akka-remote_2.10:2.3.7 - http://akka.io/)
@@ -221,6 +222,7 @@ The following components are provided under Apache License.
     (Apache 2.0) mongo-java-driver 3.4.1 (org.mongodb:mongo-java-driver:3.4.1) - https://github.com/mongodb/mongo-java-driver/blob/master/LICENSE.txt
     (Apache 2.0) Neo4j Java Driver (https://github.com/neo4j/neo4j-java-driver) - https://github.com/neo4j/neo4j-java-driver/blob/1.4.3/LICENSE.txt
     (Apache 2.0) Hazelcast Jet (http://jet.hazelcast.org) - https://github.com/hazelcast/hazelcast-jet/blob/master/LICENSE
+    (Apache 2.0) RxJava (io.reactivex.rxjava2:rxjava:2.2.17) - https://github.com/ReactiveX/RxJava/blob/2.x/LICENSE
 
 ========================================================================
 MIT licenses
@@ -278,6 +280,7 @@ The text of each license is also included at licenses/LICENSE-[project]-[version
     (The MIT License) angular-viewport-watch 0.135 (https://github.com/wix/angular-viewport-watch) - https://github.com/wix/angular-viewport-watch/blob/master/LICENSE
     (The MIT License) ansi-up 2.0.2 (https://github.com/drudru/ansi_up) - https://github.com/drudru/ansi_up#license
     (The MIT License) bcpkix-jdk15on 1.60 (org.bouncycastle:bcpkix-jdk15on:1.60 https://github.com/bcgit/bc-java) - https://github.com/bcgit/bc-java/blob/master/LICENSE.html
+    (The MIT License) influxdb-client-java 1.4.0 (com.influxdb:influxdb-client-java:1.4.0 https://github.com/influxdata/influxdb-client-java) - https://github.com/influxdata/influxdb-client-java/blob/master/LICENSE
 
 ========================================================================
 BSD-style licenses
diff --git a/zeppelin-distribution/src/bin_license/licenses/LICENSE-influxdb b/zeppelin-distribution/src/bin_license/licenses/LICENSE-influxdb
new file mode 100644
index 0000000..75f97cf
--- /dev/null
+++ b/zeppelin-distribution/src/bin_license/licenses/LICENSE-influxdb
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2018 Influxdata, Inc.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
\ No newline at end of file