This is an automated email from the ASF dual-hosted git repository.

jongyoul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
     new 11d82c4bda  [ZEPPELIN-5764] Remove Scalding interpreter (#4405)
11d82c4bda is described below

commit 11d82c4bda72abe276f8c4309f617569c3f4a57c
Author: 김민수 <alstnals...@gmail.com>
AuthorDate: Fri Jul 15 13:34:12 2022 +0900

    [ZEPPELIN-5764] Remove Scalding interpreter (#4405)

    * [ZEPPELIN-5764] Remove Scalding interpreter

    * [ZEPPELIN-5764] remove scalding interpreter docs in how_to_build.md
      remove scalding interpreter command example, scalding keyword
---
 .github/workflows/core.yml                         |   2 +-
 conf/interpreter-list                              |   1 -
 dev/create_release.sh                              |   2 +-
 docs/_includes/themes/zeppelin/_navigation.html    |   1 -
 docs/index.md                                      |   1 -
 docs/interpreter/scalding.md                       | 168 ------------
 docs/setup/basics/how_to_build.md                  |   8 +-
 docs/usage/interpreter/installation.md             |  16 --
 pom.xml                                            |   1 -
 scalding/pom.xml                                   | 197 ---------------
 .../zeppelin/scalding/ScaldingInterpreter.java     | 280 ---------------------
 .../src/main/resources/interpreter-setting.json    |  21 --
 .../zeppelin/scalding/ZeppelinReplState.scala      |  48 ----
 .../zeppelin/scalding/ZeppelinScaldingLoop.scala   |  46 ----
 .../zeppelin/scalding/ZeppelinScaldingShell.scala  |  72 ------
 .../zeppelin/scalding/ScaldingInterpreterTest.java | 144 -----------
 16 files changed, 3 insertions(+), 1005 deletions(-)

diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index 4b3e42a369..db075e1bc2 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -82,7 +82,7 @@ jobs:
   interpreter-test-non-core:
     runs-on: ubuntu-20.04
     env:
-      INTERPRETERS: 'beam,hbase,pig,jdbc,file,flink-cmd,ignite,cassandra,elasticsearch,bigquery,alluxio,scio,livy,groovy,sap,java,geode,neo4j,hazelcastjet,submarine,sparql,mongodb,influxdb,ksql,scalding'
+      INTERPRETERS: 'beam,hbase,pig,jdbc,file,flink-cmd,ignite,cassandra,elasticsearch,bigquery,alluxio,scio,livy,groovy,sap,java,geode,neo4j,hazelcastjet,submarine,sparql,mongodb,influxdb,ksql'
     steps:
       - name: Checkout
         uses: actions/checkout@v2
diff --git a/conf/interpreter-list b/conf/interpreter-list
index 76584969c0..270d243b5c 100644
--- a/conf/interpreter-list
+++ b/conf/interpreter-list
@@ -39,7 +39,6 @@ neo4j       org.apache.zeppelin:zeppelin-neo4j:0.10.0             Neo4j i
 pig         org.apache.zeppelin:zeppelin-pig:0.10.0              Pig interpreter
 python      org.apache.zeppelin:zeppelin-python:0.10.0           Python interpreter
 sap         org.apache.zeppelin:zeppelin-sap:0.10.0              SAP Support
-scalding    org.apache.zeppelin:zeppelin-scalding_2.0.10:0.10.0  Scalding interpreter
 scio        org.apache.zeppelin:zeppelin-scio:0.10.0             Scio interpreter
 shell       org.apache.zeppelin:zeppelin-shell:0.10.0            Shell command
 sparql      org.apache.zeppelin:zeppelin-sparql:0.10.0           Sparql interpreter
diff --git a/dev/create_release.sh b/dev/create_release.sh
index a3bef0d1d5..ae2162aa9b 100755
--- a/dev/create_release.sh
+++ b/dev/create_release.sh
@@ -97,7 +97,7 @@ function make_binary_release() {
 git_clone
 make_source_package

-make_binary_release netinst "-Pweb-angular -Phadoop-2.6 -pl !beam,!hbase,!pig,!jdbc,!file,!flink,!ignite,!cassandra,!elasticsearch,!bigquery,!alluxio,!scio,!livy,!groovy,!sap,!java,!geode,!neo4j,!hazelcastjet,!submarine,!sparql,!mongodb,!ksql,!scalding -am"
+make_binary_release netinst "-Pweb-angular -Phadoop-2.6 -pl !beam,!hbase,!pig,!jdbc,!file,!flink,!ignite,!cassandra,!elasticsearch,!bigquery,!alluxio,!scio,!livy,!groovy,!sap,!java,!geode,!neo4j,!hazelcastjet,!submarine,!sparql,!mongodb,!ksql -am"
 make_binary_release all "-Pweb-angular -Phadoop-2.6"

 # remove non release files and dirs
diff --git
a/docs/_includes/themes/zeppelin/_navigation.html b/docs/_includes/themes/zeppelin/_navigation.html index 205e8fc7fc..ceed569605 100644 --- a/docs/_includes/themes/zeppelin/_navigation.html +++ b/docs/_includes/themes/zeppelin/_navigation.html @@ -163,7 +163,6 @@ <li><a href="{{BASE_PATH}}/interpreter/pig.html">Pig</a></li> <li><a href="{{BASE_PATH}}/interpreter/postgresql.html">Postgresql, HAWQ</a></li> <li><a href="{{BASE_PATH}}/interpreter/sap.html">SAP</a></li> - <li><a href="{{BASE_PATH}}/interpreter/scalding.html">Scalding</a></li> <li><a href="{{BASE_PATH}}/interpreter/scio.html">Scio</a></li> <li><a href="{{BASE_PATH}}/interpreter/shell.html">Shell</a></li> <li><a href="{{BASE_PATH}}/interpreter/sparql.html">Sparql</a></li> diff --git a/docs/index.md b/docs/index.md index d955496160..d3e5a461d5 100644 --- a/docs/index.md +++ b/docs/index.md @@ -163,7 +163,6 @@ limitations under the License. * [Python](./interpreter/python.html) * [R](./interpreter/r.html) * [SAP](./interpreter/sap.html) - * [Scalding](./interpreter/scalding.html) * [Scio](./interpreter/scio.html) * [Shell](./interpreter/shell.html) * [Spark](./interpreter/spark.html) diff --git a/docs/interpreter/scalding.md b/docs/interpreter/scalding.md deleted file mode 100644 index 1d55e59a38..0000000000 --- a/docs/interpreter/scalding.md +++ /dev/null @@ -1,168 +0,0 @@ ---- -layout: page -title: "Scalding Interpreter for Apache Zeppelin" -description: "Scalding is an open source Scala library for writing MapReduce jobs." -group: interpreter ---- -<!-- -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. ---> -{% include JB/setup %} - -# Scalding Interpreter for Apache Zeppelin - -<div id="toc"></div> - -[Scalding](https://github.com/twitter/scalding) is an open source Scala library for writing MapReduce jobs. - -## Building the Scalding Interpreter -You have to first build the Scalding interpreter by enable the **scalding** profile as follows: - -```bash -./mvnw clean package -Pscalding -DskipTests -``` - -## Enabling the Scalding Interpreter -In a notebook, to enable the **Scalding** interpreter, click on the **Gear** icon,select **Scalding**, and hit **Save**. - -<center> - - - - - -</center> - -## Configuring the Interpreter - -Scalding interpreter runs in two modes: - -* local -* hdfs - -In the local mode, you can access files on the local server and scalding transformation are done locally. - -In hdfs mode you can access files in HDFS and scalding transformation are run as hadoop map-reduce jobs. - -Zeppelin comes with a pre-configured Scalding interpreter in local mode. - -To run the scalding interpreter in the hdfs mode you have to do the following: - -**Set the classpath with ZEPPELIN\_CLASSPATH\_OVERRIDES** - -In conf/zeppelin_env.sh, you have to set -ZEPPELIN_CLASSPATH_OVERRIDES to the contents of 'hadoop classpath' -and directories with custom jar files you need for your scalding commands. 
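The same pipeline code works under both of the modes described above; only the execution backend changes. As a minimal sketch (added for illustration, not part of the removed page; the input path is invented), using only the `TypedPipe` API shown later in this guide:

```scala
%scalding
// With args.string "--local --repl", TextLine reads from the local
// filesystem and the job runs in-process; with "--hdfs --repl", the same
// path is resolved against HDFS and the pipeline is compiled into a
// Hadoop map-reduce job. The path below is illustrative.
val lines = TypedPipe.from(TextLine("/tmp/scalding-demo.txt"))
val lineCount = lines.groupAll.size.values
lineCount.dump  // dump forces execution and prints the result
```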
- -**Set arguments to the scalding repl** - -The default arguments are: `--local --repl` - -For hdfs mode you need to add: `--hdfs --repl` - -If you want to add custom jars, you need to add: `-libjars directory/*:directory/*` - -For reducer estimation, you need to add something like: -`-Dscalding.reducer.estimator.classes=com.twitter.scalding.reducer_estimation.InputSizeReducerEstimator` - -**Set max.open.instances** - -If you want to control the maximum number of open interpreters, you have to select "scoped" interpreter for note -option and set `max.open.instances` argument. - -## Testing the Interpreter - -### Local mode - -In example, by using the [Alice in Wonderland](https://gist.github.com/johnynek/a47699caa62f4f38a3e2) tutorial, -we will count words (of course!), and plot a graph of the top 10 words in the book. - -```scala -%scalding - -import scala.io.Source - -// Get the Alice in Wonderland book from gutenberg.org: -val alice = Source.fromURL("http://www.gutenberg.org/files/11/11.txt").getLines -val aliceLineNum = alice.zipWithIndex.toList -val alicePipe = TypedPipe.from(aliceLineNum) - -// Now get a list of words for the book: -val aliceWords = alicePipe.flatMap { case (text, _) => text.split("\\s+").toList } - -// Now lets add a count for each word: -val aliceWithCount = aliceWords.filterNot(_.equals("")).map { word => (word, 1L) } - -// let's sum them for each word: -val wordCount = aliceWithCount.group.sum - -print ("Here are the top 10 words\n") -val top10 = wordCount - .groupAll - .sortBy { case (word, count) => -count } - .take(10) -top10.dump - -``` -``` -%scalding - -val table = "words\t count\n" + top10.toIterator.map{case (k, (word, count)) => s"$word\t$count"}.mkString("\n") -print("%table " + table) - -``` - -If you click on the icon for the pie chart, you should be able to see a chart like this: - - - -### HDFS mode - -**Test mode** - -``` -%scalding -mode -``` -This command should print: - -``` -res4: com.twitter.scalding.Mode = Hdfs(true,Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml) -``` - - -**Test HDFS read** - -```scala -val testfile = TypedPipe.from(TextLine("/user/x/testfile")) -testfile.dump -``` - -This command should print the contents of the hdfs file /user/x/testfile. - -**Test map-reduce job** - -```scala -val testfile = TypedPipe.from(TextLine("/user/x/testfile")) -val a = testfile.groupAll.size.values -a.toList - -``` - -This command should create a map reduce job. - -## Future Work -* Better user feedback (hadoop url, progress updates) -* Ability to cancel jobs -* Ability to dynamically load jars without restarting the interpreter -* Multiuser scalability (run scalding interpreters on different servers) diff --git a/docs/setup/basics/how_to_build.md b/docs/setup/basics/how_to_build.md index 56715a2fec..df5620af29 100644 --- a/docs/setup/basics/how_to_build.md +++ b/docs/setup/basics/how_to_build.md @@ -82,7 +82,7 @@ You can directly start Zeppelin by running the following command after successfu #### Scala profile -To be noticed, this scala profile affect the modules (e.g. cassandra, scalding) that use scala except Spark interpreter (Spark interpreter use other profiles to control its scala version, see the doc below). +To be noticed, this scala profile affect the modules (e.g. cassandra) that use scala except Spark interpreter (Spark interpreter use other profiles to control its scala version, see the doc below). 
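Since this profile pins the Scala binary version those modules are compiled against, a quick way to confirm what a build actually shipped is to print the runtime's version from any Scala-based interpreter paragraph. A minimal sketch, assuming only the Scala standard library:

```scala
// Prints the Scala version the interpreter process is running on,
// which should match the -Pscala-2.xx profile used at build time.
println(scala.util.Properties.versionString)        // e.g. "version 2.11.12"
println(scala.util.Properties.versionNumberString)  // e.g. "2.11.12"
```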
Set scala version (default 2.10). Available profiles are @@ -170,12 +170,6 @@ Ignite Interpreter ./mvnw clean package -Dignite.version=1.9.0 -DskipTests ``` -Scalding Interpreter - -```bash -./mvnw clean package -Pscalding -DskipTests -``` - ### Optional configurations Here are additional configurations that could be optionally tuned using the trailing `-D` option for maven commands diff --git a/docs/usage/interpreter/installation.md b/docs/usage/interpreter/installation.md index 2e26b19896..aaea7b8ebc 100644 --- a/docs/usage/interpreter/installation.md +++ b/docs/usage/interpreter/installation.md @@ -61,19 +61,8 @@ Zeppelin support both Scala 2.10 and 2.11 for several interpreters as below: <td>org.apache.zeppelin:zeppelin-spark_2.10:0.10.0</td> <td>org.apache.zeppelin:zeppelin-spark_2.11:0.10.0</td> </tr> - <tr> - <td>scalding</td> - <td>org.apache.zeppelin:zeppelin-scalding_2.10:0.10.0</td> - <td>org.apache.zeppelin:zeppelin-scalding_2.11:0.10.0</td> - </tr> </table> -If you install one of these interpreters only with `--name` option, installer will download interpreter built with Scala 2.11 by default. If you want to specify Scala version, you will need to add `--artifact` option. Here is the example of installing flink interpreter built with Scala 2.10. - -```bash -./bin/install-interpreter.sh --name flink --artifact org.apache.zeppelin:zeppelin-scalding_2.10:0.10.0 -``` - #### Install Spark interpreter built with Scala 2.10 Spark distribution package has been built with Scala 2.10 until 1.6.2. If you have `SPARK_HOME` set pointing to Spark version earlier than 2.0.0, you need to download Spark interpreter packaged with Scala 2.10. To do so, use follow command: @@ -223,11 +212,6 @@ You can also find the below community managed interpreter list in `conf/interpre <td>org.apache.zeppelin:zeppelin-sap:0.10.0</td> <td>SAP support</td> </tr> - <tr> - <td>scalding</td> - <td>org.apache.zeppelin:zeppelin-scalding_2.0.10:0.10.0</td> - <td>Scalding interpreter</td> - </tr> <tr> <td>scio</td> <td>org.apache.zeppelin:zeppelin-scio:0.10.0</td> diff --git a/pom.xml b/pom.xml index da5b999307..b4c1814c9a 100644 --- a/pom.xml +++ b/pom.xml @@ -86,7 +86,6 @@ <module>scio</module> <module>neo4j</module> <module>sap</module> - <module>scalding</module> <module>java</module> <module>beam</module> <module>hazelcastjet</module> diff --git a/scalding/pom.xml b/scalding/pom.xml deleted file mode 100644 index 54beb0b9d4..0000000000 --- a/scalding/pom.xml +++ /dev/null @@ -1,197 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. 
- --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <artifactId>zeppelin-interpreter-parent</artifactId> - <groupId>org.apache.zeppelin</groupId> - <version>0.11.0-SNAPSHOT</version> - <relativePath>../zeppelin-interpreter-parent/pom.xml</relativePath> - </parent> - - <artifactId>zeppelin-scalding_2.10</artifactId> - <packaging>jar</packaging> - <name>Zeppelin: Scalding interpreter</name> - - <properties> - <interpreter.name>scalding</interpreter.name> - <!--library versions--> - <hadoop.version>${hadoop2.6.version}</hadoop.version> - <scalding.version>0.16.1-RC1</scalding.version> - - <!--plugin versions--> - <plugin.scala.version>2.15.2</plugin.scala.version> - </properties> - - <repositories> - <repository> - <id>conjars</id> - <name>Concurrent Maven Repo</name> - <url>https://conjars.org/repo</url> - </repository> - - <!-- the twitter repo is unreliable (https://github.com/twitter/hadoop-lzo/issues/148) --> - <repository> - <id>twitter</id> - <name>Twitter Maven Repo</name> - <url>https://maven.twttr.com</url> - </repository> - - <!-- Temporary repo --> - <repository> - <id>zeppelin-dependencies</id> - <name>bintray</name> - <url>https://jetbrains.bintray.com/zeppelin-dependencies</url> - </repository> - </repositories> - - <dependencies> - - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-exec</artifactId> - <version>${commons.exec.version}</version> - </dependency> - - <dependency> - <groupId>com.twitter</groupId> - <artifactId>scalding-core_${scala.binary.version}</artifactId> - <version>${scalding.version}</version> - </dependency> - - <dependency> - <groupId>com.twitter</groupId> - <artifactId>scalding-args_${scala.binary.version}</artifactId> - <version>${scalding.version}</version> - </dependency> - - <dependency> - <groupId>com.twitter</groupId> - <artifactId>scalding-date_${scala.binary.version}</artifactId> - <version>${scalding.version}</version> - </dependency> - - <dependency> - <groupId>com.twitter</groupId> - <artifactId>scalding-commons_${scala.binary.version}</artifactId> - <version>${scalding.version}</version> - </dependency> - - <dependency> - <groupId>com.twitter</groupId> - <artifactId>scalding-avro_${scala.binary.version}</artifactId> - <version>${scalding.version}</version> - </dependency> - - <dependency> - <groupId>com.twitter</groupId> - <artifactId>scalding-parquet_${scala.binary.version}</artifactId> - <version>${scalding.version}</version> - </dependency> - - <dependency> - <groupId>com.twitter</groupId> - <artifactId>scalding-repl_${scala.binary.version}</artifactId> - <version>${scalding.version}</version> - </dependency> - - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - <version>${scala.version}</version> - </dependency> - - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-compiler</artifactId> - <version>${scala.version}</version> - </dependency> - - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-reflect</artifactId> - <version>${scala.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - 
<version>${hadoop.version}</version> - </dependency> - - </dependencies> - - <build> - <plugins> - <plugin> - <artifactId>maven-enforcer-plugin</artifactId> - </plugin> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - </plugin> - <plugin> - <artifactId>maven-shade-plugin</artifactId> - </plugin> - <!-- Plugin to compile Scala code --> - <plugin> - <groupId>org.scala-tools</groupId> - <artifactId>maven-scala-plugin</artifactId> - <executions> - <execution> - <id>compile</id> - <goals> - <goal>compile</goal> - </goals> - <phase>compile</phase> - </execution> - <execution> - <id>test-compile</id> - <goals> - <goal>testCompile</goal> - </goals> - <phase>test-compile</phase> - </execution> - <execution> - <phase>process-resources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - <configuration> - <skip>false</skip> - </configuration> - </plugin> - </plugins> - </build> - -</project> diff --git a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java b/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java deleted file mode 100644 index f104a587bb..0000000000 --- a/scalding/src/main/java/org/apache/zeppelin/scalding/ScaldingInterpreter.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.scalding; - -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.PrintStream; -import java.io.PrintWriter; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Properties; - -import com.twitter.scalding.ScaldingILoop; - -import scala.Console; - -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; -import org.apache.zeppelin.scheduler.Scheduler; -import org.apache.zeppelin.scheduler.SchedulerFactory; - -/** - * Scalding interpreter for Zeppelin. Based off the Spark interpreter code. 
- * - */ -public class ScaldingInterpreter extends Interpreter { - public static final Logger LOGGER = LoggerFactory.getLogger(ScaldingInterpreter.class); - - static final String ARGS_STRING = "args.string"; - static final String ARGS_STRING_DEFAULT = "--local --repl"; - static final String MAX_OPEN_INSTANCES = "max.open.instances"; - static final String MAX_OPEN_INSTANCES_DEFAULT = "50"; - - public static final List NO_COMPLETION = Collections.unmodifiableList(new ArrayList<>()); - - static int numOpenInstances = 0; - private ScaldingILoop interpreter; - private ByteArrayOutputStream out; - - public ScaldingInterpreter(Properties property) { - super(property); - out = new ByteArrayOutputStream(); - } - - @Override - public void open() { - numOpenInstances = numOpenInstances + 1; - String maxOpenInstancesStr = getProperty(MAX_OPEN_INSTANCES, - MAX_OPEN_INSTANCES_DEFAULT); - int maxOpenInstances = 50; - try { - maxOpenInstances = Integer.valueOf(maxOpenInstancesStr); - } catch (Exception e) { - LOGGER.error("Error reading max.open.instances", e); - } - LOGGER.info("max.open.instances = {}", maxOpenInstances); - if (numOpenInstances > maxOpenInstances) { - LOGGER.error("Reached maximum number of open instances"); - return; - } - LOGGER.info("Opening instance {}", numOpenInstances); - LOGGER.info("property: {}", getProperties()); - String argsString = getProperty(ARGS_STRING, ARGS_STRING_DEFAULT); - String[] args; - if (argsString == null) { - args = new String[0]; - } else { - args = argsString.split(" "); - } - LOGGER.info("{}", Arrays.toString(args)); - - PrintWriter printWriter = new PrintWriter(out, true); - interpreter = ZeppelinScaldingShell.getRepl(args, printWriter); - interpreter.createInterpreter(); - } - - @Override - public void close() { - interpreter.intp().close(); - } - - - @Override - public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) { - String user = contextInterpreter.getAuthenticationInfo().getUser(); - LOGGER.info("Running Scalding command: user: {} cmd: '{}'", user, cmd); - - if (interpreter == null) { - LOGGER.error( - "interpreter == null, open may not have been called because max.open.instances reached"); - return new InterpreterResult(Code.ERROR, - "interpreter == null\n" + - "open may not have been called because max.open.instances reached" - ); - } - if (cmd == null || cmd.trim().length() == 0) { - return new InterpreterResult(Code.SUCCESS); - } - InterpreterResult interpreterResult = new InterpreterResult(Code.ERROR); - if (getProperty(ARGS_STRING).contains("hdfs")) { - UserGroupInformation ugi = null; - try { - ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser()); - } catch (IOException e) { - LOGGER.error("Error creating UserGroupInformation", e); - return new InterpreterResult(Code.ERROR, e.getMessage()); - } - try { - // Make variables final to avoid "local variable is accessed from within inner class; - // needs to be declared final" exception in JDK7 - final String cmd1 = cmd; - final InterpreterContext contextInterpreter1 = contextInterpreter; - PrivilegedExceptionAction<InterpreterResult> action = - new PrivilegedExceptionAction<InterpreterResult>() { - @Override - public InterpreterResult run() throws Exception { - return interpret(cmd1.split("\n"), contextInterpreter1); - } - }; - interpreterResult = ugi.doAs(action); - } catch (Exception e) { - LOGGER.error("Error running command with ugi.doAs", e); - return new InterpreterResult(Code.ERROR, e.getMessage()); - } - } else { - 
interpreterResult = interpret(cmd.split("\n"), contextInterpreter); - } - return interpreterResult; - } - - public InterpreterResult interpret(String[] lines, InterpreterContext context) { - synchronized (this) { - InterpreterResult r = interpretInput(lines); - return r; - } - } - - public InterpreterResult interpretInput(String[] lines) { - - // add print("") to make sure not finishing with comment - // see https://github.com/NFLabs/zeppelin/issues/151 - String[] linesToRun = new String[lines.length + 1]; - for (int i = 0; i < lines.length; i++) { - linesToRun[i] = lines[i]; - } - linesToRun[lines.length] = "print(\"\")"; - - out.reset(); - - // Moving two lines below from open() to this function. - // If they are in open output is incomplete. - PrintStream printStream = new PrintStream(out, true); - Console.setOut(printStream); - - Code r = null; - String incomplete = ""; - boolean inComment = false; - - for (int l = 0; l < linesToRun.length; l++) { - String s = linesToRun[l]; - // check if next line starts with "." (but not ".." or "./") it is treated as an invocation - if (l + 1 < linesToRun.length) { - String nextLine = linesToRun[l + 1].trim(); - boolean continuation = false; - if (nextLine.isEmpty() - || nextLine.startsWith("//") // skip empty line or comment - || nextLine.startsWith("}") - || nextLine.startsWith("object")) { // include "} object" for Scala companion object - continuation = true; - } else if (!inComment && nextLine.startsWith("/*")) { - inComment = true; - continuation = true; - } else if (inComment && nextLine.lastIndexOf("*/") >= 0) { - inComment = false; - continuation = true; - } else if (nextLine.length() > 1 - && nextLine.charAt(0) == '.' - && nextLine.charAt(1) != '.' // ".." - && nextLine.charAt(1) != '/') { // "./" - continuation = true; - } else if (inComment) { - continuation = true; - } - if (continuation) { - incomplete += s + "\n"; - continue; - } - } - - scala.tools.nsc.interpreter.Results.Result res = null; - try { - res = interpreter.intp().interpret(incomplete + s); - } catch (Exception e) { - LOGGER.error("Interpreter exception: ", e); - return new InterpreterResult(Code.ERROR, e.getMessage()); - } - - r = getResultCode(res); - - if (r == Code.ERROR) { - Console.flush(); - return new InterpreterResult(r, out.toString()); - } else if (r == Code.INCOMPLETE) { - incomplete += s + "\n"; - } else { - incomplete = ""; - } - } - if (r == Code.INCOMPLETE) { - return new InterpreterResult(r, "Incomplete expression"); - } else { - Console.flush(); - return new InterpreterResult(r, out.toString()); - } - } - - private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) { - if (r instanceof scala.tools.nsc.interpreter.Results.Success$) { - return Code.SUCCESS; - } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) { - return Code.INCOMPLETE; - } else { - return Code.ERROR; - } - } - - @Override - public void cancel(InterpreterContext context) { - // not implemented - } - - @Override - public FormType getFormType() { - return FormType.NATIVE; - } - - @Override - public int getProgress(InterpreterContext context) { - // fine-grained progress not implemented - return 0 - return 0; - } - - @Override - public Scheduler getScheduler() { - return SchedulerFactory.singleton().createOrGetFIFOScheduler( - ScaldingInterpreter.class.getName() + this.hashCode()); - } - - @Override - public List<InterpreterCompletion> completion(String buf, int cursor, - InterpreterContext interpreterContext) { - return NO_COMPLETION; - } - -} diff --git 
a/scalding/src/main/resources/interpreter-setting.json b/scalding/src/main/resources/interpreter-setting.json deleted file mode 100644 index ca6cd9295a..0000000000 --- a/scalding/src/main/resources/interpreter-setting.json +++ /dev/null @@ -1,21 +0,0 @@ -[ - { - "group": "scalding", - "name": "scalding", - "className": "org.apache.zeppelin.scalding.ScaldingInterpreter", - "properties": { - "args.string": { - "envName": null, - "defaultValue": "--local --repl", - "description": "Arguments for scalding REPL", - "type": "textarea" - }, - "max.open.instances": { - "envName": null, - "defaultValue": "50", - "description": "Maximum number of open interpreter instances", - "type": "number" - } - } - } -] diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala deleted file mode 100644 index b847eba001..0000000000 --- a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinReplState.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.scalding - -/** - * Stores REPL state - */ - -import cascading.flow.FlowDef -import com.twitter.scalding.BaseReplState -import scala.concurrent.{ ExecutionContext => ConcurrentExecutionContext } -import scala.concurrent.Future -import scala.util.{Failure, Success} - -object ZeppelinReplState extends BaseReplState { - override def shell = ZeppelinScaldingShell -} - -/** - * Implicit FlowDef and Mode, import in the REPL to have the global context implicitly - * used everywhere. - */ -object ZeppelinReplImplicitContext { - /** Implicit execution context for using the Execution monad */ - implicit val executionContext = ConcurrentExecutionContext.global - /** Implicit repl state used for ShellPipes */ - implicit def stateImpl = ZeppelinReplState - /** Implicit flowDef for this Scalding shell session. */ - implicit def flowDefImpl = ZeppelinReplState.flowDef - /** Defaults to running in local mode if no mode is specified. */ - implicit def modeImpl = ZeppelinReplState.mode - implicit def configImpl = ZeppelinReplState.config -} diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala deleted file mode 100644 index 9be0199869..0000000000 --- a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingLoop.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.scalding - -import java.io.BufferedReader -import com.twitter.scalding.ScaldingILoop - -import scala.tools.nsc.interpreter._ - -/** - * TBD - */ -class ZeppelinScaldingILoop(in: Option[BufferedReader], out: JPrintWriter) - extends ScaldingILoop(in, out) { - - override protected def imports = List( - "com.twitter.scalding.{ ScaldingILoop => ScaldingScaldingILoop, ScaldingShell => ScaldingScaldingShell, _ }", - // ReplImplicits minus fields API parts (esp FieldConversions) - """com.twitter.scalding.ReplImplicits.{ - iterableToSource, - keyedListLikeToShellTypedPipe, - typedPipeToShellTypedPipe, - valuePipeToShellValuePipe - }""", - "com.twitter.scalding.ReplImplicits", - "org.apache.zeppelin.scalding.ZeppelinReplImplicitContext._", - "org.apache.zeppelin.scalding.ZeppelinReplState", - "org.apache.zeppelin.scalding.ZeppelinReplState._" - ) - -} diff --git a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala b/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala deleted file mode 100644 index 29e5f835cb..0000000000 --- a/scalding/src/main/scala/org/apache/zeppelin/scalding/ZeppelinScaldingShell.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.scalding - -import com.twitter.scalding._ -import com.twitter.scalding.typed.TypedPipe -import scala.tools.nsc.{GenericRunnerCommand} -import scala.tools.nsc.interpreter._ - -/** - * TBD - */ -object ZeppelinScaldingShell extends BaseScaldingShell { - - override def replState = ZeppelinReplState - - def getRepl(args: Array[String], out: JPrintWriter): ScaldingILoop = { - - val argsExpanded = ExpandLibJarsGlobs(args) - val ShellArgs(cfg, mode, cmdArgs) = parseModeArgs(argsExpanded) - - // Process command line arguments into a settings object, and use that to start the REPL. 
- // We ignore params we don't care about - hence error function is empty - val command = new GenericRunnerCommand(cmdArgs, _ => ()) - - // inherit defaults for embedded interpretter (needed for running with SBT) - // (TypedPipe chosen arbitrarily, just needs to be something representative) - command.settings.embeddedDefaults[TypedPipe[String]] - - // if running from the assembly, need to explicitly tell it to use java classpath - if (args.contains("--repl")) command.settings.usejavacp.value = true - - command.settings.classpath.append(System.getProperty("java.class.path")) - - // Force the repl to be synchronous, so all cmds are executed in the same thread - command.settings.Yreplsync.value = true - - val repl = new ZeppelinScaldingILoop(None, out) - scaldingREPL = Some(repl) - replState.mode = mode - replState.customConfig = replState.customConfig ++ (mode match { - case _: HadoopMode => cfg - case _ => Config.empty - }) - - // if in Hdfs mode, store the mode to enable switching between Local and Hdfs - mode match { - case m @ Hdfs(_, _) => replState.storedHdfsMode = Some(m) - case _ => () - } - - repl.settings = command.settings - return repl; - - } - -} diff --git a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java b/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java deleted file mode 100644 index 992c15594f..0000000000 --- a/scalding/src/test/java/org/apache/zeppelin/scalding/ScaldingInterpreterTest.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.scalding; - -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.user.AuthenticationInfo; -import org.junit.After; -import org.junit.Before; -import org.junit.FixMethodOrder; -import org.junit.Test; -import org.junit.runners.MethodSorters; - -import java.io.File; -import java.util.Properties; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Tests for the Scalding interpreter for Zeppelin. 
- * - */ -@FixMethodOrder(MethodSorters.NAME_ASCENDING) -public class ScaldingInterpreterTest { - public static ScaldingInterpreter repl; - private InterpreterContext context; - private File tmpDir; - - @Before - public void setUp() throws Exception { - tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + - System.currentTimeMillis()); - System.setProperty("zeppelin.dep.localrepo", tmpDir.getAbsolutePath() + "/local-repo"); - - tmpDir.mkdirs(); - - if (repl == null) { - Properties p = new Properties(); - p.setProperty(ScaldingInterpreter.ARGS_STRING, "--local --repl"); - - repl = new ScaldingInterpreter(p); - repl.open(); - } - - context = InterpreterContext.builder() - .setNoteId("noteId") - .setParagraphId("paragraphId") - .setAuthenticationInfo(new AuthenticationInfo()) - .build(); - } - - @After - public void tearDown() throws Exception { - delete(tmpDir); - repl.close(); - } - - private void delete(File file) { - if (file.isFile()) { - file.delete(); - } else if (file.isDirectory()) { - File[] files = file.listFiles(); - if (files != null && files.length > 0) { - for (File f : files) { - delete(f); - } - } - file.delete(); - } - } - - @Test - public void testNextLineComments() { - assertEquals(InterpreterResult.Code.SUCCESS, - repl.interpret("\"123\"\n/*comment here\n*/.toInt", context).code()); - } - - @Test - public void testNextLineCompanionObject() { - String code = "class Counter {\nvar value: Long = 0\n}\n // comment\n\n object Counter " + - "{\n def apply(x: Long) = new Counter()\n}"; - assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret(code, context).code()); - } - - @Test - public void testBasicIntp() { - assertEquals(InterpreterResult.Code.SUCCESS, - repl.interpret("val a = 1\nval b = 2", context).code()); - - // when interpret incomplete expression - InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context); - assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code()); - assertTrue(incomplete.message().get(0).getData().length() > 0); // expecting some error - // message - } - - @Test - public void testBasicScalding() { - assertEquals(InterpreterResult.Code.SUCCESS, - repl.interpret("case class Sale(state: String, name: String, sale: Int)\n" + - "val salesList = List(Sale(\"CA\", \"A\", 60), Sale(\"CA\", \"A\", 20), " + - "Sale(\"VA\", \"B\", 15))\n" + - "val salesPipe = TypedPipe.from(salesList)\n" + - "val results = salesPipe.map{x => (1, Set(x.state), x.sale)}.\n" + - " groupAll.sum.values.map{ case(count, set, sum) => (count, set.size, sum) }\n" + - "results.dump", - context).code()); - } - - @Test - public void testNextLineInvocation() { - assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n.toInt", context).code()); - } - - @Test - public void testEndWithComment() { - assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", - context).code()); - } - - @Test - public void testReferencingUndefinedVal() { - InterpreterResult result = repl.interpret("def category(min: Int) = {" - + " if (0 <= value) \"error\"" + "}", context); - assertEquals(Code.ERROR, result.code()); - } -}
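The three testNextLine* cases above exercised the look-ahead rule in the removed ScaldingInterpreter.interpretInput: a line is buffered and joined with the next one when the next line signals that the expression is still open. A Scala paraphrase of that rule (the original is Java; this sketch is added for illustration and is not part of the deleted tree):

```scala
// Returns (joinWithPrevious, inCommentAfter) for a look-ahead line.
def joinsWithPrevious(next: String, inComment: Boolean): (Boolean, Boolean) = {
  val n = next.trim
  if (n.isEmpty || n.startsWith("//") || n.startsWith("}") || n.startsWith("object"))
    (true, inComment)                    // blank, comment, closing brace, companion object
  else if (!inComment && n.startsWith("/*")) (true, true)   // block comment opens
  else if (inComment && n.contains("*/"))    (true, false)  // block comment closes
  else if (n.length > 1 && n(0) == '.' && n(1) != '.' && n(1) != '/')
    (true, inComment)                    // ".toInt"-style chained invocation, not ".." or "./"
  else if (inComment) (true, true)       // still inside a block comment
  else (false, inComment)
}

// Mirrors testNextLineInvocation: "\"123\"" followed by ".toInt" is one expression.
assert(joinsWithPrevious(".toInt", inComment = false)._1)
// Mirrors testNextLineComments: a "/*" line keeps the buffer open and enters comment state.
assert(joinsWithPrevious("/*comment here", inComment = false) == (true, true))
```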