This is an automated email from the ASF dual-hosted git repository. jongyoul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 9b9e9a6057 [ZEPPELIN-5785] Remove ksql interpreter (#4434) 9b9e9a6057 is described below commit 9b9e9a6057eb56e432818115c4caf64d1254350f Author: jshjsh06 <35020221+jshjs...@users.noreply.github.com> AuthorDate: Wed Aug 3 17:44:01 2022 +0900 [ZEPPELIN-5785] Remove ksql interpreter (#4434) Co-authored-by: Jongyoul Lee <jongy...@gmail.com> --- .github/workflows/core.yml | 3 +- dev/create_release.sh | 3 +- docs/_includes/themes/zeppelin/_navigation.html | 1 - docs/index.md | 1 - docs/interpreter/ksql.md | 78 ------- ksql/README.md | 10 - ksql/pom.xml | 88 ------- .../apache/zeppelin/ksql/BasicKSQLHttpClient.java | 175 -------------- .../org/apache/zeppelin/ksql/KSQLInterpreter.java | 169 -------------- .../java/org/apache/zeppelin/ksql/KSQLRequest.java | 51 ---- .../org/apache/zeppelin/ksql/KSQLResponse.java | 86 ------- .../org/apache/zeppelin/ksql/KSQLRestService.java | 257 --------------------- ksql/src/main/resources/interpreter-setting.json | 21 -- .../apache/zeppelin/ksql/KSQLInterpreterTest.java | 171 -------------- ksql/src/test/resources/log4j.properties | 30 --- pom.xml | 1 - 16 files changed, 3 insertions(+), 1142 deletions(-) diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index 43d843267c..3ea6a07aa6 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -82,8 +82,7 @@ jobs: interpreter-test-non-core: runs-on: ubuntu-20.04 env: - INTERPRETERS: 'hbase,jdbc,file,flink-cmd,cassandra,elasticsearch,bigquery,alluxio,livy,groovy,java,neo4j,submarine,sparql,mongodb,influxdb,ksql' - + INTERPRETERS: 'hbase,jdbc,file,flink-cmd,cassandra,elasticsearch,bigquery,alluxio,livy,groovy,java,neo4j,submarine,sparql,mongodb,influxdb' steps: - name: Checkout uses: actions/checkout@v2 diff --git a/dev/create_release.sh b/dev/create_release.sh index faca259cab..028e553650 100755 --- a/dev/create_release.sh +++ b/dev/create_release.sh @@ -98,7 +98,8 @@ function make_binary_release() { git_clone make_source_package -make_binary_release netinst "-Pweb-angular -Phadoop-2.6 -pl !hbase,!jdbc,!file,!flink,!cassandra,!elasticsearch,!bigquery,!alluxio,!livy,!groovy,!java,!neo4j,!submarine,!sparql,!mongodb,!ksql -am" +make_binary_release netinst "-Pweb-angular -Phadoop-2.6 -pl !hbase,!jdbc,!file,!flink,!cassandra,!elasticsearch,!bigquery,!alluxio,!livy,!groovy,!java,!neo4j,!submarine,!sparql,!mongodb -am" + make_binary_release all "-Pweb-angular -Phadoop-2.6" # remove non release files and dirs diff --git a/docs/_includes/themes/zeppelin/_navigation.html b/docs/_includes/themes/zeppelin/_navigation.html index 9fa12a7995..b741c21832 100644 --- a/docs/_includes/themes/zeppelin/_navigation.html +++ b/docs/_includes/themes/zeppelin/_navigation.html @@ -150,7 +150,6 @@ <li><a href="{{BASE_PATH}}/interpreter/java.html">Java</a></li> <li><a href="{{BASE_PATH}}/interpreter/jupyter.html">Jupyter</a></li> <li><a href="{{BASE_PATH}}/interpreter/kotlin.html">Kotlin</a></li> - <li><a href="{{BASE_PATH}}/interpreter/ksql.html">KSQL</a></li> <li><a href="{{BASE_PATH}}/interpreter/livy.html">Livy</a></li> <li><a href="{{BASE_PATH}}/interpreter/mahout.html">Mahout</a></li> <li><a href="{{BASE_PATH}}/interpreter/markdown.html">Markdown</a></li> diff --git a/docs/index.md b/docs/index.md index 29af54878d..f0cd7355ed 100644 --- a/docs/index.md +++ b/docs/index.md @@ -148,7 +148,6 @@ limitations under the License. * [JDBC](./interpreter/jdbc.html) * [Jupyter](./interpreter/jupyter.html) * [Kotlin](./interpreter/kotlin.html) - * [KSQL](./interpreter/ksql.html) * [Livy](./interpreter/livy.html) * [Mahout](./interpreter/mahout.html) * [Markdown](./interpreter/markdown.html) diff --git a/docs/interpreter/ksql.md b/docs/interpreter/ksql.md deleted file mode 100644 index 2a308bed4b..0000000000 --- a/docs/interpreter/ksql.md +++ /dev/null @@ -1,78 +0,0 @@ ---- -layout: page -title: "KSQL Interpreter for Apache Zeppelin" -description: "SQL is the streaming SQL engine for Apache Kafka and provides an easy-to-use yet powerful interactive SQL interface for stream processing on Kafka." -group: interpreter ---- -<!-- -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. ---> -{% include JB/setup %} - -# KSQL Interpreter for Apache Zeppelin - -<div id="toc"></div> - -## Overview -[KSQL](https://www.confluent.io/product/ksql/) is the streaming SQL engine for Apache Kafka®. It provides an easy-to-use yet powerful interactive SQL interface for stream processing on Kafka, - -## Configuration -<table class="table-configuration"> - <thead> - <tr> - <th>Property</th> - <th>Default</th> - <th>Description</th> - </tr> - </thead> - <tbody> - <tr> - <td>ksql.url</td> - <td>http://localhost:8080</td> - <td>The KSQL Endpoint base URL</td> - </tr> - </tbody> -</table> - -N.b. The interpreter supports all the KSQL properties, i.e. `ksql.streams.auto.offset.reset`. -The full list of KSQL parameters is [here](https://docs.confluent.io/current/ksql/docs/installation/server-config/config-reference.html). - -## Using the KSQL Interpreter -In a paragraph, use `%ksql` and start your SQL query in order to start to interact with KSQL. - -Following some examples: - -``` -%ksql -PRINT 'orders'; -``` - - - -``` -%ksql -CREATE STREAM ORDERS WITH - (VALUE_FORMAT='AVRO', - KAFKA_TOPIC ='orders'); -``` - - - -``` -%ksql -SELECT * -FROM ORDERS -LIMIT 10 -``` - - \ No newline at end of file diff --git a/ksql/README.md b/ksql/README.md deleted file mode 100644 index 22f89f2847..0000000000 --- a/ksql/README.md +++ /dev/null @@ -1,10 +0,0 @@ -# Overview -KSQL interpreter for Apache Zeppelin - -# Connection -The Interpreter opens a connection with the KSQL REST endpoint. - -# Confluent KSQL resources -Following a list of useful resources: - * [Docs](https://docs.confluent.io/current/ksql/docs/index.html) - * [Getting Started](https://github.com/confluentinc/demo-scene/blob/master/ksql-intro/demo_ksql-intro.adoc) diff --git a/ksql/pom.xml b/ksql/pom.xml deleted file mode 100644 index 67820ea894..0000000000 --- a/ksql/pom.xml +++ /dev/null @@ -1,88 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <artifactId>zeppelin-interpreter-parent</artifactId> - <groupId>org.apache.zeppelin</groupId> - <version>0.11.0-SNAPSHOT</version> - <relativePath>../zeppelin-interpreter-parent/pom.xml</relativePath> - </parent> - - <artifactId>zeppelin-ksql</artifactId> - <packaging>jar</packaging> - <name>Zeppelin: Kafka SQL interpreter</name> - - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <interpreter.name>ksql</interpreter.name> - </properties> - - <dependencies> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - <version>2.12.6.1</version> - </dependency> - - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - <version>2.11.0</version> - </dependency> - - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - </dependency> - - - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - <version>3.0.0</version> - <scope>test</scope> - </dependency> - - </dependencies> - - <build> - <plugins> - <plugin> - <artifactId>maven-enforcer-plugin</artifactId> - </plugin> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - </plugin> - <plugin> - <artifactId>maven-shade-plugin</artifactId> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - <configuration> - <skip>false</skip> - </configuration> - </plugin> - </plugins> - </build> - -</project> diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/BasicKSQLHttpClient.java b/ksql/src/main/java/org/apache/zeppelin/ksql/BasicKSQLHttpClient.java deleted file mode 100644 index 937a5d3570..0000000000 --- a/ksql/src/main/java/org/apache/zeppelin/ksql/BasicKSQLHttpClient.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.ksql; - -import org.apache.commons.io.IOUtils; - -import java.io.BufferedReader; -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.net.HttpURLConnection; -import java.net.URL; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; -import java.util.stream.Collectors; - -public class BasicKSQLHttpClient implements Closeable { - - interface BasicHTTPClientResponse { - void onMessage(int status, String message); - - void onError(int status, String message); - } - - private final String jsonData; - private final Map<String, Object> formData; - private final String type; - private final Map<String, String> headers; - private final URL url; - private HttpURLConnection connection; - private final int timeout; - private boolean connected; - - - public BasicKSQLHttpClient(String url, String jsonData, Map<String, Object> formData, - String type, Map<String, String> headers, int timeout) - throws IOException { - this.url = new URL(url); - this.jsonData = jsonData; - this.formData = formData; - this.type = type; - this.headers = headers; - this.timeout = timeout; - this.connected = false; - } - - @Override - public void close() throws IOException { - connected = false; - if (connection != null) { - connection.disconnect(); - } - } - - private void writeOutput(String data) throws IOException { - try (OutputStream os = connection.getOutputStream()) { - byte[] input = data.getBytes(StandardCharsets.UTF_8); - os.write(input); - } - } - - public String connect() throws IOException { - int status = createConnection(); - boolean isStatusOk = isStatusOk(status); - return IOUtils.toString(isStatusOk ? - connection.getInputStream() : connection.getErrorStream(), StandardCharsets.UTF_8.name()); - } - - public void connectAsync(BasicHTTPClientResponse onResponse) throws IOException { - int status = createConnection(); - boolean isStatusOk = isStatusOk(status); - long start = System.currentTimeMillis(); - - try (InputStreamReader in = new InputStreamReader(connection.getInputStream(), - StandardCharsets.UTF_8); - BufferedReader br = new BufferedReader(in)) { - while (connected && (timeout == -1 || System.currentTimeMillis() - start < timeout)) { - if (br.ready()) { - String responseLine = br.readLine(); - if (responseLine == null || responseLine.isEmpty()) { - continue; - } - if (isStatusOk) { - onResponse.onMessage(status, responseLine.trim()); - } else { - onResponse.onError(status, responseLine.trim()); - } - } - } - } - } - - private boolean isStatusOk(int status) { - return status >= 200 && status < 300; - } - - private int createConnection() throws IOException { - this.connection = (HttpURLConnection) url.openConnection(); - connection.setRequestMethod(this.type); - this.headers.forEach((k, v) -> connection.setRequestProperty(k, v)); - connection.setDoOutput(true); - if (jsonData != null && !jsonData.isEmpty()) { - writeOutput(jsonData); - } else if (formData != null && !formData.isEmpty()) { - String queryStringParams = formData.entrySet() - .stream() - .map(e -> e.getKey() + "=" + e.getValue()) - .collect(Collectors.joining("&")); - writeOutput(queryStringParams); - } - connected = true; - return connection.getResponseCode(); - } - - static class Builder { - private String url; - private String json; - private Map<String, Object> formData = new HashMap<>(); - private String type; - private Map<String, String> headers = new HashMap<>(); - private int timeout = -1; - - public Builder withTimeout(int timeout) { - this.timeout = timeout; - return this; - } - - public Builder withUrl(String url) { - this.url = url; - return this; - } - - public Builder withJson(String json) { - this.json = json; - return this; - } - - public Builder withType(String type) { - this.type = type; - return this; - } - - public Builder withHeader(String header, String value) { - this.headers.put(header, value); - return this; - } - - public Builder withFormData(String name, Object value) { - this.formData.put(name, value); - return this; - } - - public BasicKSQLHttpClient build() throws IOException { - return new BasicKSQLHttpClient(url, json, formData, type, headers, timeout); - } - - } -} diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLInterpreter.java b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLInterpreter.java deleted file mode 100644 index 92cb8da2f2..0000000000 --- a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLInterpreter.java +++ /dev/null @@ -1,169 +0,0 @@ -/* -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 - -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.zeppelin.ksql; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.commons.lang3.StringUtils; -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.util.InterpreterOutputStream; -import org.apache.zeppelin.scheduler.Scheduler; -import org.apache.zeppelin.scheduler.SchedulerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Collection; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; - - -public class KSQLInterpreter extends Interpreter { - private static final String NEW_LINE = "\n"; - - private static final Logger LOGGER = LoggerFactory.getLogger(KSQLInterpreter.class); - public static final String TABLE_DELIMITER = "\t"; - - private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER); - - private final KSQLRestService ksqlRestService; - - private static final ObjectMapper json = new ObjectMapper(); - - public KSQLInterpreter(Properties properties) { - this(properties, new KSQLRestService(properties.entrySet().stream() - .collect(Collectors.toMap(e -> e.getKey().toString(), - e -> e.getValue() != null ? e.getValue().toString() : null)))); - } - - // VisibleForTesting - public KSQLInterpreter(Properties properties, KSQLRestService ksqlRestService) { - super(properties); - this.ksqlRestService = ksqlRestService; - } - - @Override - public void open() throws InterpreterException {} - - @Override - public void close() throws InterpreterException { - ksqlRestService.close(); - } - - private String writeValueAsString(Object data) { - try { - if (data instanceof Collection || data instanceof Map) { - return json.writeValueAsString(data); - } - if (data instanceof String) { - return (String) data; - } - return String.valueOf(data); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - - private void checkResponseErrors(String message) throws IOException { - if (StringUtils.isNotBlank(message)) { - // throw new RuntimeException(message); - interpreterOutput.getInterpreterOutput().write("%text"); - interpreterOutput.getInterpreterOutput().write(NEW_LINE); - interpreterOutput.getInterpreterOutput().write(message); - } - } - - @Override - public InterpreterResult interpret(String query, - InterpreterContext context) throws InterpreterException { - if (StringUtils.isBlank(query)) { - return new InterpreterResult(InterpreterResult.Code.SUCCESS); - } - interpreterOutput.setInterpreterOutput(context.out); - try { - interpreterOutput.getInterpreterOutput().flush(); - interpreterOutput.getInterpreterOutput().write("%table"); - interpreterOutput.getInterpreterOutput().write(NEW_LINE); - Set<String> header = new LinkedHashSet<>(); - executeQuery(context.getParagraphId(), query.trim(), header); - return new InterpreterResult(InterpreterResult.Code.SUCCESS); - } catch (IOException e) { - return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage()); - } - } - - private void executeQuery(final String paragraphId, - final String query, Set<String> header) throws IOException { - AtomicBoolean isFirstLine = new AtomicBoolean(true); - ksqlRestService - .executeQuery(paragraphId, query, (resp) -> { - try { - if (resp.getRow() == null || resp.getRow().isEmpty()) { - return; - } - if (isFirstLine.get()) { - isFirstLine.set(false); - header.addAll(resp.getRow().keySet()); - interpreterOutput.getInterpreterOutput().write(header.stream() - .collect(Collectors.joining(TABLE_DELIMITER))); - interpreterOutput.getInterpreterOutput().write(NEW_LINE); - } - interpreterOutput.getInterpreterOutput().write(resp.getRow().values().stream() - .map(this::writeValueAsString) - .collect(Collectors.joining(TABLE_DELIMITER))); - interpreterOutput.getInterpreterOutput().write(NEW_LINE); - checkResponseErrors(resp.getFinalMessage()); - checkResponseErrors(resp.getErrorMessage()); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); - } - - @Override - public void cancel(InterpreterContext context) throws InterpreterException { - LOGGER.info("Trying to cancel paragraphId {}", context.getParagraphId()); - try { - ksqlRestService.closeClient(context.getParagraphId()); - LOGGER.info("Removed"); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public FormType getFormType() throws InterpreterException { - return FormType.SIMPLE; - } - - @Override - public int getProgress(InterpreterContext context) throws InterpreterException { - return 0; - } - - @Override - public Scheduler getScheduler() { - return SchedulerFactory.singleton().createOrGetFIFOScheduler( - KSQLInterpreter.class.getName() + this.hashCode()); - } -} diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRequest.java b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRequest.java deleted file mode 100644 index a05a70d3ab..0000000000 --- a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRequest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.ksql; - -import java.util.Collections; -import java.util.Map; -import java.util.Objects; - -public class KSQLRequest { - - private static final String EXPLAIN_QUERY = "EXPLAIN %s"; - private final String ksql; - private final Map<String, String> streamsProperties; - - KSQLRequest(final String ksql, final Map<String, String> streamsProperties) { - String inputQuery = Objects.requireNonNull(ksql, "ksql") - .replaceAll("[\\n\\t\\r]", " ") - .trim(); - this.ksql = inputQuery.endsWith(";") ? inputQuery : inputQuery + ";"; - this.streamsProperties = streamsProperties; - } - - KSQLRequest(final String ksql) { - this(ksql, Collections.emptyMap()); - } - - KSQLRequest toExplainRequest() { - return new KSQLRequest(String.format(EXPLAIN_QUERY, this.ksql), this.streamsProperties); - } - - public String getKsql() { - return ksql; - } - - public Map<String, String> getStreamsProperties() { - return streamsProperties; - } -} diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLResponse.java b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLResponse.java deleted file mode 100644 index 46461351db..0000000000 --- a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLResponse.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.ksql; - -import java.util.AbstractMap; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collector; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -public class KSQLResponse { - private final Map<String, Object> row; - private final String finalMessage; - private final String errorMessage; - private final boolean terminal; - - private <T, K, U> Collector<T, ?, Map<K, U>> - toLinkedHashMap(Function<? super T, ? extends K> keyMapper, - Function<? super T, ? extends U> valueMapper) { - return Collectors.toMap( - keyMapper, - valueMapper, - (u, v) -> { throw new IllegalStateException(String.format("Duplicate key %s", u)); }, - LinkedHashMap::new); - } - - KSQLResponse(final List<String> fields, final Map<String, Object> row, - final String finalMessage, final String errorMessage, boolean terminal) { - List<Object> columns = row == null ? null : (List<Object>) row.getOrDefault("columns", - Collections.emptyList()); - this.row = row == null ? null : IntStream.range(0, columns.size()) - .mapToObj(index -> new AbstractMap.SimpleEntry<>(fields.get(index), - columns.get(index))) - .collect(toLinkedHashMap(e -> e.getKey(), e -> e.getValue())); - this.finalMessage = finalMessage; - this.errorMessage = errorMessage; - this.terminal = terminal; - } - - KSQLResponse(final List<String> fields, final Map<String, Object> resp) { - this(fields, (Map<String, Object>) resp.get("row"), - (String) resp.get("finalMessage"), - (String) resp.get("errorMessage"), - (boolean) resp.get("terminal")); - } - - KSQLResponse(final Map<String, Object> resp) { - this.row = resp; - this.finalMessage = null; - this.errorMessage = null; - this.terminal = true; - } - - public Map<String, Object> getRow() { - return row; - } - - public String getFinalMessage() { - return finalMessage; - } - - public String getErrorMessage() { - return errorMessage; - } - - public boolean isTerminal() { - return terminal; - } -} diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRestService.java b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRestService.java deleted file mode 100644 index 4fcb2de1ed..0000000000 --- a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRestService.java +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.ksql; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.commons.lang3.StringUtils; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Consumer; -import java.util.stream.Collectors; - -public class KSQLRestService { - - private static final String KSQL_ENDPOINT = "%s/ksql"; - private static final String QUERY_ENDPOINT = "%s/query"; - - private static final String KSQL_V1_CONTENT_TYPE = "application/vnd.ksql.v1+json; charset=utf-8"; - - private static final List<String> KSQL_COMMON_FIELDS = Arrays - .asList("statementText", "warnings", "@type"); - private static final String KSQL_URL = "ksql.url"; - - private static final ObjectMapper json = new ObjectMapper(); - - private final String ksqlUrl; - private final String queryUrl; - private final String baseUrl; - private final Map<String, String> streamsProperties; - - private final Map<String, BasicKSQLHttpClient> clientCache; - - public KSQLRestService(Map<String, String> props) { - baseUrl = Objects.requireNonNull(props.get(KSQL_URL), KSQL_URL).toString(); - ksqlUrl = String.format(KSQL_ENDPOINT, baseUrl); - queryUrl = String.format(QUERY_ENDPOINT, baseUrl); - clientCache = new ConcurrentHashMap<>(); - this.streamsProperties = props.entrySet().stream() - .filter(e -> e.getKey().startsWith("ksql.") && !e.getKey().equals(KSQL_URL)) - .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); - } - - - public void executeQuery(final String paragraphId, final String query, - final Consumer<KSQLResponse> callback) throws IOException { - KSQLRequest request = new KSQLRequest(query, streamsProperties); - if (isSelect(request)) { - executeSelect(paragraphId, callback, request); - } else if (isPrint(request)) { - executePrint(paragraphId, callback, request); - } else { - executeKSQL(paragraphId, callback, request); - } - } - - private void executeKSQL(String paragraphId, Consumer<KSQLResponse> callback, - KSQLRequest request) throws IOException { - try (BasicKSQLHttpClient client = createNewClient(paragraphId, request, ksqlUrl)) { - List<Map<String, Object>> queryResponse = json.readValue(client.connect(), List.class); - queryResponse.stream() - .map(map -> excludeKSQLCommonFields(map)) - .flatMap(map -> map.entrySet().stream() - .filter(e -> e.getValue() instanceof List) - .flatMap(e -> ((List<Map<String, Object>>) e.getValue()).stream())) - .map(KSQLResponse::new) - .forEach(callback::accept); - queryResponse.stream() - .map(map -> excludeKSQLCommonFields(map)) - .flatMap(map -> map.entrySet().stream() - .filter(e -> e.getValue() instanceof Map) - .map(e -> (Map<String, Object>) e.getValue())) - .map(KSQLResponse::new) - .forEach(callback::accept); - } - } - - private Map<String, Object> excludeKSQLCommonFields(Map<String, Object> map) { - return map.entrySet().stream() - .filter(e -> !KSQL_COMMON_FIELDS.contains(e.getKey())) - .collect(Collectors - .toMap(e -> e.getKey(), e -> e.getValue())); - } - - private BasicKSQLHttpClient createNewClient(String paragraphId, KSQLRequest request, - String url) throws IOException { - BasicKSQLHttpClient client = new BasicKSQLHttpClient.Builder() - .withUrl(url) - .withJson(json.writeValueAsString(request)) - .withType("POST") - .withHeader("Content-type", KSQL_V1_CONTENT_TYPE) - .build(); - BasicKSQLHttpClient oldClient = clientCache.put(paragraphId, client); - if (oldClient != null) { - oldClient.close(); - } - return client; - } - - private void executeSelect(String paragraphId, Consumer<KSQLResponse> callback, - KSQLRequest request) throws IOException { - List<String> fieldNames = getFields(request); - if (fieldNames.isEmpty()) { - throw new RuntimeException("Field are empty"); - } - try (BasicKSQLHttpClient client = createNewClient(paragraphId, request, queryUrl)) { - client.connectAsync(new BasicKSQLHttpClient.BasicHTTPClientResponse() { - @Override - public void onMessage(int status, String message) { - try { - Map<String, Object> queryResponse = json.readValue(message, LinkedHashMap.class); - KSQLResponse resp = new KSQLResponse(fieldNames, queryResponse); - callback.accept(resp); - if (resp.isTerminal() || StringUtils.isNotBlank(resp.getErrorMessage()) - || StringUtils.isNotBlank(resp.getFinalMessage())) { - client.close(); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void onError(int status, String message) { - try { - KSQLResponse resp = new KSQLResponse(Collections.singletonMap("error", message)); - callback.accept(resp); - client.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - }); - } - } - - private void executePrint(String paragraphId, Consumer<KSQLResponse> callback, - KSQLRequest request) throws IOException { - try (BasicKSQLHttpClient client = createNewClient(paragraphId, request, queryUrl)) { - client.connectAsync(new BasicKSQLHttpClient.BasicHTTPClientResponse() { - @Override - public void onMessage(int status, String message) { - if (message.toUpperCase().startsWith("FORMAT:")) { - return; - } - List<String> elements = Arrays.asList(message.split(",")); - Map<String, Object> row = new LinkedHashMap<>(); - row.put("timestamp", elements.get(0)); - row.put("offset", elements.get(1)); - row.put("record", String.join("", elements.subList(2, elements.size()))); - KSQLResponse resp = new KSQLResponse(row); - callback.accept(resp); - } - - @Override - public void onError(int status, String message) { - try { - KSQLResponse resp = new KSQLResponse(Collections.singletonMap("error", message)); - callback.accept(resp); - client.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - }); - } - } - - private boolean isSelect(KSQLRequest request) { - return request.getKsql().toUpperCase().startsWith("SELECT"); - } - - private boolean isPrint(KSQLRequest request) { - return request.getKsql().toUpperCase().startsWith("PRINT"); - } - - public void closeClient(final String paragraphId) throws IOException { - BasicKSQLHttpClient toClose = clientCache.remove(paragraphId); - if (toClose != null) { - toClose.close(); - } - } - - private List<String> getFields(KSQLRequest request) throws IOException { - return getFields(request, false); - } - - private List<String> getFields(KSQLRequest request, boolean tryCoerce) throws IOException { - if (tryCoerce) { - /* - * this because a query like - * `EXPLAIN SELECT * FROM ORDERS WHERE ADDRESS->STATE = 'New York' LIMIT 10;` - * fails with the message `Column STATE cannot be resolved` - * so we try to coerce the field resolution - */ - String query = request.getKsql() - .substring(0, request.getKsql().toUpperCase().indexOf("WHERE")); - request = new KSQLRequest(query, request.getStreamsProperties()); - } - try (BasicKSQLHttpClient client = new BasicKSQLHttpClient.Builder() - .withUrl(ksqlUrl) - .withJson(json.writeValueAsString(request.toExplainRequest())) - .withType("POST") - .withHeader("Content-type", KSQL_V1_CONTENT_TYPE) - .build()) { - List<Map<String, Object>> explainResponseList = json.readValue(client.connect(), List.class); - Map<String, Object> explainResponse = explainResponseList.get(0); - Map<String, Object> queryDescription = (Map<String, Object>) explainResponse - .getOrDefault("queryDescription", Collections.emptyMap()); - List<Map<String, Object>> fields = (List<Map<String, Object>>) queryDescription - .getOrDefault("fields", Collections.emptyList()); - return fields.stream() - .map(elem -> elem.getOrDefault("name", "").toString()) - .filter(s -> !s.isEmpty()) - .collect(Collectors.toList()); - } catch (IOException e) { - if (!tryCoerce) { - return getFields(request, true); - } else { - throw e; - } - } - } - - - public void close() { - Set<String> keys = clientCache.keySet(); - keys.forEach(key -> { - try { - closeClient(key); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); - } -} diff --git a/ksql/src/main/resources/interpreter-setting.json b/ksql/src/main/resources/interpreter-setting.json deleted file mode 100644 index cf15bbf2e8..0000000000 --- a/ksql/src/main/resources/interpreter-setting.json +++ /dev/null @@ -1,21 +0,0 @@ -[ - { - "group": "ksql", - "name": "ksql", - "className": "org.apache.zeppelin.ksql.KSQLInterpreter", - "properties": { - "ksql.url": { - "envName": null, - "propertyName": "ksql.url", - "defaultValue": "http://localhost:8088", - "description": "KSQL Endpoint base URL", - "type": "string" - } - }, - "editor": { - "language": "sql", - "editOnDblClick": false, - "completionSupport": false - } - } -] diff --git a/ksql/src/test/java/org/apache/zeppelin/ksql/KSQLInterpreterTest.java b/ksql/src/test/java/org/apache/zeppelin/ksql/KSQLInterpreterTest.java deleted file mode 100644 index 0fd01fd2a6..0000000000 --- a/ksql/src/test/java/org/apache/zeppelin/ksql/KSQLInterpreterTest.java +++ /dev/null @@ -1,171 +0,0 @@ -/* -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 - -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.zeppelin.ksql; - -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterOutput; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; -import org.mockito.stubbing.Stubber; - -import java.io.IOException; -import java.util.AbstractMap; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.function.Consumer; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import java.util.stream.Stream; - -import static org.junit.Assert.assertEquals; - -public class KSQLInterpreterTest { - - private InterpreterContext context; - - private static final Map<String, String> PROPS = new HashMap<String, String>() {{ - put("ksql.url", "http://localhost:8088"); - put("ksql.streams.auto.offset.reset", "earliest"); - }}; - - - @Before - public void setUpZeppelin() throws IOException { - context = InterpreterContext.builder() - .setInterpreterOut(new InterpreterOutput()) - .setParagraphId("ksql-test") - .build(); - } - - @Test - public void shouldRenderKSQLSelectAsTable() throws InterpreterException, - IOException, InterruptedException { - // given - Properties p = new Properties(); - p.putAll(PROPS); - KSQLRestService service = Mockito.mock(KSQLRestService.class); - Stubber stubber = Mockito.doAnswer((invocation) -> { - Consumer< KSQLResponse> callback = (Consumer< KSQLResponse>) - invocation.getArguments()[2]; - IntStream.range(1, 5) - .forEach(i -> { - Map<String, Object> map = new HashMap<>(); - if (i == 4) { - map.put("row", null); - map.put("terminal", true); - } else { - map.put("row", Collections.singletonMap("columns", Arrays.asList("value " + i))); - map.put("terminal", false); - } - callback.accept(new KSQLResponse(Arrays.asList("fieldName"), map)); - try { - Thread.sleep(3000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - }); - return null; - }); - stubber.when(service).executeQuery(Mockito.any(String.class), - Mockito.anyString(), - Mockito.any(Consumer.class)); - Interpreter interpreter = new KSQLInterpreter(p, service); - - // when - String query = "select * from orders"; - interpreter.interpret(query, context); - - // then - String expected = "%table fieldName\n" + - "value 1\n" + - "value 2\n" + - "value 3\n"; - context.out.flush(); - assertEquals(1, context.out.toInterpreterResultMessage().size()); - assertEquals(expected, context.out.toInterpreterResultMessage().get(0).toString()); - assertEquals(InterpreterResult.Type.TABLE, context.out - .toInterpreterResultMessage().get(0).getType()); - interpreter.close(); - } - - @Test - public void shouldRenderKSQLNonSelectAsTable() throws InterpreterException, - IOException, InterruptedException { - // given - Properties p = new Properties(); - p.putAll(PROPS); - KSQLRestService service = Mockito.mock(KSQLRestService.class); - Map<String, Object> row1 = new HashMap<>(); - row1.put("name", "orders"); - row1.put("registered", "false"); - row1.put("replicaInfo", "[1]"); - row1.put("consumerCount", "0"); - row1.put("consumerGroupCount", "0"); - Map<String, Object> row2 = new HashMap<>(); - row2.put("name", "orders"); - row2.put("registered", "false"); - row2.put("replicaInfo", "[1]"); - row2.put("consumerCount", "0"); - row2.put("consumerGroupCount", "0"); - Stubber stubber = Mockito.doAnswer((invocation) -> { - Consumer< KSQLResponse> callback = (Consumer< KSQLResponse>) - invocation.getArguments()[2]; - callback.accept(new KSQLResponse(row1)); - callback.accept(new KSQLResponse(row2)); - return null; - }); - stubber.when(service).executeQuery( - Mockito.any(String.class), - Mockito.anyString(), - Mockito.any(Consumer.class)); - Interpreter interpreter = new KSQLInterpreter(p, service); - - // when - String query = "show topics"; - interpreter.interpret(query, context); - - // then - List<Map<String, Object>> expected = Arrays.asList(row1, row2); - - context.out.flush(); - String[] lines = context.out.toInterpreterResultMessage() - .get(0).toString() - .replace("%table ", "") - .trim() - .split("\n"); - List<String[]> rows = Stream.of(lines) - .map(line -> line.split("\t")) - .collect(Collectors.toList()); - List<Map<String, String>> actual = rows.stream() - .skip(1) - .map(row -> IntStream.range(0, row.length) - .mapToObj(index -> new AbstractMap.SimpleEntry<>(rows.get(0)[index], row[index])) - .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()))) - .collect(Collectors.toList()); - assertEquals(1, context.out.toInterpreterResultMessage().size()); - assertEquals(expected, actual); - assertEquals(InterpreterResult.Type.TABLE, context.out - .toInterpreterResultMessage().get(0).getType()); - } -} diff --git a/ksql/src/test/resources/log4j.properties b/ksql/src/test/resources/log4j.properties deleted file mode 100644 index 4f78acf5ac..0000000000 --- a/ksql/src/test/resources/log4j.properties +++ /dev/null @@ -1,30 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Direct log messages to stdout -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.Target=System.out -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c:%L - %m%n -#log4j.appender.stdout.layout.ConversionPattern= -#%5p [%t] (%F:%L) - %m%n -#%-4r [%t] %-5p %c %x - %m%n -# - -# Root logger option -log4j.rootLogger=INFO, stdout -#log4j.logger.org.apache.zeppelin.interpreter=DEBUG diff --git a/pom.xml b/pom.xml index af54768c4d..17c6dbbaec 100644 --- a/pom.xml +++ b/pom.xml @@ -83,7 +83,6 @@ <module>alluxio</module> <module>neo4j</module> <module>java</module> - <module>ksql</module> <module>sparql</module> <module>zeppelin-common</module> <module>zeppelin-client</module>