This is an automated email from the ASF dual-hosted git repository. jongyoul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 9eb3aae092 [ZEPPELIN-5768] Remove scio interpreter (#4408) 9eb3aae092 is described below commit 9eb3aae092497eae2dce98f6aeb4feafc6c663f3 Author: In-Kyu Kim <60086878+ink...@users.noreply.github.com> AuthorDate: Fri Jul 15 16:37:21 2022 +0900 [ZEPPELIN-5768] Remove scio interpreter (#4408) --- .github/workflows/core.yml | 2 +- .github/workflows/frontend.yml | 2 +- .gitignore | 2 - beam/pom.xml | 6 - beam/src/main/resources/interpreter-setting.json | 24 - conf/interpreter-list | 1 - dev/create_release.sh | 3 +- docs/_includes/themes/zeppelin/_navigation.html | 1 - docs/index.md | 1 - docs/interpreter/scio.md | 169 ------ docs/usage/interpreter/installation.md | 5 - pom.xml | 5 +- scio/README.md | 18 - scio/pom.xml | 178 ------- .../org/apache/zeppelin/scio/ContextAndArgs.scala | 41 -- .../org/apache/zeppelin/scio/DisplayHelpers.scala | 167 ------ .../scio/DisplaySCollectionImplicits.scala | 105 ---- .../apache/zeppelin/scio/DisplayTapImplicits.scala | 154 ------ .../org/apache/zeppelin/scio/ScioInterpreter.scala | 201 -------- scio/src/test/avro/schema.avsc | 12 - .../apache/zeppelin/scio/ScioInterpreterTest.java | 109 ---- scio/src/test/resources/log4j.properties | 22 - .../zeppelin/scio/DisplayHelpersTestScala211.scala | 55 -- .../org/apache/zeppelin/scio/TestCCScala211.scala | 22 - .../apache/zeppelin/scio/DisplayHelpersTest.scala | 570 --------------------- .../scala/org/apache/zeppelin/scio/TestCC.scala | 24 - .../org/apache/zeppelin/scio/util/TestUtils.scala | 50 -- zeppelin-distribution/src/bin_license/LICENSE | 5 - 28 files changed, 6 insertions(+), 1948 deletions(-) diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index 78624b381d..bc5012f4c5 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -82,7 +82,7 @@ jobs: interpreter-test-non-core: runs-on: ubuntu-20.04 env: - INTERPRETERS: 
'beam,hbase,pig,jdbc,file,flink-cmd,ignite,cassandra,elasticsearch,bigquery,alluxio,scio,livy,groovy,sap,java,geode,neo4j,hazelcastjet,submarine,sparql,mongodb,influxdb,ksql' + INTERPRETERS: 'beam,hbase,pig,jdbc,file,flink-cmd,ignite,cassandra,elasticsearch,bigquery,alluxio,livy,groovy,sap,java,geode,neo4j,hazelcastjet,submarine,sparql,mongodb,influxdb,ksql' steps: - name: Checkout uses: actions/checkout@v2 diff --git a/.github/workflows/frontend.yml b/.github/workflows/frontend.yml index 11624c66b9..01c26164b2 100644 --- a/.github/workflows/frontend.yml +++ b/.github/workflows/frontend.yml @@ -19,7 +19,7 @@ env: SPARK_PRINT_LAUNCH_COMMAND: "true" SPARK_LOCAL_IP: 127.0.0.1 ZEPPELIN_LOCAL_IP: 127.0.0.1 - INTERPRETERS: '!beam,!hbase,!pig,!jdbc,!file,!flink,!ignite,!cassandra,!elasticsearch,!bigquery,!alluxio,!scio,!livy,!groovy,!sap,!java,!geode,!neo4j,!hazelcastjet,!submarine,!sparql,!mongodb' + INTERPRETERS: '!beam,!hbase,!pig,!jdbc,!file,!flink,!ignite,!cassandra,!elasticsearch,!bigquery,!alluxio,!livy,!groovy,!sap,!java,!geode,!neo4j,!hazelcastjet,!submarine,!sparql,!mongodb' jobs: run-e2e-tests-in-zeppelin-web: diff --git a/.gitignore b/.gitignore index e52d5a63a2..425d6fbaaa 100644 --- a/.gitignore +++ b/.gitignore @@ -55,8 +55,6 @@ zeppelin-web/yarn.lock .Rhistory /R/ -# scio -.bigquery/ # project level /logs/ diff --git a/beam/pom.xml b/beam/pom.xml index 19b91bbe5f..58cfdcb30e 100644 --- a/beam/pom.xml +++ b/beam/pom.xml @@ -116,12 +116,6 @@ <version>${project.version}</version> </dependency> - <dependency> - <groupId>org.apache.zeppelin</groupId> - <artifactId>zeppelin-scio</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> diff --git a/beam/src/main/resources/interpreter-setting.json b/beam/src/main/resources/interpreter-setting.json index e9b4a73c2a..189e08b5d8 100644 --- a/beam/src/main/resources/interpreter-setting.json +++ 
b/beam/src/main/resources/interpreter-setting.json @@ -9,29 +9,5 @@ "editor": { "editOnDblClick": false } - }, - { - "group": "beam", - "name": "scio", - "className": "org.apache.zeppelin.scio.ScioInterpreter", - "properties": { - "zeppelin.scio.argz": { - "envName": "ZEPPELIN_SCIO_ARGZ", - "propertyName": "zeppelin.scio.argz", - "defaultValue": "--runner=InProcessPipelineRunner", - "description": "Scio interpreter wide arguments", - "type": "textarea" - }, - "zeppelin.scio.maxResult": { - "envName": "ZEPPELIN_SCIO_MAXRESULT", - "propertyName": "zeppelin.scio.maxResult", - "defaultValue": "1000", - "description": "Max number of SCollection results to display.", - "type": "number" - } - }, - "editor": { - "language": "scala" - } } ] diff --git a/conf/interpreter-list b/conf/interpreter-list index 270d243b5c..766c66fa96 100644 --- a/conf/interpreter-list +++ b/conf/interpreter-list @@ -39,7 +39,6 @@ neo4j org.apache.zeppelin:zeppelin-neo4j:0.10.0 Neo4j i pig org.apache.zeppelin:zeppelin-pig:0.10.0 Pig interpreter python org.apache.zeppelin:zeppelin-python:0.10.0 Python interpreter sap org.apache.zeppelin:zeppelin-sap:0.10.0 SAP Support -scio org.apache.zeppelin:zeppelin-scio:0.10.0 Scio interpreter shell org.apache.zeppelin:zeppelin-shell:0.10.0 Shell command sparql org.apache.zeppelin:zeppelin-sparql:0.10.0 Sparql interpreter submarine org.apache.zeppelin:zeppelin-submarine:0.10.0 Submarine interpreter diff --git a/dev/create_release.sh b/dev/create_release.sh index ae2162aa9b..a9571502b6 100755 --- a/dev/create_release.sh +++ b/dev/create_release.sh @@ -97,7 +97,8 @@ function make_binary_release() { git_clone make_source_package -make_binary_release netinst "-Pweb-angular -Phadoop-2.6 -pl !beam,!hbase,!pig,!jdbc,!file,!flink,!ignite,!cassandra,!elasticsearch,!bigquery,!alluxio,!scio,!livy,!groovy,!sap,!java,!geode,!neo4j,!hazelcastjet,!submarine,!sparql,!mongodb,!ksql -am" + +make_binary_release netinst "-Pweb-angular -Phadoop-2.6 -pl 
!beam,!hbase,!pig,!jdbc,!file,!flink,!ignite,!cassandra,!elasticsearch,!bigquery,!alluxio,!livy,!groovy,!sap,!java,!geode,!neo4j,!hazelcastjet,!submarine,!sparql,!mongodb,!ksql -am" make_binary_release all "-Pweb-angular -Phadoop-2.6" # remove non release files and dirs diff --git a/docs/_includes/themes/zeppelin/_navigation.html b/docs/_includes/themes/zeppelin/_navigation.html index ceed569605..d7b0cb9fbc 100644 --- a/docs/_includes/themes/zeppelin/_navigation.html +++ b/docs/_includes/themes/zeppelin/_navigation.html @@ -163,7 +163,6 @@ <li><a href="{{BASE_PATH}}/interpreter/pig.html">Pig</a></li> <li><a href="{{BASE_PATH}}/interpreter/postgresql.html">Postgresql, HAWQ</a></li> <li><a href="{{BASE_PATH}}/interpreter/sap.html">SAP</a></li> - <li><a href="{{BASE_PATH}}/interpreter/scio.html">Scio</a></li> <li><a href="{{BASE_PATH}}/interpreter/shell.html">Shell</a></li> <li><a href="{{BASE_PATH}}/interpreter/sparql.html">Sparql</a></li> <li><a href="{{BASE_PATH}}/interpreter/submarine.html">Submarine</a></li> diff --git a/docs/index.md b/docs/index.md index d3e5a461d5..8837b3b95b 100644 --- a/docs/index.md +++ b/docs/index.md @@ -163,7 +163,6 @@ limitations under the License. * [Python](./interpreter/python.html) * [R](./interpreter/r.html) * [SAP](./interpreter/sap.html) - * [Scio](./interpreter/scio.html) * [Shell](./interpreter/shell.html) * [Spark](./interpreter/spark.html) * [Sparql](./interpreter/sparql.html) diff --git a/docs/interpreter/scio.md b/docs/interpreter/scio.md deleted file mode 100644 index cb8d1278ec..0000000000 --- a/docs/interpreter/scio.md +++ /dev/null @@ -1,169 +0,0 @@ ---- -layout: page -title: "Scio Interpreter for Apache Zeppelin" -description: "Scio is a Scala DSL for Apache Beam/Google Dataflow model." -group: interpreter ---- -<!-- -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. ---> -{% include JB/setup %} - -# Scio Interpreter for Apache Zeppelin - -<div id="toc"></div> - -## Overview -Scio is a Scala DSL for [Google Cloud Dataflow](https://github.com/GoogleCloudPlatform/DataflowJavaSDK) and [Apache Beam](http://beam.incubator.apache.org/) inspired by [Spark](http://spark.apache.org/) and [Scalding](https://github.com/twitter/scalding). See the current [wiki](https://github.com/spotify/scio/wiki) and [API documentation](http://spotify.github.io/scio/) for more information. - -## Configuration -<table class="table-configuration"> - <tr> - <th>Name</th> - <th>Default Value</th> - <th>Description</th> - </tr> - <tr> - <td>zeppelin.scio.argz</td> - <td>--runner=InProcessPipelineRunner</td> - <td>Scio interpreter wide arguments. Documentation: https://github.com/spotify/scio/wiki#options and https://cloud.google.com/dataflow/pipelines/specifying-exec-params</td> - </tr> - <tr> - <td>zeppelin.scio.maxResult</td> - <td>1000</td> - <td>Max number of SCollection results to display</td> - </tr> - -</table> - -## Enabling the Scio Interpreter - -In a notebook, to enable the **Scio** interpreter, click the **Gear** icon and select **beam** (**beam.scio**). - -## Using the Scio Interpreter - -In a paragraph, use `%beam.scio` to select the **Scio** interpreter. You can use it much the same way as vanilla Scala REPL and [Scio REPL](https://github.com/spotify/scio/wiki/Scio-REPL). State (like variables, imports, execution etc) is shared among all *Scio* paragraphs. There is a special variable **argz** which holds arguments from Scio interpreter settings. 
The easiest way to proceed is to create a Scio context via standard `ContextAndArgs`. - -```scala -%beam.scio -val (sc, args) = ContextAndArgs(argz) -``` - -Use `sc` context the way you would in a regular pipeline/REPL. - -Example: - -```scala -%beam.scio -val (sc, args) = ContextAndArgs(argz) -sc.parallelize(Seq("foo", "foo", "bar")).countByValue.closeAndDisplay() -``` - -If you close Scio context, go ahead an create a new one using `ContextAndArgs`. Please refer to [Scio wiki](https://github.com/spotify/scio/wiki) for more complex examples. You can close Scio context much the same way as in Scio REPL, and use Zeppelin display helpers to synchronously close and display results - read more below. - -### Progress - -There can be only one paragraph running at once. There is no notion of overall progress, thus progress bar will show `0`. - -### SCollection display helpers - -Scio interpreter comes with display helpers to ease working with Zeppelin notebooks. Simply use `closeAndDisplay()` on `SCollection` to close context and display the results. The number of results is limited by `zeppelin.scio.maxResult` (by default 1000). - -Supported `SCollection` types: - - * Scio's typed BigQuery - * Scala's Products (case classes, tuples) - * Google BigQuery's TableRow - * Apache Avro - * All Scala's `AnyVal` - -#### Helper methods - -There are different helper methods for different objects. You can easily display results from `SCollection`, `Future[Tap]` and `Tap`. - -##### `SCollection` helper - -`SCollection` has `closeAndDisplay` Zeppelin helper method for types listed above. Use it to synchronously close Scio context, and once available pull and display results. - -##### `Future[Tap]` helper - -`Future[Tap]` has `waitAndDisplay` Zeppelin helper method for types listed above. Use it to synchronously wait for results, and once available pull and display results. - -##### `Tap` helper - -`Tap` has `display` Zeppelin helper method for types listed above. 
Use it to pull and display results. - -### Examples - -#### BigQuery example: - -```scala -%beam.scio -@BigQueryType.fromQuery("""|SELECT departure_airport,count(case when departure_delay>0 then 1 else 0 end) as no_of_delays - |FROM [bigquery-samples:airline_ontime_data.flights] - |group by departure_airport - |order by 2 desc - |limit 10""".stripMargin) class Flights - -val (sc, args) = ContextAndArgs(argz) -sc.bigQuerySelect(Flights.query).closeAndDisplay(Flights.schema) -``` - -#### BigQuery typed example: - -```scala -%beam.scio -@BigQueryType.fromQuery("""|SELECT departure_airport,count(case when departure_delay>0 then 1 else 0 end) as no_of_delays - |FROM [bigquery-samples:airline_ontime_data.flights] - |group by departure_airport - |order by 2 desc - |limit 10""".stripMargin) class Flights - -val (sc, args) = ContextAndArgs(argz) -sc.typedBigQuery[Flights]().flatMap(_.no_of_delays).mean.closeAndDisplay() -``` - -#### Avro example: - -```scala -%beam.scio -import com.spotify.data.ExampleAvro - -val (sc, args) = ContextAndArgs(argz) -sc.avroFile[ExampleAvro]("gs://<bucket>/tmp/my.avro").take(10).closeAndDisplay() -``` - -#### Avro example with a view schema: - -```scala -%beam.scio -import com.spotify.data.ExampleAvro -import org.apache.avro.Schema - -val (sc, args) = ContextAndArgs(argz) -val view = Schema.parse("""{"type":"record","name":"ExampleAvro","namespace":"com.spotify.data","fields":[{"name":"track","type":"string"}, {"name":"artist", "type":"string"}]}""") - -sc.avroFile[EndSongCleaned]("gs://<bucket>/tmp/my.avro").take(10).closeAndDisplay(view) -``` - -### Google credentials - -Scio Interpreter will try to infer your Google Cloud credentials from its environment, it will take into the account: - - * `argz` interpreter settings ([doc](https://github.com/spotify/scio/wiki#options)) - * environment variable (`GOOGLE_APPLICATION_CREDENTIALS`) - * gcloud configuration - -#### BigQuery macro credentials - -Currently BigQuery project for macro expansion 
is inferred using Google Dataflow's [DefaultProjectFactory().create()](https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/GcpOptions.java#L187) diff --git a/docs/usage/interpreter/installation.md b/docs/usage/interpreter/installation.md index aaea7b8ebc..a9705fee22 100644 --- a/docs/usage/interpreter/installation.md +++ b/docs/usage/interpreter/installation.md @@ -212,11 +212,6 @@ You can also find the below community managed interpreter list in `conf/interpre <td>org.apache.zeppelin:zeppelin-sap:0.10.0</td> <td>SAP support</td> </tr> - <tr> - <td>scio</td> - <td>org.apache.zeppelin:zeppelin-scio:0.10.0</td> - <td>Scio interpreter</td> - </tr> <tr> <td>shell</td> <td>org.apache.zeppelin:zeppelin-shell:0.10.0</td> diff --git a/pom.xml b/pom.xml index b4c1814c9a..c4f5a0f92c 100644 --- a/pom.xml +++ b/pom.xml @@ -83,7 +83,6 @@ <module>elasticsearch</module> <module>bigquery</module> <module>alluxio</module> - <module>scio</module> <module>neo4j</module> <module>sap</module> <module>java</module> @@ -1455,7 +1454,7 @@ </goals> <configuration> <failOnViolation>true</failOnViolation> - <excludes>org/apache/zeppelin/interpreter/thrift/*,org/apache/zeppelin/scio/avro/*,org/apache/zeppelin/python/proto/*</excludes> + <excludes>org/apache/zeppelin/interpreter/thrift/*,org/apache/zeppelin/python/proto/*</excludes> </configuration> </execution> <execution> @@ -1465,7 +1464,7 @@ <goal>checkstyle-aggregate</goal> </goals> <configuration> - <excludes>org/apache/zeppelin/interpreter/thrift/*,org/apache/zeppelin/scio/avro/*,org/apache/zeppelin/python/proto/*</excludes> + <excludes>org/apache/zeppelin/interpreter/thrift/*,org/apache/zeppelin/python/proto/*</excludes> </configuration> </execution> </executions> diff --git a/scio/README.md b/scio/README.md deleted file mode 100644 index 73d0cb67f1..0000000000 --- a/scio/README.md +++ /dev/null @@ -1,18 +0,0 @@ -Scio interpreter for Apache Zeppelin 
-==================================== - -## Raison d'être: - -Provide Scio Interpreter for Zeppelin. - -## Build - -``` -./mvnw -pl zeppelin-interpreter,zeppelin-display,scio -DskipTests package -``` - -## Test - -``` -./mvnw -pl scio,zeppelin-display,zeppelin-interpreter -Dtest='org.apache.zeppelin.scio.*' -DfailIfNoTests=false test -``` diff --git a/scio/pom.xml b/scio/pom.xml deleted file mode 100644 index d3c3c1a214..0000000000 --- a/scio/pom.xml +++ /dev/null @@ -1,178 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. 
- --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <artifactId>zeppelin-interpreter-parent</artifactId> - <groupId>org.apache.zeppelin</groupId> - <version>0.11.0-SNAPSHOT</version> - <relativePath>../zeppelin-interpreter-parent/pom.xml</relativePath> - </parent> - - <artifactId>zeppelin-scio</artifactId> - <packaging>jar</packaging> - <name>Zeppelin: Scio</name> - <description>Zeppelin Scio support</description> - - <properties> - <interpreter.name>scio</interpreter.name> - <!--library versions--> - <scio.version>0.2.4</scio.version> - <guava.version>14.0.1</guava.version> <!-- update needed --> - <scio.scala.version>${scala.2.10.version}</scio.scala.version> - <scio.scala.binary.version>2.10</scio.scala.binary.version> - - <!--test library versions--> - <hamcrest.all.version>1.3</hamcrest.all.version> - </properties> - - <dependencies> - - <dependency> - <groupId>com.spotify</groupId> - <artifactId>scio-repl_${scio.scala.binary.version}</artifactId> - <version>${scio.version}</version> - </dependency> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - </dependency> - - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - <version>${scala.2.10.version}</version> - </dependency> - - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-compiler</artifactId> - <version>${scala.2.10.version}</version> - </dependency> - - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-reflect</artifactId> - <version>${scala.2.10.version}</version> - </dependency> - - <!-- test libraries --> - <dependency> - <groupId>org.scalatest</groupId> - <artifactId>scalatest_${scio.scala.binary.version}</artifactId> - 
<version>${scalatest.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.hamcrest</groupId> - <artifactId>hamcrest-all</artifactId> - <version>${hamcrest.all.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>com.google.code.gson</groupId> - <artifactId>gson</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - <scope>test</scope> - </dependency> - - </dependencies> - - <build> - <plugins> - <plugin> - <artifactId>maven-enforcer-plugin</artifactId> - </plugin> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - </plugin> - <plugin> - <artifactId>maven-shade-plugin</artifactId> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <forkCount>1</forkCount> - <reuseForks>false</reuseForks> - <argLine>-Xmx1024m -XX:MaxMetaspaceSize=512m</argLine> - </configuration> - </plugin> - - <plugin> - <groupId>org.scala-tools</groupId> - <artifactId>maven-scala-plugin</artifactId> - <configuration> - <scalaVersion>${scio.scala.version}</scalaVersion> - </configuration> - <executions> - <execution> - <id>compile</id> - <goals> - <goal>compile</goal> - </goals> - <phase>compile</phase> - </execution> - <execution> - <id>test-compile</id> - <goals> - <goal>testCompile</goal> - </goals> - <phase>test-compile</phase> - </execution> - <execution> - <phase>process-resources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - </executions> - </plugin> - - <!-- plugin to compile avro for tests --> - <plugin> - <groupId>org.apache.avro</groupId> - <artifactId>avro-maven-plugin</artifactId> - <executions> - <execution> - <phase>generate-sources</phase> - <goals> - <goal>schema</goal> - </goals> - <configuration> - <sourceDirectory>${project.basedir}/src/test/avro/</sourceDirectory> - </configuration> - </execution> 
- </executions> - </plugin> - - </plugins> - </build> -</project> diff --git a/scio/src/main/scala/org/apache/zeppelin/scio/ContextAndArgs.scala b/scio/src/main/scala/org/apache/zeppelin/scio/ContextAndArgs.scala deleted file mode 100644 index cb1b390be4..0000000000 --- a/scio/src/main/scala/org/apache/zeppelin/scio/ContextAndArgs.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.scio - -import com.google.cloud.dataflow.sdk.options.PipelineOptions -import com.spotify.scio.repl.ReplScioContext -import com.spotify.scio.{Args, ScioContext} - -/** - * Convenience object for creating [[com.spotify.scio.ScioContext]] and [[com.spotify.scio.Args]]. 
- */ -object ContextAndArgs { - def apply(argz: Array[String]): (ScioContext, Args) = { - val (dfOpts, args) = ScioContext.parseArguments[PipelineOptions](argz) - - val nextReplJar = this - .getClass - .getClassLoader - .asInstanceOf[{def getNextReplCodeJarPath: String}].getNextReplCodeJarPath - - val sc = new ReplScioContext(dfOpts, List(nextReplJar)) - sc.setName("sciozeppelin") - - (sc, args) - } -} diff --git a/scio/src/main/scala/org/apache/zeppelin/scio/DisplayHelpers.scala b/scio/src/main/scala/org/apache/zeppelin/scio/DisplayHelpers.scala deleted file mode 100644 index bfb4f9c73a..0000000000 --- a/scio/src/main/scala/org/apache/zeppelin/scio/DisplayHelpers.scala +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.scio - -import com.google.api.services.bigquery.model.TableSchema -import com.spotify.scio.bigquery._ -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord - -import scala.reflect.ClassTag - -/** - * Set of helpers for Zeppelin Display system. 
- */ -private[scio] object DisplayHelpers { - - private[scio] val sCollectionEmptyMsg = - "\n%html <font color=red>Result SCollection is empty!</font>\n" - private val maxResults = Integer.getInteger("zeppelin.scio.maxResult", 1000) - private[scio] val tab = "\t" - private[scio] val newline = "\n" - private[scio] val table = "%table" - private[scio] val endTable = "\n%text" - private[scio] val rowLimitReachedMsg = - s"$newline<font color=red>Results are limited to " + maxResults + s" rows.</font>$newline" - private[scio] val bQSchemaIncomplete = - s"$newline<font color=red>Provided BigQuery Schema has not fields!</font>$newline" - - private def notifyIfTruncated(it: Iterator[_]): Unit = { - if(it.hasNext) println(rowLimitReachedMsg) - } - - /** - * Displays [[AnyVal]] values from given [[Iterator]]. - */ - private[scio] def displayAnyVal[T: ClassTag](it: Iterator[T], printer: (T) => String): Unit = { - if (it.isEmpty) { - println(sCollectionEmptyMsg) - } else { - println(s"$table value$newline${it.take(maxResults).map(printer).mkString(newline)}") - println(endTable) - notifyIfTruncated(it) - } - } - - /** - * Displays [[String]] values from given [[Iterator]]. - */ - private[scio] def displayString[T: ClassTag](it: Iterator[T], printer: (T) => String): Unit = { - if (it.isEmpty) { - println(sCollectionEmptyMsg) - } else { - println(s"$table value$newline${it.take(maxResults).map(printer).mkString(newline)}") - println(endTable) - notifyIfTruncated(it) - } - } - - /** - * Displays [[com.google.cloud.dataflow.sdk.values.KV]] values from given [[Iterator]]. - */ - private[scio] def displayKV[K: ClassTag, V: ClassTag](it: Iterator[(K,V)]): Unit = { - if (it.isEmpty) { - println(sCollectionEmptyMsg) - } else { - val content = it.take(maxResults).map{ case (k, v) => s"$k$tab$v" }.mkString(newline) - println(s"$table key${tab}value$newline$content") - println(endTable) - notifyIfTruncated(it) - } - } - - /** - * Displays [[Product]] values from given [[Iterator]]. 
- */ - private[scio] def displayProduct[T: ClassTag](it: Iterator[T]) - (implicit ev: T <:< Product): Unit = { - if (it.isEmpty) { - println(sCollectionEmptyMsg) - } else { - val first = it.next() - //TODO is this safe field name to value iterator? - val fieldNames = first.getClass.getDeclaredFields.map(_.getName) - - val header = fieldNames.mkString(tab) - val firstStr = first.productIterator.mkString(tab) - val content = it.take(maxResults - 1).map(_.productIterator.mkString(tab)).mkString(newline) - println(s"$table $header$newline$firstStr$newline$content") - println(endTable) - notifyIfTruncated(it) - } - } - - /** - * Displays Avro values from given [[Iterator]] using optional [[Schema]]. - * @param schema optional "view" schema, otherwise schema is inferred from the first object - */ - private[scio] def displayAvro[T: ClassTag](it: Iterator[T], schema: Schema = null) - (implicit ev: T <:< GenericRecord): Unit = { - if (it.isEmpty) { - println(sCollectionEmptyMsg) - } else { - val first = it.next() - import collection.JavaConverters._ - - val fieldNames = if (schema != null) { - schema.getFields.iterator().asScala.map(_.name()).toArray - } else { - first.getSchema.getFields.iterator.asScala.map(_.name()).toArray - } - - val header = fieldNames.mkString(tab) - val firstStr = fieldNames.map(first.get).mkString(tab) - val content = it.take(maxResults - 1) - .map(r => fieldNames.map(r.get).mkString(tab)) - .mkString(newline) - println(s"$table $header$newline$firstStr$newline$content") - println(endTable) - notifyIfTruncated(it) - } - } - - /** - * Displays [[TableRow]] values from given [[Iterator]] using specified [[TableSchema]]. 
- */ - private[scio] def displayBQTableRow[T: ClassTag](it: Iterator[T], schema: TableSchema) - (implicit ev: T <:< TableRow) : Unit = { - if (it.isEmpty) { - println(sCollectionEmptyMsg) - } else { - import collection.JavaConverters._ - val fieldsOp = Option(schema.getFields) - fieldsOp match { - case None => println(bQSchemaIncomplete) - case Some(f) => { - val fields = f.asScala.map(_.getName).toArray - - val header = fields.mkString(tab) - - val content = it.take(maxResults) - .map(r => fields.map(r.get).mkString(tab)) - .mkString(newline) - - println(s"$table $header$newline$content") - println(endTable) - notifyIfTruncated(it) - } - } - } - } - -} diff --git a/scio/src/main/scala/org/apache/zeppelin/scio/DisplaySCollectionImplicits.scala b/scio/src/main/scala/org/apache/zeppelin/scio/DisplaySCollectionImplicits.scala deleted file mode 100644 index 566e106c57..0000000000 --- a/scio/src/main/scala/org/apache/zeppelin/scio/DisplaySCollectionImplicits.scala +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.zeppelin.scio - -import com.google.api.services.bigquery.model.TableSchema -import com.spotify.scio._ -import com.spotify.scio.bigquery._ -import com.spotify.scio.values.SCollection -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord - -import scala.reflect.ClassTag - -/** - * Implicit Zeppelin display helpers for SCollection. - */ -object DisplaySCollectionImplicits { - - private def materialize[T: ClassTag](self: SCollection[T]) = { - val f = self.materialize - self.context.close() - f - } - - // TODO: scala 2.11 - // implicit class ZeppelinSCollection[T: ClassTag](private val self: SCollection[T])(implicit ev: T <:< AnyVal) extends AnyVal { - implicit class ZeppelinSCollection[T: ClassTag](val self: SCollection[T]) - (implicit ev: T <:< AnyVal) { - /** Convenience method to close the current [[com.spotify.scio.ScioContext]] - * and display elements from SCollection. */ - def closeAndDisplay(printer: (T) => String = (e: T) => e.toString): Unit = { - DisplayTapImplicits.ZeppelinTap(materialize(self).waitForResult()).display(printer) - } - } - - // TODO: scala 2.11 - // implicit class ZeppelinSCollection[T: ClassTag](private val self: SCollection[T]) extends AnyVal { - implicit class ZeppelinStringSCollection[T: ClassTag](val self: SCollection[T]) - (implicit ev: T <:< String) { - /** Convenience method to close the current [[com.spotify.scio.ScioContext]] - * and display elements from SCollection. 
*/ - def closeAndDisplay(printer: (T) => String = (e: T) => e.toString): Unit = { - DisplayTapImplicits.ZeppelinStringTap(materialize(self).waitForResult()).display(printer) - } - } - - // TODO: scala 2.11 - // implicit class ZeppelinKVSCollection[K: ClassTag, V: ClassTag](val self: SCollection[(K, V)]) extends AnyVal { - implicit class ZeppelinKVSCollection[K: ClassTag, V: ClassTag](val self: SCollection[(K, V)]) { - /** Convenience method to close the current [[com.spotify.scio.ScioContext]] - * and display elements from KV SCollection. */ - def closeAndDisplay(): Unit = { - DisplayTapImplicits.ZeppelinKVTap(materialize(self).waitForResult()).display() - } - } - - // TODO: scala 2.11 - // implicit class ZeppelinProductSCollection[T: ClassTag](val self: SCollection[T])(implicit ev: T <:< Product) extends AnyVal { - implicit class ZeppelinProductSCollection[T: ClassTag](val self: SCollection[T]) - (implicit ev: T <:< Product) { - /** Convenience method to close the current [[com.spotify.scio.ScioContext]] - * and display elements from Product like SCollection */ - def closeAndDisplay(): Unit = { - DisplayTapImplicits.ZeppelinProductTap(materialize(self).waitForResult()).display() - } - } - - // TODO: scala 2.11 - // implicit class ZeppelinAvroSCollection[T: ClassTag](val self: SCollection[T])(implicit ev: T <:< GenericRecord) extends AnyVal { - implicit class ZeppelinAvroSCollection[T: ClassTag](val self: SCollection[T]) - (implicit ev: T <:< GenericRecord) { - /** Convenience method to close the current [[com.spotify.scio.ScioContext]] - * and display elements from Avro like SCollection */ - def closeAndDisplay(schema: Schema = null): Unit = { - DisplayTapImplicits.ZeppelinAvroTap(materialize(self).waitForResult()).display(schema) - } - } - - // TODO: scala 2.11 - // implicit class ZeppelinBQTableSCollection[T: ClassTag](val self: SCollection[T])(implicit ev: T <:< TableRow) extends AnyVal { - implicit class ZeppelinBQTableSCollection[T: ClassTag](val self: 
SCollection[T]) - (implicit ev: T <:< TableRow) { - /** Convenience method to close the current [[com.spotify.scio.ScioContext]] - * and display elements from TableRow like SCollection */ - def closeAndDisplay(schema: TableSchema): Unit = { - DisplayTapImplicits.ZeppelinBQTableTap(materialize(self).waitForResult()).display(schema) - } - } - -} diff --git a/scio/src/main/scala/org/apache/zeppelin/scio/DisplayTapImplicits.scala b/scio/src/main/scala/org/apache/zeppelin/scio/DisplayTapImplicits.scala deleted file mode 100644 index 8aafc310a0..0000000000 --- a/scio/src/main/scala/org/apache/zeppelin/scio/DisplayTapImplicits.scala +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.scio - -import com.google.api.services.bigquery.model.TableSchema -import com.spotify.scio.bigquery.TableRow -import com.spotify.scio.io.Tap -import com.spotify.scio._ -import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord - -import scala.concurrent.Future -import scala.reflect.ClassTag - -/** - * Implicit Zeppelin display helpers for [[Tap]] and [[Future]] of a [[Tap]]. 
- */ -object DisplayTapImplicits { - - // TODO: scala 2.11 - // implicit class ZeppelinTap[T: ClassTag](private val self: Tap[T])(implicit ev: T <:< AnyVal) extends AnyVal { - implicit class ZeppelinTap[T: ClassTag](val self: Tap[T]) - (implicit ev: T <:< AnyVal) { - /** Convenience method to display [[com.spotify.scio.io.Tap]] of AnyVal. */ - def display(printer: (T) => String = (e: T) => e.toString): Unit = { - DisplayHelpers.displayAnyVal(self.value, printer) - } - } - - // TODO: scala 2.11 - // implicit class ZeppelinFutureTap[T: ClassTag](private val self: Future[Tap[T]])(implicit ev: T <:< AnyVal) extends AnyVal { - implicit class ZeppelinFutureTap[T: ClassTag](val self: Future[Tap[T]]) - (implicit ev: T <:< AnyVal) { - /** Convenience method to display [[Future]] of a [[com.spotify.scio.io.Tap]] of AnyVal. */ - def waitAndDisplay(printer: (T) => String = (e: T) => e.toString): Unit = { - ZeppelinTap(self.waitForResult()).display(printer) - } - } - - // TODO: scala 2.11 - // implicit class ZeppelinStringTap[T: ClassTag](private val self: Tap[T])(implicit ev: T <:< String) extends AnyVal { - implicit class ZeppelinStringTap[T: ClassTag](val self: Tap[T]) - (implicit ev: T <:< String) { - /** Convenience method to display [[com.spotify.scio.io.Tap]] of Strings. */ - def display(printer: (T) => String = (e: T) => e.toString): Unit = { - DisplayHelpers.displayString(self.value, printer) - } - } - - // TODO: scala 2.11 - // implicit class ZeppelinFutureStringTap[T: ClassTag](private val self: Tap[T])(implicit ev: T <:< String) extends AnyVal { - implicit class ZeppelinFutureStringTap[T: ClassTag](val self: Future[Tap[T]]) - (implicit ev: T <:< String) { - /** Convenience method to display [[Future]] of a [[com.spotify.scio.io.Tap]] of Strings. 
*/ - def waitAndDisplay(printer: (T) => String = (e: T) => e.toString): Unit = { - ZeppelinStringTap(self.waitForResult()).display(printer) - } - } - - // TODO: scala 2.11 - // implicit class ZeppelinKVTap[K: ClassTag, V: ClassTag](val self: Tap[(K, V)]) extends AnyVal { - implicit class ZeppelinKVTap[K: ClassTag, V: ClassTag](val self: Tap[(K, V)]) { - /** Convenience method to display [[com.spotify.scio.io.Tap]] of KV. */ - def display(): Unit = { - DisplayHelpers.displayKV(self.value) - } - } - - // TODO: scala 2.11 - // implicit class ZeppelinFutureKVTap[K: ClassTag, V: ClassTag](val self: Future[Tap[(K, V)]]) extends AnyVal { - implicit class ZeppelinFutureKVTap[K: ClassTag, V: ClassTag](val self: Future[Tap[(K, V)]]) { - /** Convenience method to display [[Future]] of a [[com.spotify.scio.io.Tap]] of KV. */ - def waitAndDisplay(): Unit = { - ZeppelinKVTap(self.waitForResult()).display() - } - } - - // TODO: scala 2.11 - // implicit class ZeppelinProductTap[T: ClassTag](val self: Tap[T])(implicit ev: T <:< Product) extends AnyVal { - implicit class ZeppelinProductTap[T: ClassTag](val self: Tap[T]) - (implicit ev: T <:< Product) { - /** Convenience method to display [[com.spotify.scio.io.Tap]] of Product. */ - def display(): Unit = { - DisplayHelpers.displayProduct(self.value) - } - } - - // TODO: scala 2.11 - // implicit class ZeppelinFutureProductTap[T: ClassTag](val self: Future[Tap[T]])(implicit ev: T <:< Product) extends AnyVal { - implicit class ZeppelinFutureProductTap[T: ClassTag](val self: Future[Tap[T]]) - (implicit ev: T <:< Product) { - /** Convenience method to display [[Future]] of a [[com.spotify.scio.io.Tap]] of Product. 
*/ - def waitAndDisplay(): Unit = { - ZeppelinProductTap(self.waitForResult()).display() - } - } - - // TODO: scala 2.11 - // implicit class ZeppelinAvroTap[T: ClassTag](val self: Tap[T])(implicit ev: T <:< GenericRecord) extends AnyVal { - implicit class ZeppelinAvroTap[T: ClassTag](val self: Tap[T]) - (implicit ev: T <:< GenericRecord) { - /** Convenience method to display [[com.spotify.scio.io.Tap]] of Avro. */ - def display(schema: Schema = null): Unit = { - DisplayHelpers.displayAvro(self.value, schema) - } - } - - // TODO: scala 2.11 - // implicit class ZeppelinFutureAvroTap[T: ClassTag](val self: Future[Tap[T]])(implicit ev: T <:< GenericRecord) extends AnyVal { - implicit class ZeppelinFutureAvroTap[T: ClassTag](val self: Future[Tap[T]]) - (implicit ev: T <:< GenericRecord) { - /** Convenience method to display [[Future]] of a [[com.spotify.scio.io.Tap]] of Avro. */ - def waitAndDisplay(schema: Schema = null): Unit = { - ZeppelinAvroTap(self.waitForResult()).display(schema) - } - } - - // TODO: scala 2.11 - // implicit class ZeppelinBQTableTap[T: ClassTag](val self: Tap[T])(implicit ev: T <:< TableRow) extends AnyVal { - implicit class ZeppelinBQTableTap[T: ClassTag](val self: Tap[T]) - (implicit ev: T <:< TableRow) { - /** Convenience method to display [[com.spotify.scio.io.Tap]] of BigQuery TableRow. */ - def display(schema: TableSchema): Unit = { - DisplayHelpers.displayBQTableRow(self.value, schema) - } - } - - // TODO: scala 2.11 - // implicit class ZeppelinFutureBQTableTap[T: ClassTag](val self: Future[Tap[T]])(implicit ev: T <:< TableRow) extends AnyVal { - implicit class ZeppelinFutureBQTableTap[T: ClassTag](val self: Future[Tap[T]]) - (implicit ev: T <:< TableRow) { - /** Convenience method to display [[Future]] of a [[com.spotify.scio.io.Tap]] of BigQuery - * TableRow. 
*/ - def waitAndDisplay(schema: TableSchema): Unit = { - ZeppelinBQTableTap(self.waitForResult()).display(schema) - } - } - -} diff --git a/scio/src/main/scala/org/apache/zeppelin/scio/ScioInterpreter.scala b/scio/src/main/scala/org/apache/zeppelin/scio/ScioInterpreter.scala deleted file mode 100644 index f3400815b5..0000000000 --- a/scio/src/main/scala/org/apache/zeppelin/scio/ScioInterpreter.scala +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.zeppelin.scio - -import java.beans.Introspector -import java.io.PrintStream -import java.util.Properties - -import com.google.cloud.dataflow.sdk.options.{PipelineOptions, PipelineOptionsFactory} -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner -import com.spotify.scio.repl.{ScioILoop, ScioReplClassLoader} -import org.apache.zeppelin.interpreter.Interpreter.FormType -import org.apache.zeppelin.interpreter.util.InterpreterOutputStream -import org.apache.zeppelin.interpreter.{Interpreter, InterpreterContext, InterpreterResult} -import org.slf4j.LoggerFactory - -import scala.reflect.io.File -import scala.tools.nsc.GenericRunnerCommand -import scala.tools.nsc.interpreter.JPrintWriter -import scala.tools.nsc.util.ClassPath - -/** - * Scio interpreter for Zeppelin. - * - * <ul> - * <li>{@code zeppelin.scio.argz} - Scio interpreter wide arguments</li> - * <li>{@code zeppelin.scio.maxResult} - Max number of SCollection results to display.</li> - * </ul> - * - * <p> - * How to use: <br/> - * {@code - * $beam.scio - * val (sc, args) = ContextAndArgs(argz) - * sc.parallelize(Seq("foo", "foo", "bar")).countByValue.closeAndDisplay() - * } - * </p> - * - */ - -class ScioInterpreter(property: Properties) extends Interpreter(property) { - private val logger = LoggerFactory.getLogger(classOf[ScioInterpreter]) - private var REPL: ScioILoop = _ - - val innerOut = new InterpreterOutputStream(logger) - - override def open(): Unit = { - val argz = Option(getProperty("zeppelin.scio.argz")) - .getOrElse(s"--runner=${classOf[InProcessPipelineRunner].getSimpleName}") - .split(" ") - .map(_.trim) - .filter(_.nonEmpty) - .toList - - // Process command line arguments into a settings object, and use that to start the REPL. 
- // We ignore params we don't care about - hence error function is empty - val command = new GenericRunnerCommand(argz, _ => ()) - val settings = command.settings - - settings.classpath.append(System.getProperty("java.class.path")) - settings.usejavacp.value = true - - def classLoaderURLs(cl: ClassLoader): Array[java.net.URL] = cl match { - case null => Array() - case u: java.net.URLClassLoader => u.getURLs ++ classLoaderURLs(cl.getParent) - case _ => classLoaderURLs(cl.getParent) - } - - classLoaderURLs(Thread.currentThread().getContextClassLoader) - .foreach(u => settings.classpath.append(u.getPath)) - - // We have to make sure that scala macros are expandable. paradise plugin has to be added to - // -Xplugin paths. In case of assembly - paradise is included in assembly jar - thus we add - // itself to -Xplugin. If shell is started from sbt or classpath, paradise jar has to be in - // classpath, we find it and add it to -Xplugin. - - val thisJar = this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath - // In some cases this may be `target/classes` - if(thisJar.endsWith(".jar")) settings.plugin.appendToValue(thisJar) - - ClassPath - .split(settings.classpath.value) - .find(File(_).name.startsWith("paradise_")) - .foreach(settings.plugin.appendToValue) - - // Force the repl to be synchronous, so all cmds are executed in the same thread - settings.Yreplsync.value = true - - val jars = ClassPath.split(settings.classpath.value) - .flatMap(ClassPath.specToURL) - .toArray - - val scioClassLoader = new ScioReplClassLoader( - jars ++ classLoaderURLs(Thread.currentThread().getContextClassLoader), - null, - Thread.currentThread.getContextClassLoader) - - val (dfArgs, _) = parseAndPartitionArgs(argz) - - REPL = new ScioILoop(scioClassLoader, dfArgs, None, new JPrintWriter(innerOut)) - scioClassLoader.setRepl(REPL) - - // Set classloader chain - expose top level abstract class loader down - // the chain to allow for readObject and latestUserDefinedLoader - // 
See https://gist.github.com/harrah/404272 - settings.embeddedDefaults(scioClassLoader) - - // No need for bigquery dumps - sys.props("bigquery.plugin.disable.dump") = true.toString - - REPL.settings_=(settings) - REPL.createInterpreter() - REPL.interpret(s"""val argz = Array("${argz.mkString("\", \"")}")""") - REPL.interpret("import org.apache.zeppelin.scio.DisplaySCollectionImplicits._") - REPL.interpret("import org.apache.zeppelin.scio.DisplayTapImplicits._") - REPL.interpret("import org.apache.zeppelin.scio.ContextAndArgs") - } - - private def parseAndPartitionArgs(args: List[String]): (List[String], List[String]) = { - import scala.collection.JavaConverters._ - // Extract --pattern of all registered derived types of PipelineOptions - val classes = PipelineOptionsFactory.getRegisteredOptions.asScala + classOf[PipelineOptions] - val optPatterns = classes.flatMap { cls => - cls.getMethods.flatMap { m => - val n = m.getName - if ((!n.startsWith("get") && !n.startsWith("is")) || - m.getParameterTypes.nonEmpty || m.getReturnType == classOf[Unit]) None - else Some(Introspector.decapitalize(n.substring(if (n.startsWith("is")) 2 else 3))) - }.map(s => s"--$s($$|=)".r) - } - - // Split cmdlineArgs into 2 parts, optArgs for PipelineOptions and appArgs for Args - args.partition(arg => optPatterns.exists(_.findFirstIn(arg).isDefined)) - } - - override def close(): Unit = { - logger.info("Closing Scio interpreter!") - REPL.closeInterpreter() - } - - override def interpret(code: String, context: InterpreterContext): InterpreterResult = { - val paragraphId = context.getParagraphId - - val consoleOut = new PrintStream(innerOut) - System.setOut(consoleOut) - innerOut.setInterpreterOutput(context.out) - - try { - import scala.tools.nsc.interpreter.Results._ - REPL.interpret(code) match { - case Success => { - logger.debug(s"Successfully executed `$code` in $paragraphId") - new InterpreterResult(InterpreterResult.Code.SUCCESS) - } - case Error => { - logger.error(s"Error executing 
`$code` in $paragraphId") - new InterpreterResult(InterpreterResult.Code.ERROR, "Interpreter error") - } - case Incomplete => { - logger.warn(s"Code `$code` not complete in $paragraphId") - new InterpreterResult(InterpreterResult.Code.INCOMPLETE, "Incomplete expression") - } - } - } catch { - case e: Exception => - logger.info("Interpreter exception", e) - new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage) - } finally { - innerOut.flush() - innerOut.setInterpreterOutput(null) - consoleOut.flush() - } - } - - override def cancel(context: InterpreterContext): Unit = { - // not implemented - } - - override def getFormType: FormType = FormType.NATIVE - - override def getProgress(context: InterpreterContext): Int = { - // not implemented - 0 - } - -} diff --git a/scio/src/test/avro/schema.avsc b/scio/src/test/avro/schema.avsc deleted file mode 100644 index 07c3bea888..0000000000 --- a/scio/src/test/avro/schema.avsc +++ /dev/null @@ -1,12 +0,0 @@ -{ - "type": "record", - "name": "Account", - "namespace": "org.apache.zeppelin.scio.avro", - "doc": "Record for an account", - "fields": [ - {"name": "id", "type": "int"}, - {"name": "type", "type": "string"}, - {"name": "name", "type": "string"}, - {"name": "amount", "type": "double"} - ] -} diff --git a/scio/src/test/java/org/apache/zeppelin/scio/ScioInterpreterTest.java b/scio/src/test/java/org/apache/zeppelin/scio/ScioInterpreterTest.java deleted file mode 100644 index 2e5c0d9bf0..0000000000 --- a/scio/src/test/java/org/apache/zeppelin/scio/ScioInterpreterTest.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.scio; - -import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.resource.LocalResourcePool; -import org.apache.zeppelin.user.AuthenticationInfo; -import org.junit.Before; -import org.junit.Test; - -import java.util.HashMap; -import java.util.LinkedList; -import java.util.Properties; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class ScioInterpreterTest { - private static ScioInterpreter repl; - private static InterpreterGroup intpGroup; - private InterpreterContext context; - - private final String newline = "\n"; - - private InterpreterContext getNewContext() { - return InterpreterContext.builder() - .setNoteId("noteId") - .setParagraphId("paragraphId") - .build(); - } - - @Before - public void setUp() throws Exception { - if (repl == null) { - intpGroup = new InterpreterGroup(); - intpGroup.put("note", new LinkedList<Interpreter>()); - repl = new ScioInterpreter(new Properties()); - repl.setInterpreterGroup(intpGroup); - intpGroup.get("note").add(repl); - repl.open(); - } - - context = getNewContext(); - } - - @Test - public void testBasicSuccess() { - assertEquals(InterpreterResult.Code.SUCCESS, - repl.interpret("val a = 1" + newline + "val b = 2", context).code()); - } - - @Test - public void testBasicSyntaxError() { - InterpreterResult error = repl.interpret("val a:Int = 'ds'", context); - assertEquals(InterpreterResult.Code.ERROR, 
error.code()); - assertEquals("Interpreter error", error.message().get(0).getData()); - } - - @Test - public void testBasicIncomplete() { - InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context); - assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code()); - assertEquals("Incomplete expression", incomplete.message().get(0).getData()); - } - - @Test - public void testBasicPipeline() { - assertEquals(InterpreterResult.Code.SUCCESS, - repl.interpret("val (sc, _) = ContextAndArgs(argz)" + newline - + "sc.parallelize(1 to 10).closeAndCollect().toList", context).code()); - } - - @Test - public void testBasicMultiStepPipeline() { - final StringBuilder code = new StringBuilder(); - code.append("val (sc, _) = ContextAndArgs(argz)").append(newline) - .append("val numbers = sc.parallelize(1 to 10)").append(newline) - .append("val results = numbers.closeAndCollect().toList").append(newline) - .append("println(results)"); - assertEquals(InterpreterResult.Code.SUCCESS, - repl.interpret(code.toString(), context).code()); - } - - @Test - public void testException() { - InterpreterResult exception = repl.interpret("val (sc, _) = ContextAndArgs(argz)" + newline - + "throw new Exception(\"test\")", context); - assertEquals(InterpreterResult.Code.ERROR, exception.code()); - assertTrue(exception.message().get(0).getData().length() > 0); - } - -} diff --git a/scio/src/test/resources/log4j.properties b/scio/src/test/resources/log4j.properties deleted file mode 100644 index 8daee59d60..0000000000 --- a/scio/src/test/resources/log4j.properties +++ /dev/null @@ -1,22 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -log4j.rootLogger = INFO, stdout - -log4j.appender.stdout = org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout = org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n diff --git a/scio/src/test/scala-2.11/org/apache/zeppelin/scio/DisplayHelpersTestScala211.scala b/scio/src/test/scala-2.11/org/apache/zeppelin/scio/DisplayHelpersTestScala211.scala deleted file mode 100644 index 729dc87f5c..0000000000 --- a/scio/src/test/scala-2.11/org/apache/zeppelin/scio/DisplayHelpersTestScala211.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.zeppelin.scio - -import org.apache.zeppelin.scio.util.TestUtils -import org.junit.runner.RunWith -import org.scalatest.junit.JUnitRunner -import org.scalatest.{FlatSpec, Matchers} - -/** - * Scala 2.11 DisplayHelpersTest tests. - * - * Most tests have test scope implicit imports due to scala 2.10 bug - * https://issues.scala-lang.org/browse/SI-3346 - * - * Note: we can't depend on the order of data coming from SCollection. - */ -@RunWith(classOf[JUnitRunner] -class DisplayHelpersTestScala211 extends FlatSpec with Matchers { - import TestUtils._ - - // ----------------------------------------------------------------------------------------------- - // Product SCollection Tests - // ----------------------------------------------------------------------------------------------- - - it should "support SCollection of Case Class of 23" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinProductSCollection - val tupleHeader = s"$table " + (1 to 22).map(i => s"a$i$tab").mkString + "a23" - val o = captureOut { - sideEffectWithData( - Seq.fill(3)(CC23(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23))) { in => - in.closeAndDisplay() - } - } - o should contain theSameElementsAs (Seq(tupleHeader) ++ - Seq.fill(3)((1 to 22).map(i => s"$i$tab").mkString + "23")) - o.head should be(tupleHeader) - } - -} diff --git a/scio/src/test/scala-2.11/org/apache/zeppelin/scio/TestCCScala211.scala b/scio/src/test/scala-2.11/org/apache/zeppelin/scio/TestCCScala211.scala deleted file mode 100644 index eca120598f..0000000000 --- a/scio/src/test/scala-2.11/org/apache/zeppelin/scio/TestCCScala211.scala +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.scio - -case class CC23(a1: Int, a2: Int, a3: Int, a4: Int, a5: Int, a6: Int, a7: Int, a8: Int, a9: Int, - a10: Int, a11: Int, a12: Int, a13: Int, a14: Int, a15: Int, a16: Int, a17: Int, - a18: Int, a19: Int, a20: Int, a21: Int, a22: Int, a23: Int) \ No newline at end of file diff --git a/scio/src/test/scala/org/apache/zeppelin/scio/DisplayHelpersTest.scala b/scio/src/test/scala/org/apache/zeppelin/scio/DisplayHelpersTest.scala deleted file mode 100644 index a197fafc2d..0000000000 --- a/scio/src/test/scala/org/apache/zeppelin/scio/DisplayHelpersTest.scala +++ /dev/null @@ -1,570 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.scio - -import com.google.api.services.bigquery.model.{TableFieldSchema, TableSchema} -import com.spotify.scio.bigquery._ -import org.apache.avro.Schema -import org.apache.avro.Schema.Parser -import org.apache.avro.generic.{GenericData, GenericRecord} -import org.apache.zeppelin.scio.avro.Account -import org.apache.zeppelin.scio.util.TestUtils -import org.junit.runner.RunWith -import org.scalatest.junit.JUnitRunner -import org.scalatest.{FlatSpec, Matchers} - -/** - * DisplayHelpersTest tests. - * - * Most tests have test scope implicit imports due to scala 2.10 bug - * https://issues.scala-lang.org/browse/SI-3346 - * - * Note: we can't depend on the order of data coming from SCollection. - */ -@RunWith(classOf[JUnitRunner]) -class DisplayHelpersTest extends FlatSpec with Matchers { - private val testRowLimit = 20 - sys.props("zeppelin.scio.maxResult") = 20.toString - - import TestUtils._ - - // ----------------------------------------------------------------------------------------------- - // AnyVal SCollection Tests - // ----------------------------------------------------------------------------------------------- - - private val anyValHeader = s"$table value" - private val endTableFooter = DisplayHelpers.endTable.split("\\n").last - private val endTableSeq = Seq("", endTableFooter) - - "DisplayHelpers" should "support Integer SCollection via AnyVal" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinSCollection - val o = captureOut { - sideEffectWithData(Seq(1, 2, 3)) { in => - in.closeAndDisplay() - } - } - o should contain theSameElementsAs Seq(anyValHeader, - "1", - "2", - "3") ++ endTableSeq - o.head should be(anyValHeader) - o.last should be(endTableFooter) - } - - it should "support Long SCollection via AnyVal" in { - import 
org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinSCollection - val o = captureOut { - sideEffectWithData(Seq(1L, 2L, 3L)) { in => - in.closeAndDisplay() - } - } - o should contain theSameElementsAs Seq(anyValHeader, - "1", - "2", - "3") ++ endTableSeq - o.head should be(anyValHeader) - o.last should be(endTableFooter) - } - - it should "support Double SCollection via AnyVal" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinSCollection - val o = captureOut { - sideEffectWithData(Seq(1.0D, 2.0D, 3.0D)) { in => - in.closeAndDisplay() - } - } - o should contain theSameElementsAs Seq(anyValHeader, - "1.0", - "2.0", - "3.0") ++ endTableSeq - o.head should be(anyValHeader) - o.last should be(endTableFooter) - } - - it should "support Float SCollection via AnyVal" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinSCollection - val o = captureOut { - sideEffectWithData(Seq(1.0F, 2.0F, 3.0F)) { in => - in.closeAndDisplay() - } - } - o should contain theSameElementsAs Seq(anyValHeader, - "1.0", - "2.0", - "3.0") ++ endTableSeq - o.head should be(anyValHeader) - o.last should be(endTableFooter) - } - - it should "support Short SCollection via AnyVal" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinSCollection - val o = captureOut { - sideEffectWithData(Seq(1.toShort, 2.toShort, 3.toShort)) { in => - in.closeAndDisplay() - } - } - o should contain theSameElementsAs Seq(anyValHeader, - "1", - "2", - "3") ++ endTableSeq - o.head should be(anyValHeader) - o.last should be(endTableFooter) - } - - it should "support Byte SCollection via AnyVal" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinSCollection - val o = captureOut { - sideEffectWithData(Seq(1.toByte, 2.toByte, 3.toByte)) { in => - in.closeAndDisplay() - } - } - o should contain theSameElementsAs Seq(anyValHeader, - "1", - "2", - "3") ++ endTableSeq - o.head should be(anyValHeader) - o.last should 
be(endTableFooter) - } - - it should "support Boolean SCollection via AnyVal" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinSCollection - val o = captureOut { - sideEffectWithData(Seq(true, false, true)) { in => - in.closeAndDisplay() - } - } - o should contain theSameElementsAs Seq(anyValHeader, - "true", - "false", - "true") ++ endTableSeq - o.head should be(anyValHeader) - o.last should be(endTableFooter) - } - - it should "support Char SCollection via AnyVal" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinSCollection - val o = captureOut { - sideEffectWithData(Seq('a', 'b', 'c')) { in => - in.closeAndDisplay() - } - } - o should contain theSameElementsAs Seq(anyValHeader, - "a", - "b", - "c") ++ endTableSeq - o.head should be(anyValHeader) - o.last should be(endTableFooter) - } - - it should "support SCollection of AnyVal over row limit" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinSCollection - val o = captureOut { - sideEffectWithData(1 to 21) { in => - in.closeAndDisplay() - } - } - o.size should be > testRowLimit - o.head should be(anyValHeader) - o.last should be(rowLimitReached) - } - - it should "support empty SCollection of AnyVal" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinSCollection - val o = captureOut { - sideEffectWithData(Seq.empty[AnyVal]) { in => - in.closeAndDisplay() - } - } - o should contain theSameElementsAs DisplayHelpers.sCollectionEmptyMsg.split(newline) - } - - // ----------------------------------------------------------------------------------------------- - // String SCollection Tests - // ----------------------------------------------------------------------------------------------- - - private val stringHeader = s"$table value" - - it should "support String SCollection" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinStringSCollection - val o = captureOut { - 
sideEffectWithData(Seq("a","b","c")) { in => - in.closeAndDisplay() - } - } - o should contain theSameElementsAs Seq(stringHeader, - "a", - "b", - "c") ++ endTableSeq - o.head should be (stringHeader) - o.last should be (endTableFooter) - } - - it should "support empty SCollection of String" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinStringSCollection - val o = captureOut { - sideEffectWithData(Seq.empty[String]) { in => - in.closeAndDisplay() - } - } - o should contain theSameElementsAs DisplayHelpers.sCollectionEmptyMsg.split(newline) - } - - it should "support SCollection of String over row limit" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinStringSCollection - val o = captureOut { - sideEffectWithData(Seq.fill(21)("a")) { in => - in.closeAndDisplay() - } - } - o.size should be > testRowLimit - o.head should be(stringHeader) - o.last should be(rowLimitReached) - } - - // ----------------------------------------------------------------------------------------------- - // KV SCollection Tests - // ----------------------------------------------------------------------------------------------- - - private val kvHeader = s"$table key${tab}value" - - it should "support KV (ints) SCollection" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinKVSCollection - val o = captureOut { - sideEffectWithData(Seq((1,2), (3,4))) { in => - in.closeAndDisplay() - } - } - o should contain theSameElementsAs Seq(kvHeader, - s"3${tab}4", - s"1${tab}2") ++ endTableSeq - o.head should be (kvHeader) - o.last should be (endTableFooter) - } - - it should "support KV (str keys) SCollection" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinKVSCollection - val o = captureOut { - sideEffectWithData(Seq(("foo",2), ("bar",4))) { in => - in.closeAndDisplay() - } - } - o should contain theSameElementsAs Seq(kvHeader, - s"foo${tab}2", - s"bar${tab}4") ++ endTableSeq - o.head should be 
(kvHeader) - o.last should be (endTableFooter) - } - - it should "support KV (str values) SCollection" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinKVSCollection - val o = captureOut { - sideEffectWithData(Seq((2,"foo"), (4,"bar"))) { in => - in.closeAndDisplay() - } - } - o should contain theSameElementsAs Seq(kvHeader, - s"2${tab}foo", - s"4${tab}bar") ++ endTableSeq - o.head should be (kvHeader) - o.last should be (endTableFooter) - } - - it should "support empty KV SCollection" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinKVSCollection - captureOut { - sideEffectWithData(Seq.empty[(Int, Int)]) { in => - in.closeAndDisplay() - } - } should contain theSameElementsAs DisplayHelpers.sCollectionEmptyMsg.split(newline) - } - - it should "support SCollection of KV over row limit" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinKVSCollection - val o = captureOut { - sideEffectWithData(Seq.fill(21)(("foo", 1))) { in => - in.closeAndDisplay() - } - } - o.size should be > testRowLimit - o.head should be(kvHeader) - o.last should be(rowLimitReached) - } - - // ----------------------------------------------------------------------------------------------- - // Product SCollection Tests - // ----------------------------------------------------------------------------------------------- - - private val testCaseClassHeader = s"$table foo${tab}bar${tab}a" - - it should "support SCollection of Tuple of 3" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinProductSCollection - val tupleHeader = s"$table _1${tab}_2${tab}_3" - val o = captureOut { - sideEffectWithData(Seq.fill(3)((1,2,3))) { in => - in.closeAndDisplay() - } - } - o should contain theSameElementsAs - (Seq(tupleHeader) ++ Seq.fill(3)(s"1${tab}2${tab}3") ++ endTableSeq) - o.head should be(tupleHeader) - o.last should be (endTableFooter) - } - - it should "support SCollection of Tuple of 22" in { - import 
org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinProductSCollection - val tupleHeader = s"$table " + (1 to 21).map(i => s"_$i$tab").mkString + "_22" - val o = captureOut { - sideEffectWithData( - Seq.fill(3)((1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22))) { in => - in.closeAndDisplay() - } - } - o should contain theSameElementsAs (Seq(tupleHeader) ++ - Seq.fill(3)((1 to 21).map(i => s"$i$tab").mkString + "22") ++ endTableSeq) - o.head should be(tupleHeader) - o.last should be (endTableFooter) - } - - it should "support SCollection of Case Class of 22" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinProductSCollection - val tupleHeader = s"$table " + (1 to 21).map(i => s"a$i$tab").mkString + "a22" - val o = captureOut { - sideEffectWithData( - Seq.fill(3)(CC22(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22))) { in => - in.closeAndDisplay() - } - } - o should contain theSameElementsAs (Seq(tupleHeader) ++ - Seq.fill(3)((1 to 21).map(i => s"$i$tab").mkString + "22") ++ endTableSeq) - o.head should be(tupleHeader) - o.last should be (endTableFooter) - } - - it should "support SCollection of Case Class" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinProductSCollection - val o = captureOut { - sideEffectWithData(Seq.fill(3)(TestCaseClass(1, "foo", 2.0D))) { in => - in.closeAndDisplay() - } - } - o should contain theSameElementsAs (Seq(testCaseClassHeader) ++ - Seq.fill(3)(s"1${tab}foo${tab}2.0") ++ endTableSeq) - o.head should be(testCaseClassHeader) - o.last should be (endTableFooter) - } - - it should "support empty SCollection of Product" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinProductSCollection - captureOut { - sideEffectWithData(Seq.empty[Product]) { in => - in.closeAndDisplay() - } - } should contain theSameElementsAs DisplayHelpers.sCollectionEmptyMsg.split(newline) - } - - it should "support SCollection of Product over row limit" in { - 
import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinProductSCollection - val o = captureOut { - sideEffectWithData(Seq.fill(21)(TestCaseClass(1, "foo", 2.0D))) { in => - in.closeAndDisplay() - } - } - - o.size should be > testRowLimit - o.head should be(testCaseClassHeader) - o.last should be(rowLimitReached) - } - - // ----------------------------------------------------------------------------------------------- - // Avro SCollection Tests - // ----------------------------------------------------------------------------------------------- - - import scala.collection.JavaConverters._ - - private val schema = { - def f(name: String, tpe: Schema.Type) = - new Schema.Field( - name, - Schema.createUnion(List(Schema.create(Schema.Type.NULL), Schema.create(tpe)).asJava), - null, null) - - val s = Schema.createRecord("GenericAccountRecord", null, null, false) - s.setFields(List( - f("id", Schema.Type.INT), - f("amount", Schema.Type.DOUBLE), - f("name", Schema.Type.STRING), - f("type", Schema.Type.STRING) - ).asJava) - s - } - - private def getTestGenericAvro(i: Int): GenericRecord = { - val s: Schema = new Parser().parse(schema.toString) - val r = new GenericData.Record(s) - r.put("id", i) - r.put("amount", i.toDouble) - r.put("name", "user" + i) - r.put("type", "checking") - r - } - - private def getTestAccountAvro(): Account = { - Account.newBuilder() - .setId(2) - .setAmount(2.0D) - .setName("user2") - .setType("checking") - .build() - } - - private val avroGenericRecordHeader = s"$table id${tab}amount${tab}name${tab}type" - private val avroAccountHeader = s"$table id${tab}type${tab}name${tab}amount" - - it should "support SCollection of GenericRecord" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinAvroSCollection - val o = captureOut { - sideEffectWithData(Seq.fill(3)(getTestGenericAvro(1))) { in => - in.closeAndDisplay() - } - } - o should contain theSameElementsAs (Seq(avroGenericRecordHeader) ++ - 
Seq.fill(3)(s"1${tab}1.0${tab}user1${tab}checking") ++ endTableSeq) - o.head should be(avroGenericRecordHeader) - o.last should be (endTableFooter) - } - - it should "support SCollection of SpecificRecord Avro" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinAvroSCollection - - val o = captureOut { - sideEffectWithData(Seq.fill(3)(getTestAccountAvro())) { in => - in.closeAndDisplay() - } - } - o should contain theSameElementsAs (Seq(avroAccountHeader) ++ - Seq.fill(3)(s"2${tab}checking${tab}user2${tab}2.0") ++ endTableSeq) - o.head should be(avroAccountHeader) - o.last should be (endTableFooter) - } - - it should "support empty SCollection of SpecificRecord Avro" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinAvroSCollection - captureOut { - sideEffectWithData(Seq.empty[Account]) { in => - in.closeAndDisplay() - } - } should contain theSameElementsAs DisplayHelpers.sCollectionEmptyMsg.split(newline) - } - - it should "support empty SCollection of GenericRecord Avro" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinAvroSCollection - captureOut { - sideEffectWithData(Seq.empty[GenericRecord]) { in => - in.closeAndDisplay() - } - } should contain theSameElementsAs DisplayHelpers.sCollectionEmptyMsg.split(newline) - } - - it should "support SCollection of GenericRecord Avro over row limit" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinAvroSCollection - val o = captureOut { - sideEffectWithData(Seq.fill(21)(getTestGenericAvro(1))) { in => - in.closeAndDisplay() - } - } - - o.size should be > testRowLimit - o.head should be(avroGenericRecordHeader) - o.last should be(rowLimitReached) - } - - it should "support SCollection of SpecificRecord Avro over row limit" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinAvroSCollection - val o = captureOut { - sideEffectWithData(Seq.fill(21)(getTestAccountAvro())) { in => - in.closeAndDisplay() - } 
- } - - o.size should be > testRowLimit - o.head should be(avroAccountHeader) - o.last should be(rowLimitReached) - } - - // ----------------------------------------------------------------------------------------------- - // TableRow SCollection Tests - // ----------------------------------------------------------------------------------------------- - - private val bQSchema = new TableSchema().setFields(List( - new TableFieldSchema().setName("id").setType("INTEGER"), - new TableFieldSchema().setName("amount").setType("FLOAT"), - new TableFieldSchema().setName("type").setType("STRING"), - new TableFieldSchema().setName("name").setType("STRING") - ).asJava) - - private val bQHeader = s"$table id${tab}amount${tab}type${tab}name" - - private def getBQTableRow(): TableRow = { - TableRow("id" -> 3, "amount" -> 3.0D, "type" -> "checking", "name" -> "user3") - } - - it should "support SCollection of TableRow" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinBQTableSCollection - val o = captureOut { - sideEffectWithData(Seq.fill(3)(getBQTableRow())) { in => - in.closeAndDisplay(bQSchema) - } - } - o should contain theSameElementsAs (Seq(bQHeader) ++ - Seq.fill(3)(s"3${tab}3.0${tab}checking${tab}user3") ++ endTableSeq) - o.head should be(bQHeader) - o.last should be (endTableFooter) - } - - it should "print error on empty BQ schema" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinBQTableSCollection - captureOut { - sideEffectWithData(Seq.fill(3)(getBQTableRow())) { in => - in.closeAndDisplay(new TableSchema()) - } - } should contain theSameElementsAs DisplayHelpers.bQSchemaIncomplete.split(newline) - } - - it should "support SCollection of TableRow over row limit" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinBQTableSCollection - val o = captureOut { - sideEffectWithData(Seq.fill(21)(getBQTableRow())) { in => - in.closeAndDisplay(bQSchema) - } - } - - o.size should be > testRowLimit - o.head 
should be(bQHeader) - o.last should be(rowLimitReached) - } - - it should "support empty SCollection of TableRow" in { - import org.apache.zeppelin.scio.DisplaySCollectionImplicits.ZeppelinBQTableSCollection - captureOut { - sideEffectWithData(Seq.empty[TableRow]) { in => - in.closeAndDisplay(new TableSchema()) - } - } should contain theSameElementsAs DisplayHelpers.sCollectionEmptyMsg.split(newline) - } - -} diff --git a/scio/src/test/scala/org/apache/zeppelin/scio/TestCC.scala b/scio/src/test/scala/org/apache/zeppelin/scio/TestCC.scala deleted file mode 100644 index 8928b9900b..0000000000 --- a/scio/src/test/scala/org/apache/zeppelin/scio/TestCC.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.zeppelin.scio - -case class TestCaseClass(foo: Int, bar: String, a: Double) - -case class CC22(a1: Int, a2: Int, a3: Int, a4: Int, a5: Int, a6: Int, a7: Int, a8: Int, a9: Int, - a10: Int, a11: Int, a12: Int, a13: Int, a14: Int, a15: Int, a16: Int, a17: Int, - a18: Int, a19: Int, a20: Int, a21: Int, a22: Int) \ No newline at end of file diff --git a/scio/src/test/scala/org/apache/zeppelin/scio/util/TestUtils.scala b/scio/src/test/scala/org/apache/zeppelin/scio/util/TestUtils.scala deleted file mode 100644 index 72271b8df7..0000000000 --- a/scio/src/test/scala/org/apache/zeppelin/scio/util/TestUtils.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.zeppelin.scio.util - -import java.io.{ByteArrayOutputStream, PrintStream} - -import com.google.common.base.Charsets -import com.spotify.scio.ScioContext -import com.spotify.scio.values.SCollection -import org.apache.zeppelin.scio.DisplayHelpers - -import scala.reflect.ClassTag - -object TestUtils { - val tab = DisplayHelpers.tab - val newline = DisplayHelpers.newline - val table = DisplayHelpers.table - val rowLimitReached = DisplayHelpers.rowLimitReachedMsg.replaceAll(newline,"") - - private[scio] def sideEffectWithData[T: ClassTag](data: Iterable[T]) - (fn: SCollection[T] => Unit): Unit = { - val sc = ScioContext() - fn(sc.parallelize(data)) - if (!sc.isClosed) sc.close() - } - - private[scio] def captureOut[T](body: => T): Seq[String] = { - val bytes = new ByteArrayOutputStream() - val stream = new PrintStream(bytes) - Console.withOut(stream) { body } - bytes.toString(Charsets.UTF_8.toString).split(DisplayHelpers.newline) - } - - -} diff --git a/zeppelin-distribution/src/bin_license/LICENSE b/zeppelin-distribution/src/bin_license/LICENSE index a80dc4f4ea..614c6652e9 100644 --- a/zeppelin-distribution/src/bin_license/LICENSE +++ b/zeppelin-distribution/src/bin_license/LICENSE @@ -175,11 +175,6 @@ The following components are provided under Apache License. 
(Apache 2.0) tez-yarn-timeline-history-with-acls (org.apache.tez:tez-yarn-timeline-history-with-acls:0.7.0 - http://tez.apache.org) (Apache 2.0) jna (net.java.dev.jna:jna:4.1.0 https://github.com/java-native-access/jna) (Apache 2.0) MathJax v2.7.0 - https://github.com/mathjax/MathJax/blob/2.7.0/LICENSE - (Apache 2.0) Scio REPL 0.2.4 (com.spotify:scio-repl:0.2.4 - https://github.com/spotify/scio) - (Apache 2.0) Scio BigQuery 0.2.4 (com.spotify:scio-bigquery:0.2.4 - https://github.com/spotify/scio) - (Apache 2.0) Scio Core 0.2.4 (com.spotify:scio-core:0.2.4 - https://github.com/spotify/scio) - (Apache 2.0) Scio Extra 0.2.4 (com.spotify:scio-extra:0.2.4 - https://github.com/spotify/scio) - (Apache 2.0) Scio Test 0.2.4 (com.spotify:scio-test:0.2.4 - https://github.com/spotify/scio) (Apache 2.0) Netty Http2 Codec 4.1.0.CR1 (io.netty:netty-codec-http2:4.1.0.CR1 - https://github.com/netty/netty) (Apache 2.0) Netty Http Codec 4.1.0.CR1 (io.netty:netty-codec-http:4.1.0.CR1 - https://github.com/netty/netty) (Apache 2.0) Netty Handler 4.1.0.CR1 (io.netty:netty-handler:4.1.0.CR1 - https://github.com/netty/netty)