http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ca87f7d4/spark/src/main/java/org/apache/zeppelin/spark/Utils.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/Utils.java b/spark/src/main/java/org/apache/zeppelin/spark/Utils.java deleted file mode 100644 index 82bf210..0000000 --- a/spark/src/main/java/org/apache/zeppelin/spark/Utils.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark; - -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.user.AuthenticationInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.util.Properties; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * Utility and helper functions for the Spark Interpreter - */ -class Utils { - public static Logger logger = LoggerFactory.getLogger(Utils.class); - private static final String SCALA_COMPILER_VERSION = evaluateScalaCompilerVersion(); - - static Object invokeMethod(Object o, String name) { - return invokeMethod(o, name, new Class[]{}, new Object[]{}); - } - - static Object invokeMethod(Object o, String name, Class<?>[] argTypes, Object[] params) { - try { - return o.getClass().getMethod(name, argTypes).invoke(o, params); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - logger.error(e.getMessage(), e); - } - return null; - } - - static Object invokeStaticMethod(Class<?> c, String name, Class<?>[] argTypes, Object[] params) { - try { - return c.getMethod(name, argTypes).invoke(null, params); - } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { - logger.error(e.getMessage(), e); - } - return null; - } - - static Object invokeStaticMethod(Class<?> c, String name) { - return invokeStaticMethod(c, name, new Class[]{}, new Object[]{}); - } - - static Class<?> findClass(String name) { - return findClass(name, false); - } - - static Class<?> findClass(String name, boolean silence) { - try { - return Class.forName(name); - } catch (ClassNotFoundException e) { - if (!silence) { - logger.error(e.getMessage(), e); - } - return null; - } - } - - static Object instantiateClass(String name, Class<?>[] argTypes, Object[] params) { - try { - Constructor<?> constructor = Utils.class.getClassLoader() - .loadClass(name).getConstructor(argTypes); - return constructor.newInstance(params); - } catch (NoSuchMethodException | ClassNotFoundException | IllegalAccessException | - InstantiationException | InvocationTargetException e) { - logger.error(e.getMessage(), e); - } - return 
null; - } - - // function works after intp is initialized - static boolean isScala2_10() { - try { - Class.forName("org.apache.spark.repl.SparkIMain"); - return true; - } catch (ClassNotFoundException e) { - return false; - } catch (IncompatibleClassChangeError e) { - return false; - } - } - - static boolean isScala2_11() { - return !isScala2_10(); - } - - static boolean isCompilerAboveScala2_11_7() { - if (isScala2_10() || SCALA_COMPILER_VERSION == null) { - return false; - } - Pattern p = Pattern.compile("([0-9]+)[.]([0-9]+)[.]([0-9]+)"); - Matcher m = p.matcher(SCALA_COMPILER_VERSION); - if (m.matches()) { - int major = Integer.parseInt(m.group(1)); - int minor = Integer.parseInt(m.group(2)); - int bugfix = Integer.parseInt(m.group(3)); - return (major > 2 || (major == 2 && minor > 11) || (major == 2 && minor == 11 && bugfix > 7)); - } - return false; - } - - private static String evaluateScalaCompilerVersion() { - String version = null; - try { - Properties p = new Properties(); - Class<?> completionClass = findClass("scala.tools.nsc.interpreter.JLineCompletion"); - if (completionClass != null) { - try (java.io.InputStream in = completionClass.getClass() - .getResourceAsStream("/compiler.properties")) { - p.load(in); - version = p.getProperty("version.number"); - } catch (java.io.IOException e) { - logger.error("Failed to evaluate Scala compiler version", e); - } - } - } catch (RuntimeException e) { - logger.error("Failed to evaluate Scala compiler version", e); - } - return version; - } - - static boolean isSpark2() { - try { - Class.forName("org.apache.spark.sql.SparkSession"); - return true; - } catch (ClassNotFoundException e) { - return false; - } - } - - public static String buildJobGroupId(InterpreterContext context) { - return "zeppelin-" + context.getNoteId() + "-" + context.getParagraphId(); - } - - public static String getNoteId(String jobgroupId) { - int indexOf = jobgroupId.indexOf("-"); - int secondIndex = jobgroupId.indexOf("-", indexOf + 1); - return jobgroupId.substring(indexOf + 1, secondIndex); - } - - public static String getParagraphId(String jobgroupId) { - int indexOf = jobgroupId.indexOf("-"); - int secondIndex = jobgroupId.indexOf("-", indexOf + 1); - return jobgroupId.substring(secondIndex + 1, jobgroupId.length()); - } - - public static String getUserName(AuthenticationInfo info) { - String uName = ""; - if (info != null) { - uName = info.getUser(); - } - if (uName == null || uName.isEmpty()) { - uName = "anonymous"; - } - return uName; - } -}
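
For reference, a minimal standalone sketch (not part of the removed module; the note and paragraph ids are made-up examples) of how the job-group helpers in the deleted Utils.java compose and parse the "zeppelin-<noteId>-<paragraphId>" identifier. The note id sits between the first and second '-', and the paragraph id is everything after the second '-', so dashes inside the paragraph id are preserved:

    public class JobGroupIdSketch {
      public static void main(String[] args) {
        String noteId = "2C35YU814";                                 // hypothetical note id
        String paragraphId = "paragraph_1423500782552_-1439281894";  // hypothetical paragraph id
        String jobGroupId = "zeppelin-" + noteId + "-" + paragraphId;

        int first = jobGroupId.indexOf("-");
        int second = jobGroupId.indexOf("-", first + 1);
        System.out.println(jobGroupId.substring(first + 1, second)); // prints the note id
        System.out.println(jobGroupId.substring(second + 1));        // prints the paragraph id
      }
    }
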
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ca87f7d4/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java deleted file mode 100644 index 130d849..0000000 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java +++ /dev/null @@ -1,394 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.spark; - -import org.apache.commons.exec.*; -import org.apache.commons.exec.environment.EnvironmentUtils; -import org.apache.commons.io.IOUtils; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterOutput; -import org.apache.zeppelin.interpreter.InterpreterOutputListener; -import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput; -import org.apache.zeppelin.interpreter.util.InterpreterOutputStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.*; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -/** - * R repl interaction - */ -public class ZeppelinR implements ExecuteResultHandler { - Logger logger = LoggerFactory.getLogger(ZeppelinR.class); - private final String rCmdPath; - private final SparkVersion sparkVersion; - private DefaultExecutor executor; - private InterpreterOutputStream outputStream; - private PipedOutputStream input; - private final String scriptPath; - private final String libPath; - static Map<Integer, ZeppelinR> zeppelinR = Collections.synchronizedMap( - new HashMap<Integer, ZeppelinR>()); - - private InterpreterOutput initialOutput; - private final int port; - private boolean rScriptRunning; - - /** - * To be notified R repl initialization - */ - boolean rScriptInitialized = false; - Integer rScriptInitializeNotifier = new Integer(0); - - /** - * Request to R repl - */ - Request rRequestObject = null; - Integer rRequestNotifier = new Integer(0); - - /** - * Request object - * - * type : "eval", "set", "get" - * stmt : statement to evaluate when type is "eval" - * key when type is "set" or "get" - * value : value object when type is "put" - */ - public static class Request { - String type; - String stmt; - Object value; - - public Request(String type, String stmt, Object value) { - this.type = type; - this.stmt = stmt; - this.value = value; - } - - public String getType() { - return type; - } - - public String getStmt() { - return stmt; - } - - public Object getValue() { - return value; - } - } - - /** - * Response from R repl - */ - Object rResponseValue = null; - boolean rResponseError = false; - Integer rResponseNotifier = new Integer(0); 
- - /** - * Create ZeppelinR instance - * @param rCmdPath R repl commandline path - * @param libPath sparkr library path - */ - public ZeppelinR(String rCmdPath, String libPath, int sparkRBackendPort, - SparkVersion sparkVersion) { - this.rCmdPath = rCmdPath; - this.libPath = libPath; - this.sparkVersion = sparkVersion; - this.port = sparkRBackendPort; - try { - File scriptFile = File.createTempFile("zeppelin_sparkr-", ".R"); - scriptPath = scriptFile.getAbsolutePath(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - /** - * Start R repl - * @throws IOException - */ - public void open() throws IOException, InterpreterException { - createRScript(); - - zeppelinR.put(hashCode(), this); - - CommandLine cmd = CommandLine.parse(rCmdPath); - cmd.addArgument("--no-save"); - cmd.addArgument("--no-restore"); - cmd.addArgument("-f"); - cmd.addArgument(scriptPath); - cmd.addArgument("--args"); - cmd.addArgument(Integer.toString(hashCode())); - cmd.addArgument(Integer.toString(port)); - cmd.addArgument(libPath); - cmd.addArgument(Integer.toString(sparkVersion.toNumber())); - - // dump out the R command to facilitate manually running it, e.g. for fault diagnosis purposes - logger.debug(cmd.toString()); - - executor = new DefaultExecutor(); - outputStream = new InterpreterOutputStream(logger); - - input = new PipedOutputStream(); - PipedInputStream in = new PipedInputStream(input); - - PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, outputStream, in); - executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT)); - executor.setStreamHandler(streamHandler); - Map env = EnvironmentUtils.getProcEnvironment(); - - - initialOutput = new InterpreterOutput(null); - outputStream.setInterpreterOutput(initialOutput); - executor.execute(cmd, env, this); - rScriptRunning = true; - - // flush output - eval("cat('')"); - } - - /** - * Evaluate expression - * @param expr - * @return - */ - public Object eval(String expr) throws InterpreterException { - synchronized (this) { - rRequestObject = new Request("eval", expr, null); - return request(); - } - } - - /** - * assign value to key - * @param key - * @param value - */ - public void set(String key, Object value) throws InterpreterException { - synchronized (this) { - rRequestObject = new Request("set", key, value); - request(); - } - } - - /** - * get value of key - * @param key - * @return - */ - public Object get(String key) throws InterpreterException { - synchronized (this) { - rRequestObject = new Request("get", key, null); - return request(); - } - } - - /** - * get value of key, as a string - * @param key - * @return - */ - public String getS0(String key) throws InterpreterException { - synchronized (this) { - rRequestObject = new Request("getS", key, null); - return (String) request(); - } - } - - /** - * Send request to r repl and return response - * @return responseValue - */ - private Object request() throws RuntimeException, InterpreterException { - if (!rScriptRunning) { - throw new RuntimeException("r repl is not running"); - } - - // wait for rscript initialized - if (!rScriptInitialized) { - waitForRScriptInitialized(); - } - - rResponseValue = null; - - synchronized (rRequestNotifier) { - rRequestNotifier.notify(); - } - - Object respValue = null; - synchronized (rResponseNotifier) { - while (rResponseValue == null && rScriptRunning) { - try { - rResponseNotifier.wait(1000); - } catch (InterruptedException e) { - logger.error(e.getMessage(), e); - } - } - respValue = rResponseValue; - 
rResponseValue = null; - } - - if (rResponseError) { - throw new RuntimeException(respValue.toString()); - } else { - return respValue; - } - } - - /** - * Wait until src/main/resources/R/zeppelin_sparkr.R is initialized - * and call onScriptInitialized() - * - * @throws InterpreterException - */ - private void waitForRScriptInitialized() throws InterpreterException { - synchronized (rScriptInitializeNotifier) { - long startTime = System.nanoTime(); - while (rScriptInitialized == false && - rScriptRunning && - System.nanoTime() - startTime < 10L * 1000 * 1000000) { - try { - rScriptInitializeNotifier.wait(1000); - } catch (InterruptedException e) { - logger.error(e.getMessage(), e); - } - } - } - - String errorMessage = ""; - try { - initialOutput.flush(); - errorMessage = new String(initialOutput.toByteArray()); - } catch (IOException e) { - e.printStackTrace(); - } - - if (rScriptInitialized == false) { - throw new InterpreterException("sparkr is not responding " + errorMessage); - } - } - - /** - * invoked by src/main/resources/R/zeppelin_sparkr.R - * @return - */ - public Request getRequest() { - synchronized (rRequestNotifier) { - while (rRequestObject == null) { - try { - rRequestNotifier.wait(1000); - } catch (InterruptedException e) { - logger.error(e.getMessage(), e); - } - } - - Request req = rRequestObject; - rRequestObject = null; - return req; - } - } - - /** - * invoked by src/main/resources/R/zeppelin_sparkr.R - * @param value - * @param error - */ - public void setResponse(Object value, boolean error) { - synchronized (rResponseNotifier) { - rResponseValue = value; - rResponseError = error; - rResponseNotifier.notify(); - } - } - - /** - * invoked by src/main/resources/R/zeppelin_sparkr.R - */ - public void onScriptInitialized() { - synchronized (rScriptInitializeNotifier) { - rScriptInitialized = true; - rScriptInitializeNotifier.notifyAll(); - } - } - - /** - * Create R script in tmp dir - */ - private void createRScript() throws InterpreterException { - ClassLoader classLoader = getClass().getClassLoader(); - File out = new File(scriptPath); - - if (out.exists() && out.isDirectory()) { - throw new InterpreterException("Can't create r script " + out.getAbsolutePath()); - } - - try { - FileOutputStream outStream = new FileOutputStream(out); - IOUtils.copy( - classLoader.getResourceAsStream("R/zeppelin_sparkr.R"), - outStream); - outStream.close(); - } catch (IOException e) { - throw new InterpreterException(e); - } - - logger.info("File {} created", scriptPath); - } - - /** - * Terminate this R repl - */ - public void close() { - executor.getWatchdog().destroyProcess(); - new File(scriptPath).delete(); - zeppelinR.remove(hashCode()); - } - - /** - * Get instance - * This method will be invoded from zeppelin_sparkr.R - * @param hashcode - * @return - */ - public static ZeppelinR getZeppelinR(int hashcode) { - return zeppelinR.get(hashcode); - } - - /** - * Pass InterpreterOutput to capture the repl output - * @param out - */ - public void setInterpreterOutput(InterpreterOutput out) { - outputStream.setInterpreterOutput(out); - } - - @Override - public void onProcessComplete(int i) { - logger.info("process complete {}", i); - rScriptRunning = false; - } - - @Override - public void onProcessFailed(ExecuteException e) { - logger.error(e.getMessage(), e); - rScriptRunning = false; - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ca87f7d4/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java 
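
For illustration only (not from the removed sources): a sketch of driving the ZeppelinR wrapper deleted above. It assumes a SparkVersion instance and the port returned by SparkRBackend.init() are already available; the R command and SparkR library path are placeholder values, and error handling is omitted.

    import org.apache.zeppelin.interpreter.InterpreterException;
    import org.apache.zeppelin.spark.SparkVersion;
    import org.apache.zeppelin.spark.ZeppelinR;

    public class ZeppelinRUsageSketch {
      public static Object roundTrip(SparkVersion version, int sparkRBackendPort)
          throws java.io.IOException, InterpreterException {
        // "R" and the library path below are placeholders, not values from the original code.
        ZeppelinR zeppelinR = new ZeppelinR("R", "/path/to/spark/R/lib", sparkRBackendPort, version);
        zeppelinR.open();              // starts the R process that runs zeppelin_sparkr.R
        zeppelinR.set("x", 21);        // "set" request: bind a value on the R side
        zeppelinR.eval("y <- x * 2");  // "eval" request: evaluate a statement
        Object y = zeppelinR.get("y"); // "get" request: read a value back
        zeppelinR.close();             // destroys the R process and removes the temp script
        return y;
      }
    }
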
---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java deleted file mode 100644 index 80ea03b..0000000 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark; - -import org.apache.spark.SparkContext; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.SQLContext; - -/** - * Contains the Spark and Zeppelin Contexts made available to SparkR. - */ -public class ZeppelinRContext { - private static SparkContext sparkContext; - private static SQLContext sqlContext; - private static SparkZeppelinContext zeppelinContext; - private static Object sparkSession; - private static JavaSparkContext javaSparkContext; - - public static void setSparkContext(SparkContext sparkContext) { - ZeppelinRContext.sparkContext = sparkContext; - } - - public static void setZeppelinContext(SparkZeppelinContext zeppelinContext) { - ZeppelinRContext.zeppelinContext = zeppelinContext; - } - - public static void setSqlContext(SQLContext sqlContext) { - ZeppelinRContext.sqlContext = sqlContext; - } - - public static void setSparkSession(Object sparkSession) { - ZeppelinRContext.sparkSession = sparkSession; - } - - public static SparkContext getSparkContext() { - return sparkContext; - } - - public static SQLContext getSqlContext() { - return sqlContext; - } - - public static SparkZeppelinContext getZeppelinContext() { - return zeppelinContext; - } - - public static Object getSparkSession() { - return sparkSession; - } - - public static void setJavaSparkContext(JavaSparkContext jsc) { javaSparkContext = jsc; } - - public static JavaSparkContext getJavaSparkContext() { return javaSparkContext; } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ca87f7d4/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyContext.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyContext.java b/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyContext.java deleted file mode 100644 index 0235fc6..0000000 --- a/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyContext.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark.dep; - -import java.io.File; -import java.net.MalformedURLException; -import java.util.LinkedList; -import java.util.List; - -import org.apache.zeppelin.dep.Booter; -import org.apache.zeppelin.dep.Dependency; -import org.apache.zeppelin.dep.Repository; - -import org.sonatype.aether.RepositorySystem; -import org.sonatype.aether.RepositorySystemSession; -import org.sonatype.aether.artifact.Artifact; -import org.sonatype.aether.collection.CollectRequest; -import org.sonatype.aether.graph.DependencyFilter; -import org.sonatype.aether.repository.RemoteRepository; -import org.sonatype.aether.repository.Authentication; -import org.sonatype.aether.resolution.ArtifactResolutionException; -import org.sonatype.aether.resolution.ArtifactResult; -import org.sonatype.aether.resolution.DependencyRequest; -import org.sonatype.aether.resolution.DependencyResolutionException; -import org.sonatype.aether.util.artifact.DefaultArtifact; -import org.sonatype.aether.util.artifact.JavaScopes; -import org.sonatype.aether.util.filter.DependencyFilterUtils; -import org.sonatype.aether.util.filter.PatternExclusionsDependencyFilter; - - -/** - * - */ -public class SparkDependencyContext { - List<Dependency> dependencies = new LinkedList<>(); - List<Repository> repositories = new LinkedList<>(); - - List<File> files = new LinkedList<>(); - List<File> filesDist = new LinkedList<>(); - private RepositorySystem system = Booter.newRepositorySystem(); - private RepositorySystemSession session; - private RemoteRepository mavenCentral = Booter.newCentralRepository(); - private RemoteRepository mavenLocal = Booter.newLocalRepository(); - private List<RemoteRepository> additionalRepos = new LinkedList<>(); - - public SparkDependencyContext(String localRepoPath, String additionalRemoteRepository) { - session = Booter.newRepositorySystemSession(system, localRepoPath); - addRepoFromProperty(additionalRemoteRepository); - } - - public Dependency load(String lib) { - Dependency dep = new Dependency(lib); - - if (dependencies.contains(dep)) { - dependencies.remove(dep); - } - dependencies.add(dep); - return dep; - } - - public Repository addRepo(String name) { - Repository rep = new Repository(name); - repositories.add(rep); - return rep; - } - - public void reset() { - dependencies = new LinkedList<>(); - repositories = new LinkedList<>(); - - files = new LinkedList<>(); - filesDist = new LinkedList<>(); - } - - private void addRepoFromProperty(String listOfRepo) { - if (listOfRepo != null) { - String[] repos = listOfRepo.split(";"); - for (String repo : repos) { - String[] parts = repo.split(","); - if (parts.length == 3) { - String id = parts[0].trim(); - String url = parts[1].trim(); - boolean isSnapshot = Boolean.parseBoolean(parts[2].trim()); - if (id.length() > 1 && url.length() > 1) { - RemoteRepository rr = new RemoteRepository(id, "default", url); - rr.setPolicy(isSnapshot, null); - additionalRepos.add(rr); 
- } - } - } - } - } - - /** - * fetch all artifacts - * @return - * @throws MalformedURLException - * @throws ArtifactResolutionException - * @throws DependencyResolutionException - */ - public List<File> fetch() throws MalformedURLException, - DependencyResolutionException, ArtifactResolutionException { - - for (Dependency dep : dependencies) { - if (!dep.isLocalFsArtifact()) { - List<ArtifactResult> artifacts = fetchArtifactWithDep(dep); - for (ArtifactResult artifact : artifacts) { - if (dep.isDist()) { - filesDist.add(artifact.getArtifact().getFile()); - } - files.add(artifact.getArtifact().getFile()); - } - } else { - if (dep.isDist()) { - filesDist.add(new File(dep.getGroupArtifactVersion())); - } - files.add(new File(dep.getGroupArtifactVersion())); - } - } - - return files; - } - - private List<ArtifactResult> fetchArtifactWithDep(Dependency dep) - throws DependencyResolutionException, ArtifactResolutionException { - Artifact artifact = new DefaultArtifact( - SparkDependencyResolver.inferScalaVersion(dep.getGroupArtifactVersion())); - - DependencyFilter classpathFlter = DependencyFilterUtils - .classpathFilter(JavaScopes.COMPILE); - PatternExclusionsDependencyFilter exclusionFilter = new PatternExclusionsDependencyFilter( - SparkDependencyResolver.inferScalaVersion(dep.getExclusions())); - - CollectRequest collectRequest = new CollectRequest(); - collectRequest.setRoot(new org.sonatype.aether.graph.Dependency(artifact, - JavaScopes.COMPILE)); - - collectRequest.addRepository(mavenCentral); - collectRequest.addRepository(mavenLocal); - for (RemoteRepository repo : additionalRepos) { - collectRequest.addRepository(repo); - } - for (Repository repo : repositories) { - RemoteRepository rr = new RemoteRepository(repo.getId(), "default", repo.getUrl()); - rr.setPolicy(repo.isSnapshot(), null); - Authentication auth = repo.getAuthentication(); - if (auth != null) { - rr.setAuthentication(auth); - } - collectRequest.addRepository(rr); - } - - DependencyRequest dependencyRequest = new DependencyRequest(collectRequest, - DependencyFilterUtils.andFilter(exclusionFilter, classpathFlter)); - - return system.resolveDependencies(session, dependencyRequest).getArtifactResults(); - } - - public List<File> getFiles() { - return files; - } - - public List<File> getFilesDist() { - return filesDist; - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ca87f7d4/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java b/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java deleted file mode 100644 index 46224a8..0000000 --- a/spark/src/main/java/org/apache/zeppelin/spark/dep/SparkDependencyResolver.java +++ /dev/null @@ -1,351 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark.dep; - -import java.io.File; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.net.URL; -import java.util.Arrays; -import java.util.Collection; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; - -import org.apache.commons.lang.StringUtils; -import org.apache.spark.SparkContext; -import org.apache.zeppelin.dep.AbstractDependencyResolver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.sonatype.aether.artifact.Artifact; -import org.sonatype.aether.collection.CollectRequest; -import org.sonatype.aether.graph.Dependency; -import org.sonatype.aether.graph.DependencyFilter; -import org.sonatype.aether.repository.RemoteRepository; -import org.sonatype.aether.resolution.ArtifactResult; -import org.sonatype.aether.resolution.DependencyRequest; -import org.sonatype.aether.util.artifact.DefaultArtifact; -import org.sonatype.aether.util.artifact.JavaScopes; -import org.sonatype.aether.util.filter.DependencyFilterUtils; -import org.sonatype.aether.util.filter.PatternExclusionsDependencyFilter; - -import scala.Some; -import scala.collection.IndexedSeq; -import scala.reflect.io.AbstractFile; -import scala.tools.nsc.Global; -import scala.tools.nsc.backend.JavaPlatform; -import scala.tools.nsc.util.ClassPath; -import scala.tools.nsc.util.MergedClassPath; - -/** - * Deps resolver. - * Add new dependencies from mvn repo (at runtime) to Spark interpreter group. 
- */ -public class SparkDependencyResolver extends AbstractDependencyResolver { - Logger logger = LoggerFactory.getLogger(SparkDependencyResolver.class); - private Global global; - private ClassLoader runtimeClassLoader; - private SparkContext sc; - - private final String[] exclusions = new String[] {"org.scala-lang:scala-library", - "org.scala-lang:scala-compiler", - "org.scala-lang:scala-reflect", - "org.scala-lang:scalap", - "org.apache.zeppelin:zeppelin-zengine", - "org.apache.zeppelin:zeppelin-spark", - "org.apache.zeppelin:zeppelin-server"}; - - public SparkDependencyResolver(Global global, - ClassLoader runtimeClassLoader, - SparkContext sc, - String localRepoPath, - String additionalRemoteRepository) { - super(localRepoPath); - this.global = global; - this.runtimeClassLoader = runtimeClassLoader; - this.sc = sc; - addRepoFromProperty(additionalRemoteRepository); - } - - private void addRepoFromProperty(String listOfRepo) { - if (listOfRepo != null) { - String[] repos = listOfRepo.split(";"); - for (String repo : repos) { - String[] parts = repo.split(","); - if (parts.length == 3) { - String id = parts[0].trim(); - String url = parts[1].trim(); - boolean isSnapshot = Boolean.parseBoolean(parts[2].trim()); - if (id.length() > 1 && url.length() > 1) { - addRepo(id, url, isSnapshot); - } - } - } - } - } - - private void updateCompilerClassPath(URL[] urls) throws IllegalAccessException, - IllegalArgumentException, InvocationTargetException { - - JavaPlatform platform = (JavaPlatform) global.platform(); - MergedClassPath<AbstractFile> newClassPath = mergeUrlsIntoClassPath(platform, urls); - - Method[] methods = platform.getClass().getMethods(); - for (Method m : methods) { - if (m.getName().endsWith("currentClassPath_$eq")) { - m.invoke(platform, new Some(newClassPath)); - break; - } - } - - // NOTE: Must use reflection until this is exposed/fixed upstream in Scala - List<String> classPaths = new LinkedList<>(); - for (URL url : urls) { - classPaths.add(url.getPath()); - } - - // Reload all jars specified into our compiler - global.invalidateClassPathEntries(scala.collection.JavaConversions.asScalaBuffer(classPaths) - .toList()); - } - - // Until spark 1.1.x - // check https://github.com/apache/spark/commit/191d7cf2a655d032f160b9fa181730364681d0e7 - private void updateRuntimeClassPath_1_x(URL[] urls) throws SecurityException, - IllegalAccessException, IllegalArgumentException, - InvocationTargetException, NoSuchMethodException { - Method addURL; - addURL = runtimeClassLoader.getClass().getDeclaredMethod("addURL", new Class[] {URL.class}); - addURL.setAccessible(true); - for (URL url : urls) { - addURL.invoke(runtimeClassLoader, url); - } - } - - private void updateRuntimeClassPath_2_x(URL[] urls) throws SecurityException, - IllegalAccessException, IllegalArgumentException, - InvocationTargetException, NoSuchMethodException { - Method addURL; - addURL = runtimeClassLoader.getClass().getDeclaredMethod("addNewUrl", new Class[] {URL.class}); - addURL.setAccessible(true); - for (URL url : urls) { - addURL.invoke(runtimeClassLoader, url); - } - } - - private MergedClassPath<AbstractFile> mergeUrlsIntoClassPath(JavaPlatform platform, URL[] urls) { - IndexedSeq<ClassPath<AbstractFile>> entries = - ((MergedClassPath<AbstractFile>) platform.classPath()).entries(); - List<ClassPath<AbstractFile>> cp = new LinkedList<>(); - - for (int i = 0; i < entries.size(); i++) { - cp.add(entries.apply(i)); - } - - for (URL url : urls) { - AbstractFile file; - if ("file".equals(url.getProtocol())) { - File f = 
new File(url.getPath()); - if (f.isDirectory()) { - file = AbstractFile.getDirectory(scala.reflect.io.File.jfile2path(f)); - } else { - file = AbstractFile.getFile(scala.reflect.io.File.jfile2path(f)); - } - } else { - file = AbstractFile.getURL(url); - } - - ClassPath<AbstractFile> newcp = platform.classPath().context().newClassPath(file); - - // distinct - if (cp.contains(newcp) == false) { - cp.add(newcp); - } - } - - return new MergedClassPath(scala.collection.JavaConversions.asScalaBuffer(cp).toIndexedSeq(), - platform.classPath().context()); - } - - public List<String> load(String artifact, - boolean addSparkContext) throws Exception { - return load(artifact, new LinkedList<String>(), addSparkContext); - } - - public List<String> load(String artifact, Collection<String> excludes, - boolean addSparkContext) throws Exception { - if (StringUtils.isBlank(artifact)) { - // Should throw here - throw new RuntimeException("Invalid artifact to load"); - } - - // <groupId>:<artifactId>[:<extension>[:<classifier>]]:<version> - int numSplits = artifact.split(":").length; - if (numSplits >= 3 && numSplits <= 6) { - return loadFromMvn(artifact, excludes, addSparkContext); - } else { - loadFromFs(artifact, addSparkContext); - LinkedList<String> libs = new LinkedList<>(); - libs.add(artifact); - return libs; - } - } - - private void loadFromFs(String artifact, boolean addSparkContext) throws Exception { - File jarFile = new File(artifact); - - global.new Run(); - - if (sc.version().startsWith("1.1")) { - updateRuntimeClassPath_1_x(new URL[] {jarFile.toURI().toURL()}); - } else { - updateRuntimeClassPath_2_x(new URL[] {jarFile.toURI().toURL()}); - } - - if (addSparkContext) { - sc.addJar(jarFile.getAbsolutePath()); - } - } - - private List<String> loadFromMvn(String artifact, Collection<String> excludes, - boolean addSparkContext) throws Exception { - List<String> loadedLibs = new LinkedList<>(); - Collection<String> allExclusions = new LinkedList<>(); - allExclusions.addAll(excludes); - allExclusions.addAll(Arrays.asList(exclusions)); - - List<ArtifactResult> listOfArtifact; - listOfArtifact = getArtifactsWithDep(artifact, allExclusions); - - Iterator<ArtifactResult> it = listOfArtifact.iterator(); - while (it.hasNext()) { - Artifact a = it.next().getArtifact(); - String gav = a.getGroupId() + ":" + a.getArtifactId() + ":" + a.getVersion(); - for (String exclude : allExclusions) { - if (gav.startsWith(exclude)) { - it.remove(); - break; - } - } - } - - List<URL> newClassPathList = new LinkedList<>(); - List<File> files = new LinkedList<>(); - for (ArtifactResult artifactResult : listOfArtifact) { - logger.info("Load " + artifactResult.getArtifact().getGroupId() + ":" - + artifactResult.getArtifact().getArtifactId() + ":" - + artifactResult.getArtifact().getVersion()); - newClassPathList.add(artifactResult.getArtifact().getFile().toURI().toURL()); - files.add(artifactResult.getArtifact().getFile()); - loadedLibs.add(artifactResult.getArtifact().getGroupId() + ":" - + artifactResult.getArtifact().getArtifactId() + ":" - + artifactResult.getArtifact().getVersion()); - } - - global.new Run(); - if (sc.version().startsWith("1.1")) { - updateRuntimeClassPath_1_x(newClassPathList.toArray(new URL[0])); - } else { - updateRuntimeClassPath_2_x(newClassPathList.toArray(new URL[0])); - } - updateCompilerClassPath(newClassPathList.toArray(new URL[0])); - - if (addSparkContext) { - for (File f : files) { - sc.addJar(f.getAbsolutePath()); - } - } - - return loadedLibs; - } - - /** - * @param dependency - * @param 
excludes list of pattern can either be of the form groupId:artifactId - * @return - * @throws Exception - */ - @Override - public List<ArtifactResult> getArtifactsWithDep(String dependency, - Collection<String> excludes) throws Exception { - Artifact artifact = new DefaultArtifact(inferScalaVersion(dependency)); - DependencyFilter classpathFilter = DependencyFilterUtils.classpathFilter(JavaScopes.COMPILE); - PatternExclusionsDependencyFilter exclusionFilter = - new PatternExclusionsDependencyFilter(inferScalaVersion(excludes)); - - CollectRequest collectRequest = new CollectRequest(); - collectRequest.setRoot(new Dependency(artifact, JavaScopes.COMPILE)); - - synchronized (repos) { - for (RemoteRepository repo : repos) { - collectRequest.addRepository(repo); - } - } - DependencyRequest dependencyRequest = new DependencyRequest(collectRequest, - DependencyFilterUtils.andFilter(exclusionFilter, classpathFilter)); - return system.resolveDependencies(session, dependencyRequest).getArtifactResults(); - } - - public static Collection<String> inferScalaVersion(Collection<String> artifact) { - List<String> list = new LinkedList<>(); - for (String a : artifact) { - list.add(inferScalaVersion(a)); - } - return list; - } - - public static String inferScalaVersion(String artifact) { - int pos = artifact.indexOf(":"); - if (pos < 0 || pos + 2 >= artifact.length()) { - // failed to infer - return artifact; - } - - if (':' == artifact.charAt(pos + 1)) { - String restOfthem = ""; - String versionSep = ":"; - - String groupId = artifact.substring(0, pos); - int nextPos = artifact.indexOf(":", pos + 2); - if (nextPos < 0) { - if (artifact.charAt(artifact.length() - 1) == '*') { - nextPos = artifact.length() - 1; - versionSep = ""; - restOfthem = "*"; - } else { - versionSep = ""; - nextPos = artifact.length(); - } - } - - String artifactId = artifact.substring(pos + 2, nextPos); - if (nextPos < artifact.length()) { - if (!restOfthem.equals("*")) { - restOfthem = artifact.substring(nextPos + 1); - } - } - - String [] version = scala.util.Properties.versionNumberString().split("[.]"); - String scalaVersion = version[0] + "." + version[1]; - - return groupId + ":" + artifactId + "_" + scalaVersion + versionSep + restOfthem; - } else { - return artifact; - } - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ca87f7d4/spark/src/main/resources/R/zeppelin_sparkr.R ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/R/zeppelin_sparkr.R b/spark/src/main/resources/R/zeppelin_sparkr.R deleted file mode 100644 index 525c6c5..0000000 --- a/spark/src/main/resources/R/zeppelin_sparkr.R +++ /dev/null @@ -1,105 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -args <- commandArgs(trailingOnly = TRUE) - -hashCode <- as.integer(args[1]) -port <- as.integer(args[2]) -libPath <- args[3] -version <- as.integer(args[4]) -rm(args) - -print(paste("Port ", toString(port))) -print(paste("LibPath ", libPath)) - -.libPaths(c(file.path(libPath), .libPaths())) -library(SparkR) - - -SparkR:::connectBackend("localhost", port, 6000) - -# scStartTime is needed by R/pkg/R/sparkR.R -assign(".scStartTime", as.integer(Sys.time()), envir = SparkR:::.sparkREnv) - -# getZeppelinR -.zeppelinR = SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinR", "getZeppelinR", hashCode) - -# setup spark env -assign(".sc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkContext"), envir = SparkR:::.sparkREnv) -assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv) -if (version >= 20000) { - assign(".sparkRsession", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkSession"), envir = SparkR:::.sparkREnv) - assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir = .GlobalEnv) - assign(".sparkRjsc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getJavaSparkContext"), envir = SparkR:::.sparkREnv) -} -assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv) -assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv) -assign(".zeppelinContext", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getZeppelinContext"), envir = .GlobalEnv) - -z.put <- function(name, object) { - SparkR:::callJMethod(.zeppelinContext, "put", name, object) -} -z.get <- function(name) { - SparkR:::callJMethod(.zeppelinContext, "get", name) -} -z.input <- function(name, value) { - SparkR:::callJMethod(.zeppelinContext, "input", name, value) -} - -# notify script is initialized -SparkR:::callJMethod(.zeppelinR, "onScriptInitialized") - -while (TRUE) { - req <- SparkR:::callJMethod(.zeppelinR, "getRequest") - type <- SparkR:::callJMethod(req, "getType") - stmt <- SparkR:::callJMethod(req, "getStmt") - value <- SparkR:::callJMethod(req, "getValue") - - if (type == "eval") { - tryCatch({ - ret <- eval(parse(text=stmt)) - SparkR:::callJMethod(.zeppelinR, "setResponse", "", FALSE) - }, error = function(e) { - SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE) - }) - } else if (type == "set") { - tryCatch({ - ret <- assign(stmt, value) - SparkR:::callJMethod(.zeppelinR, "setResponse", "", FALSE) - }, error = function(e) { - SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE) - }) - } else if (type == "get") { - tryCatch({ - ret <- eval(parse(text=stmt)) - SparkR:::callJMethod(.zeppelinR, "setResponse", ret, FALSE) - }, error = function(e) { - SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE) - }) - } else if (type == "getS") { - tryCatch({ - ret <- eval(parse(text=stmt)) - SparkR:::callJMethod(.zeppelinR, "setResponse", toString(ret), FALSE) - }, error = function(e) { - SparkR:::callJMethod(.zeppelinR, "setResponse", toString(e), TRUE) - }) - } else { - # unsupported type - SparkR:::callJMethod(.zeppelinR, "setResponse", paste("Unsupported type ", type), TRUE) - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ca87f7d4/spark/src/main/resources/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/interpreter-setting.json 
b/spark/src/main/resources/interpreter-setting.json deleted file mode 100644 index f45c85c..0000000 --- a/spark/src/main/resources/interpreter-setting.json +++ /dev/null @@ -1,226 +0,0 @@ -[ - { - "group": "spark", - "name": "spark", - "className": "org.apache.zeppelin.spark.SparkInterpreter", - "defaultInterpreter": true, - "properties": { - "spark.executor.memory": { - "envName": null, - "propertyName": "spark.executor.memory", - "defaultValue": "", - "description": "Executor memory per worker instance. ex) 512m, 32g", - "type": "string" - }, - "args": { - "envName": null, - "propertyName": null, - "defaultValue": "", - "description": "spark commandline args", - "type": "textarea" - }, - "zeppelin.spark.useHiveContext": { - "envName": "ZEPPELIN_SPARK_USEHIVECONTEXT", - "propertyName": "zeppelin.spark.useHiveContext", - "defaultValue": true, - "description": "Use HiveContext instead of SQLContext if it is true.", - "type": "checkbox" - }, - "spark.app.name": { - "envName": "SPARK_APP_NAME", - "propertyName": "spark.app.name", - "defaultValue": "Zeppelin", - "description": "The name of spark application.", - "type": "string" - }, - "zeppelin.spark.printREPLOutput": { - "envName": null, - "propertyName": "zeppelin.spark.printREPLOutput", - "defaultValue": true, - "description": "Print REPL output", - "type": "checkbox" - }, - "spark.cores.max": { - "envName": null, - "propertyName": "spark.cores.max", - "defaultValue": "", - "description": "Total number of cores to use. Empty value uses all available core.", - "type": "number" - }, - "zeppelin.spark.maxResult": { - "envName": "ZEPPELIN_SPARK_MAXRESULT", - "propertyName": "zeppelin.spark.maxResult", - "defaultValue": "1000", - "description": "Max number of Spark SQL result to display.", - "type": "number" - }, - "master": { - "envName": "MASTER", - "propertyName": "spark.master", - "defaultValue": "local[*]", - "description": "Spark master uri. 
ex) spark://masterhost:7077", - "type": "string" - }, - "zeppelin.spark.enableSupportedVersionCheck": { - "envName": null, - "propertyName": "zeppelin.spark.enableSupportedVersionCheck", - "defaultValue": true, - "description": "Do not change - developer only setting, not for production use", - "type": "checkbox" - }, - "zeppelin.spark.uiWebUrl": { - "envName": null, - "propertyName": "zeppelin.spark.uiWebUrl", - "defaultValue": "", - "description": "Override Spark UI default URL", - "type": "string" - } - }, - "editor": { - "language": "scala", - "editOnDblClick": false, - "completionKey": "TAB" - } - }, - { - "group": "spark", - "name": "sql", - "className": "org.apache.zeppelin.spark.SparkSqlInterpreter", - "properties": { - "zeppelin.spark.concurrentSQL": { - "envName": "ZEPPELIN_SPARK_CONCURRENTSQL", - "propertyName": "zeppelin.spark.concurrentSQL", - "defaultValue": false, - "description": "Execute multiple SQL concurrently if set true.", - "type": "checkbox" - }, - "zeppelin.spark.sql.stacktrace": { - "envName": "ZEPPELIN_SPARK_SQL_STACKTRACE", - "propertyName": "zeppelin.spark.sql.stacktrace", - "defaultValue": false, - "description": "Show full exception stacktrace for SQL queries if set to true.", - "type": "checkbox" - }, - "zeppelin.spark.maxResult": { - "envName": "ZEPPELIN_SPARK_MAXRESULT", - "propertyName": "zeppelin.spark.maxResult", - "defaultValue": "1000", - "description": "Max number of Spark SQL result to display.", - "type": "number" - }, - "zeppelin.spark.importImplicit": { - "envName": "ZEPPELIN_SPARK_IMPORTIMPLICIT", - "propertyName": "zeppelin.spark.importImplicit", - "defaultValue": true, - "description": "Import implicits, UDF collection, and sql if set true. true by default.", - "type": "checkbox" - } - }, - "editor": { - "language": "sql", - "editOnDblClick": false, - "completionKey": "TAB" - } - }, - { - "group": "spark", - "name": "dep", - "className": "org.apache.zeppelin.spark.DepInterpreter", - "properties": { - "zeppelin.dep.localrepo": { - "envName": "ZEPPELIN_DEP_LOCALREPO", - "propertyName": null, - "defaultValue": "local-repo", - "description": "local repository for dependency loader", - "type": "string" - }, - "zeppelin.dep.additionalRemoteRepository": { - "envName": null, - "propertyName": null, - "defaultValue": "spark-packages,http://dl.bintray.com/spark-packages/maven,false;", - "description": "A list of 'id,remote-repository-URL,is-snapshot;' for each remote repository.", - "type": "textarea" - } - }, - "editor": { - "language": "scala", - "editOnDblClick": false, - "completionKey": "TAB" - } - }, - { - "group": "spark", - "name": "pyspark", - "className": "org.apache.zeppelin.spark.PySparkInterpreter", - "properties": { - "zeppelin.pyspark.python": { - "envName": "PYSPARK_PYTHON", - "propertyName": null, - "defaultValue": "python", - "description": "Python command to run pyspark with", - "type": "string" - }, - "zeppelin.pyspark.useIPython": { - "envName": null, - "propertyName": "zeppelin.pyspark.useIPython", - "defaultValue": true, - "description": "whether use IPython when it is available", - "type": "checkbox" - } - }, - "editor": { - "language": "python", - "editOnDblClick": false, - "completionKey": "TAB" - } - }, - { - "group": "spark", - "name": "ipyspark", - "className": "org.apache.zeppelin.spark.IPySparkInterpreter", - "properties": {}, - "editor": { - "language": "python", - "editOnDblClick": false - } - }, - { - "group": "spark", - "name": "r", - "className": "org.apache.zeppelin.spark.SparkRInterpreter", - "properties": { - 
"zeppelin.R.knitr": { - "envName": "ZEPPELIN_R_KNITR", - "propertyName": "zeppelin.R.knitr", - "defaultValue": true, - "description": "whether use knitr or not", - "type": "checkbox" - }, - "zeppelin.R.cmd": { - "envName": "ZEPPELIN_R_CMD", - "propertyName": "zeppelin.R.cmd", - "defaultValue": "R", - "description": "R repl path", - "type": "string" - }, - "zeppelin.R.image.width": { - "envName": "ZEPPELIN_R_IMAGE_WIDTH", - "propertyName": "zeppelin.R.image.width", - "defaultValue": "100%", - "description": "", - "type": "number" - }, - "zeppelin.R.render.options": { - "envName": "ZEPPELIN_R_RENDER_OPTIONS", - "propertyName": "zeppelin.R.render.options", - "defaultValue": "out.format = 'html', comment = NA, echo = FALSE, results = 'asis', message = F, warning = F, fig.retina = 2", - "description": "", - "type": "textarea" - } - }, - "editor": { - "language": "r", - "editOnDblClick": false - } - } -] http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ca87f7d4/spark/src/main/resources/python/zeppelin_ipyspark.py ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/python/zeppelin_ipyspark.py b/spark/src/main/resources/python/zeppelin_ipyspark.py deleted file mode 100644 index 324f481..0000000 --- a/spark/src/main/resources/python/zeppelin_ipyspark.py +++ /dev/null @@ -1,53 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - - -from py4j.java_gateway import java_import, JavaGateway, GatewayClient -from pyspark.conf import SparkConf -from pyspark.context import SparkContext - -# for back compatibility -from pyspark.sql import SQLContext - -# start JVM gateway -client = GatewayClient(port=${JVM_GATEWAY_PORT}) -gateway = JavaGateway(client, auto_convert=True) - -java_import(gateway.jvm, "org.apache.spark.SparkEnv") -java_import(gateway.jvm, "org.apache.spark.SparkConf") -java_import(gateway.jvm, "org.apache.spark.api.java.*") -java_import(gateway.jvm, "org.apache.spark.api.python.*") -java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") - -intp = gateway.entry_point -jsc = intp.getJavaSparkContext() - -java_import(gateway.jvm, "org.apache.spark.sql.*") -java_import(gateway.jvm, "org.apache.spark.sql.hive.*") -java_import(gateway.jvm, "scala.Tuple2") - -jconf = jsc.getConf() -conf = SparkConf(_jvm=gateway.jvm, _jconf=jconf) -sc = _zsc_ = SparkContext(jsc=jsc, gateway=gateway, conf=conf) - -if intp.isSpark2(): - from pyspark.sql import SparkSession - - spark = __zSpark__ = SparkSession(sc, intp.getSparkSession()) - sqlContext = sqlc = __zSqlc__ = __zSpark__._wrapped -else: - sqlContext = sqlc = __zSqlc__ = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext()) http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ca87f7d4/spark/src/main/resources/python/zeppelin_pyspark.py ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py deleted file mode 100644 index c10855a..0000000 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ /dev/null @@ -1,393 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -import os, sys, getopt, traceback, json, re - -from py4j.java_gateway import java_import, JavaGateway, GatewayClient -from py4j.protocol import Py4JJavaError -from pyspark.conf import SparkConf -from pyspark.context import SparkContext -import ast -import warnings - -# for back compatibility -from pyspark.sql import SQLContext, HiveContext, Row - -class Logger(object): - def __init__(self): - pass - - def write(self, message): - intp.appendOutput(message) - - def reset(self): - pass - - def flush(self): - pass - - -class PyZeppelinContext(dict): - def __init__(self, zc): - self.z = zc - self._displayhook = lambda *args: None - - def show(self, obj): - from pyspark.sql import DataFrame - if isinstance(obj, DataFrame): - print(self.z.showData(obj._jdf)) - else: - print(str(obj)) - - # By implementing special methods it makes operating on it more Pythonic - def __setitem__(self, key, item): - self.z.put(key, item) - - def __getitem__(self, key): - return self.z.get(key) - - def __delitem__(self, key): - self.z.remove(key) - - def __contains__(self, item): - return self.z.containsKey(item) - - def add(self, key, value): - self.__setitem__(key, value) - - def put(self, key, value): - self.__setitem__(key, value) - - def get(self, key): - return self.__getitem__(key) - - def getInterpreterContext(self): - return self.z.getInterpreterContext() - - def input(self, name, defaultValue=""): - return self.z.input(name, defaultValue) - - def textbox(self, name, defaultValue=""): - return self.z.textbox(name, defaultValue) - - def noteTextbox(self, name, defaultValue=""): - return self.z.noteTextbox(name, defaultValue) - - def select(self, name, options, defaultValue=""): - # auto_convert to ArrayList doesn't match the method signature on JVM side - return self.z.select(name, defaultValue, self.getParamOptions(options)) - - def noteSelect(self, name, options, defaultValue=""): - return self.z.noteSelect(name, defaultValue, self.getParamOptions(options)) - - def checkbox(self, name, options, defaultChecked=None): - optionsIterable = self.getParamOptions(options) - defaultCheckedIterables = self.getDefaultChecked(defaultChecked) - checkedItems = gateway.jvm.scala.collection.JavaConversions.seqAsJavaList(self.z.checkbox(name, defaultCheckedIterables, optionsIterable)) - result = [] - for checkedItem in checkedItems: - result.append(checkedItem) - return result; - - def noteCheckbox(self, name, options, defaultChecked=None): - optionsIterable = self.getParamOptions(options) - defaultCheckedIterables = self.getDefaultChecked(defaultChecked) - checkedItems = gateway.jvm.scala.collection.JavaConversions.seqAsJavaList(self.z.noteCheckbox(name, defaultCheckedIterables, optionsIterable)) - result = [] - for checkedItem in checkedItems: - result.append(checkedItem) - return result; - - def getParamOptions(self, options): - tuples = list(map(lambda items: self.__tupleToScalaTuple2(items), options)) - return gateway.jvm.scala.collection.JavaConversions.collectionAsScalaIterable(tuples) - - def getDefaultChecked(self, defaultChecked): - if defaultChecked is None: - defaultChecked = [] - return gateway.jvm.scala.collection.JavaConversions.collectionAsScalaIterable(defaultChecked) - - def registerHook(self, event, cmd, replName=None): - if replName is None: - self.z.registerHook(event, cmd) - else: - self.z.registerHook(event, cmd, replName) - - def unregisterHook(self, event, replName=None): - if replName is None: - self.z.unregisterHook(event) - else: - self.z.unregisterHook(event, replName) - - def 
getHook(self, event, replName=None): - if replName is None: - return self.z.getHook(event) - return self.z.getHook(event, replName) - - def _setup_matplotlib(self): - # If we don't have matplotlib installed don't bother continuing - try: - import matplotlib - except ImportError: - return - - # Make sure custom backends are available in the PYTHONPATH - rootdir = os.environ.get('ZEPPELIN_HOME', os.getcwd()) - mpl_path = os.path.join(rootdir, 'interpreter', 'lib', 'python') - if mpl_path not in sys.path: - sys.path.append(mpl_path) - - # Finally check if backend exists, and if so configure as appropriate - try: - matplotlib.use('module://backend_zinline') - import backend_zinline - - # Everything looks good so make config assuming that we are using - # an inline backend - self._displayhook = backend_zinline.displayhook - self.configure_mpl(width=600, height=400, dpi=72, fontsize=10, - interactive=True, format='png', context=self.z) - except ImportError: - # Fall back to Agg if no custom backend installed - matplotlib.use('Agg') - warnings.warn("Unable to load inline matplotlib backend, " - "falling back to Agg") - - def configure_mpl(self, **kwargs): - import mpl_config - mpl_config.configure(**kwargs) - - def __tupleToScalaTuple2(self, tuple): - if (len(tuple) == 2): - return gateway.jvm.scala.Tuple2(tuple[0], tuple[1]) - else: - raise IndexError("options must be a list of tuple of 2") - - -class SparkVersion(object): - SPARK_1_4_0 = 10400 - SPARK_1_3_0 = 10300 - SPARK_2_0_0 = 20000 - - def __init__(self, versionNumber): - self.version = versionNumber - - def isAutoConvertEnabled(self): - return self.version >= self.SPARK_1_4_0 - - def isImportAllPackageUnderSparkSql(self): - return self.version >= self.SPARK_1_3_0 - - def isSpark2(self): - return self.version >= self.SPARK_2_0_0 - -class PySparkCompletion: - def __init__(self, interpreterObject): - self.interpreterObject = interpreterObject - - def getGlobalCompletion(self): - objectDefList = [] - try: - for completionItem in list(globals().keys()): - objectDefList.append(completionItem) - except: - return None - else: - return objectDefList - - def getMethodCompletion(self, text_value): - execResult = locals() - if text_value == None: - return None - completion_target = text_value - try: - if len(completion_target) <= 0: - return None - if text_value[-1] == ".": - completion_target = text_value[:-1] - exec("{} = dir({})".format("objectDefList", completion_target), globals(), execResult) - except: - return None - else: - return list(execResult['objectDefList']) - - - def getCompletion(self, text_value): - completionList = set() - - globalCompletionList = self.getGlobalCompletion() - if globalCompletionList != None: - for completionItem in list(globalCompletionList): - completionList.add(completionItem) - - if text_value != None: - objectCompletionList = self.getMethodCompletion(text_value) - if objectCompletionList != None: - for completionItem in list(objectCompletionList): - completionList.add(completionItem) - if len(completionList) <= 0: - self.interpreterObject.setStatementsFinished("", False) - else: - result = json.dumps(list(filter(lambda x : not re.match("^__.*", x), list(completionList)))) - self.interpreterObject.setStatementsFinished(result, False) - -client = GatewayClient(port=int(sys.argv[1])) -sparkVersion = SparkVersion(int(sys.argv[2])) -if sparkVersion.isSpark2(): - from pyspark.sql import SparkSession -else: - from pyspark.sql import SchemaRDD - -if sparkVersion.isAutoConvertEnabled(): - gateway = JavaGateway(client, 
auto_convert = True) -else: - gateway = JavaGateway(client) - -java_import(gateway.jvm, "org.apache.spark.SparkEnv") -java_import(gateway.jvm, "org.apache.spark.SparkConf") -java_import(gateway.jvm, "org.apache.spark.api.java.*") -java_import(gateway.jvm, "org.apache.spark.api.python.*") -java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") - -intp = gateway.entry_point -output = Logger() -sys.stdout = output -sys.stderr = output -intp.onPythonScriptInitialized(os.getpid()) - -jsc = intp.getJavaSparkContext() - -if sparkVersion.isImportAllPackageUnderSparkSql(): - java_import(gateway.jvm, "org.apache.spark.sql.*") - java_import(gateway.jvm, "org.apache.spark.sql.hive.*") -else: - java_import(gateway.jvm, "org.apache.spark.sql.SQLContext") - java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext") - java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext") - java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext") - - -java_import(gateway.jvm, "scala.Tuple2") - -_zcUserQueryNameSpace = {} - -jconf = intp.getSparkConf() -conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf) -sc = _zsc_ = SparkContext(jsc=jsc, gateway=gateway, conf=conf) -_zcUserQueryNameSpace["_zsc_"] = _zsc_ -_zcUserQueryNameSpace["sc"] = sc - -if sparkVersion.isSpark2(): - spark = __zSpark__ = SparkSession(sc, intp.getSparkSession()) - sqlc = __zSqlc__ = __zSpark__._wrapped - _zcUserQueryNameSpace["sqlc"] = sqlc - _zcUserQueryNameSpace["__zSqlc__"] = __zSqlc__ - _zcUserQueryNameSpace["spark"] = spark - _zcUserQueryNameSpace["__zSpark__"] = __zSpark__ -else: - sqlc = __zSqlc__ = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext()) - _zcUserQueryNameSpace["sqlc"] = sqlc - _zcUserQueryNameSpace["__zSqlc__"] = sqlc - -sqlContext = __zSqlc__ -_zcUserQueryNameSpace["sqlContext"] = sqlContext - -completion = __zeppelin_completion__ = PySparkCompletion(intp) -_zcUserQueryNameSpace["completion"] = completion -_zcUserQueryNameSpace["__zeppelin_completion__"] = __zeppelin_completion__ - -z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext()) -__zeppelin__._setup_matplotlib() -_zcUserQueryNameSpace["z"] = z -_zcUserQueryNameSpace["__zeppelin__"] = __zeppelin__ - -while True : - req = intp.getStatements() - try: - stmts = req.statements().split("\n") - jobGroup = req.jobGroup() - jobDesc = req.jobDescription() - - # Get post-execute hooks - try: - global_hook = intp.getHook('post_exec_dev') - except: - global_hook = None - - try: - user_hook = __zeppelin__.getHook('post_exec') - except: - user_hook = None - - nhooks = 0 - for hook in (global_hook, user_hook): - if hook: - nhooks += 1 - - if stmts: - # use exec mode to compile the statements except the last statement, - # so that the last statement's evaluation will be printed to stdout - sc.setJobGroup(jobGroup, jobDesc) - code = compile('\n'.join(stmts), '<stdin>', 'exec', ast.PyCF_ONLY_AST, 1) - to_run_hooks = [] - if (nhooks > 0): - to_run_hooks = code.body[-nhooks:] - to_run_exec, to_run_single = (code.body[:-(nhooks + 1)], - [code.body[-(nhooks + 1)]]) - - try: - for node in to_run_exec: - mod = ast.Module([node]) - code = compile(mod, '<stdin>', 'exec') - exec(code, _zcUserQueryNameSpace) - - for node in to_run_single: - mod = ast.Interactive([node]) - code = compile(mod, '<stdin>', 'single') - exec(code, _zcUserQueryNameSpace) - - for node in to_run_hooks: - mod = ast.Module([node]) - code = compile(mod, '<stdin>', 'exec') - exec(code, _zcUserQueryNameSpace) - - intp.setStatementsFinished("", False) - except 
Py4JJavaError: - # raise it to outside try except - raise - except: - exception = traceback.format_exc() - m = re.search("File \"<stdin>\", line (\d+).*", exception) - if m: - line_no = int(m.group(1)) - intp.setStatementsFinished( - "Fail to execute line {}: {}\n".format(line_no, stmts[line_no - 1]) + exception, True) - else: - intp.setStatementsFinished(exception, True) - else: - intp.setStatementsFinished("", False) - - except Py4JJavaError: - excInnerError = traceback.format_exc() # format_tb() does not return the inner exception - innerErrorStart = excInnerError.find("Py4JJavaError:") - if innerErrorStart > -1: - excInnerError = excInnerError[innerErrorStart:] - intp.setStatementsFinished(excInnerError + str(sys.exc_info()), True) - except: - intp.setStatementsFinished(traceback.format_exc(), True) - - output.reset() http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ca87f7d4/spark/src/main/scala/org/apache/spark/SparkRBackend.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/spark/SparkRBackend.scala b/spark/src/main/scala/org/apache/spark/SparkRBackend.scala deleted file mode 100644 index 05f1ac0..0000000 --- a/spark/src/main/scala/org/apache/spark/SparkRBackend.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark - -import org.apache.spark.api.r.RBackend - -object SparkRBackend { - val backend : RBackend = new RBackend() - private var started = false; - private var portNumber = 0; - - val backendThread : Thread = new Thread("SparkRBackend") { - override def run() { - backend.run() - } - } - - def init() : Int = { - portNumber = backend.init() - portNumber - } - - def start() : Unit = { - backendThread.start() - started = true - } - - def close() : Unit = { - backend.close() - backendThread.join() - } - - def isStarted() : Boolean = { - started - } - - def port(): Int = { - return portNumber - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ca87f7d4/spark/src/main/scala/org/apache/zeppelin/spark/ZeppelinRDisplay.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/zeppelin/spark/ZeppelinRDisplay.scala b/spark/src/main/scala/org/apache/zeppelin/spark/ZeppelinRDisplay.scala deleted file mode 100644 index a9014c2..0000000 --- a/spark/src/main/scala/org/apache/zeppelin/spark/ZeppelinRDisplay.scala +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark - -import org.apache.zeppelin.interpreter.InterpreterResult.Code -import org.apache.zeppelin.interpreter.InterpreterResult.Code.{SUCCESS} -import org.apache.zeppelin.interpreter.InterpreterResult.Type -import org.apache.zeppelin.interpreter.InterpreterResult.Type.{TEXT, HTML, TABLE, IMG} -import org.jsoup.Jsoup -import org.jsoup.nodes.Element -import org.jsoup.nodes.Document.OutputSettings -import org.jsoup.safety.Whitelist - -import scala.collection.JavaConversions._ -import scala.util.matching.Regex - -case class RDisplay(content: String, `type`: Type, code: Code) - -object ZeppelinRDisplay { - - val pattern = new Regex("""^ *\[\d*\] """) - - def render(html: String, imageWidth: String): RDisplay = { - - val document = Jsoup.parse(html) - document.outputSettings().prettyPrint(false) - - val body = document.body() - - if (body.getElementsByTag("p").isEmpty) return RDisplay(body.html(), HTML, SUCCESS) - - val bodyHtml = body.html() - - if (! bodyHtml.contains("<img") - && ! bodyHtml.contains("<script") - && ! bodyHtml.contains("%html ") - && ! bodyHtml.contains("%table ") - && ! bodyHtml.contains("%img ") - ) { - return textDisplay(body) - } - - if (bodyHtml.contains("%table")) { - return tableDisplay(body) - } - - if (bodyHtml.contains("%img")) { - return imgDisplay(body) - } - - return htmlDisplay(body, imageWidth) - } - - private def textDisplay(body: Element): RDisplay = { - // remove HTML tag while preserving whitespaces and newlines - val text = Jsoup.clean(body.html(), "", - Whitelist.none(), new OutputSettings().prettyPrint(false)) - RDisplay(text, TEXT, SUCCESS) - } - - private def tableDisplay(body: Element): RDisplay = { - val p = body.getElementsByTag("p").first().html.replace("“%table " , "").replace("”", "") - val r = (pattern findFirstIn p).getOrElse("") - val table = p.replace(r, "").replace("\\t", "\t").replace("\\n", "\n") - RDisplay(table, TABLE, SUCCESS) - } - - private def imgDisplay(body: Element): RDisplay = { - val p = body.getElementsByTag("p").first().html.replace("“%img " , "").replace("”", "") - val r = (pattern findFirstIn p).getOrElse("") - val img = p.replace(r, "") - RDisplay(img, IMG, SUCCESS) - } - - private def htmlDisplay(body: Element, imageWidth: String): RDisplay = { - var div = new String() - - for (element <- body.children) { - - val eHtml = element.html() - var eOuterHtml = element.outerHtml() - - eOuterHtml = eOuterHtml.replace("“%html " , "").replace("”", "") - - val r = (pattern findFirstIn eHtml).getOrElse("") - - div = div + eOuterHtml.replace(r, "") - } - - val content = div - .replaceAll("src=\"//", "src=\"http://") - .replaceAll("href=\"//", "href=\"http://") - - body.html(content) - - for (image <- body.getElementsByTag("img")) { - image.attr("width", imageWidth) - } - - RDisplay(body.html, HTML, SUCCESS) - } -}
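
For readers tracing the R rendering path removed above: ZeppelinRDisplay.render parses the HTML produced on the R/knitr side with Jsoup and dispatches on the embedded display-system markers (%table, %img, %html, wrapped in curly quotes) to pick the InterpreterResult type. The sketch below is illustrative only and not part of the patch; the sample input strings and assertions are assumptions made up for the example.

import org.apache.zeppelin.interpreter.InterpreterResult.Type.{HTML, TABLE, TEXT}
import org.apache.zeppelin.spark.{RDisplay, ZeppelinRDisplay}

object RDisplayRenderSketch {
  def main(args: Array[String]): Unit = {
    // A body without <p> tags is passed through unchanged as HTML.
    val asHtml: RDisplay = ZeppelinRDisplay.render("<div><b>summary</b></div>", "100%")
    assert(asHtml.`type` == HTML)

    // Paragraph text with no %table/%img/%html marker has its tags stripped
    // (Jsoup.clean with an empty whitelist) and comes back as TEXT.
    val asText: RDisplay = ZeppelinRDisplay.render("<p>[1] 42</p>", "100%")
    assert(asText.`type` == TEXT)

    // A curly-quoted %table directive is unwrapped, the leading "[1] " index is
    // dropped, and the escaped \t / \n separators are expanded into TABLE data.
    val asTable: RDisplay =
      ZeppelinRDisplay.render("<p>[1] “%table name\\tage\\nalice\\t29”</p>", "100%")
    assert(asTable.`type` == TABLE)
  }
}
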
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ca87f7d4/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala b/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala deleted file mode 100644 index 8181434..0000000 --- a/spark/src/main/scala/org/apache/zeppelin/spark/utils/DisplayUtils.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark.utils - -import java.lang.StringBuilder - -import org.apache.spark.rdd.RDD - -import scala.collection.IterableLike - -object DisplayUtils { - - implicit def toDisplayRDDFunctions[T <: Product](rdd: RDD[T]): DisplayRDDFunctions[T] = new DisplayRDDFunctions[T](rdd) - - implicit def toDisplayTraversableFunctions[T <: Product](traversable: Traversable[T]): DisplayTraversableFunctions[T] = new DisplayTraversableFunctions[T](traversable) - - def html(htmlContent: String = "") = s"%html $htmlContent" - - def img64(base64Content: String = "") = s"%img $base64Content" - - def img(url: String) = s"<img src='$url' />" -} - -trait DisplayCollection[T <: Product] { - - def printFormattedData(traversable: Traversable[T], columnLabels: String*): Unit = { - val providedLabelCount: Int = columnLabels.size - var maxColumnCount:Int = 1 - val headers = new StringBuilder("%table ") - - val data = new StringBuilder("") - - traversable.foreach(tuple => { - maxColumnCount = math.max(maxColumnCount,tuple.productArity) - data.append(tuple.productIterator.mkString("\t")).append("\n") - }) - - if (providedLabelCount > maxColumnCount) { - headers.append(columnLabels.take(maxColumnCount).mkString("\t")).append("\n") - } else if (providedLabelCount < maxColumnCount) { - val missingColumnHeaders = ((providedLabelCount+1) to maxColumnCount).foldLeft[String](""){ - (stringAccumulator,index) => if (index==1) s"Column$index" else s"$stringAccumulator\tColumn$index" - } - - headers.append(columnLabels.mkString("\t")).append(missingColumnHeaders).append("\n") - } else { - headers.append(columnLabels.mkString("\t")).append("\n") - } - - headers.append(data) - - print(headers.toString) - } - -} - -class DisplayRDDFunctions[T <: Product] (val rdd: RDD[T]) extends DisplayCollection[T] { - - def display(columnLabels: String*)(implicit sparkMaxResult: SparkMaxResult): Unit = { - printFormattedData(rdd.take(sparkMaxResult.maxResult), columnLabels: _*) - } - - def display(sparkMaxResult:Int, columnLabels: String*): Unit = { - printFormattedData(rdd.take(sparkMaxResult), columnLabels: _*) - } -} - -class DisplayTraversableFunctions[T <: Product] (val traversable: Traversable[T]) extends 
DisplayCollection[T] { - - def display(columnLabels: String*): Unit = { - printFormattedData(traversable, columnLabels: _*) - } -} - -class SparkMaxResult(val maxResult: Int) extends Serializable http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ca87f7d4/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java deleted file mode 100644 index e177d49..0000000 --- a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.spark; - -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.Properties; - -import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.user.AuthenticationInfo; -import org.apache.zeppelin.display.GUI; -import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -public class DepInterpreterTest { - - @Rule - public TemporaryFolder tmpDir = new TemporaryFolder(); - - private DepInterpreter dep; - private InterpreterContext context; - - private Properties getTestProperties() throws IOException { - Properties p = new Properties(); - p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath()); - p.setProperty("zeppelin.dep.additionalRemoteRepository", "spark-packages,http://dl.bintray.com/spark-packages/maven,false;"); - return p; - } - - @Before - public void setUp() throws Exception { - Properties p = getTestProperties(); - - dep = new DepInterpreter(p); - dep.open(); - - InterpreterGroup intpGroup = new InterpreterGroup(); - intpGroup.put("note", new LinkedList<Interpreter>()); - intpGroup.get("note").add(new SparkInterpreter(p)); - intpGroup.get("note").add(dep); - dep.setInterpreterGroup(intpGroup); - - context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(), - new HashMap<String, Object>(), new GUI(), new GUI(), - new AngularObjectRegistry(intpGroup.getId(), null), - null, - new LinkedList<InterpreterContextRunner>(), null); - } - - @After - public void tearDown() throws Exception { - dep.close(); - } - - @Test - public void testDefault() { - dep.getDependencyContext().reset(); - InterpreterResult ret = dep.interpret("z.load(\"org.apache.commons:commons-csv:1.1\")", context); - 
assertEquals(Code.SUCCESS, ret.code()); - - assertEquals(1, dep.getDependencyContext().getFiles().size()); - assertEquals(1, dep.getDependencyContext().getFilesDist().size()); - - // Add a test for the spark-packages repo - default in additionalRemoteRepository - ret = dep.interpret("z.load(\"amplab:spark-indexedrdd:0.3\")", context); - assertEquals(Code.SUCCESS, ret.code()); - - // Reset at the end of the test - dep.getDependencyContext().reset(); - } -}
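
For the implicit display helpers deleted in DisplayUtils.scala above, here is a hypothetical Zeppelin Scala paragraph showing how they were typically pulled in. The sample data, column labels, and the sc SparkContext (normally provided by the Spark interpreter session) are assumptions for the sketch, not taken from the patch.

import org.apache.zeppelin.spark.utils.DisplayUtils._
import org.apache.zeppelin.spark.utils.SparkMaxResult

// Cap how many rows the implicit-based display(...) pulls back to the driver.
implicit val maxRows: SparkMaxResult = new SparkMaxResult(100)

val ages = sc.parallelize(Seq(("alice", 29), ("bob", 31)))

// toDisplayRDDFunctions enriches RDD[Product] with display(...), which prints a
// "%table"-prefixed, tab-separated block that Zeppelin renders as a table.
ages.display("name", "age")

// Local collections of tuples get the same treatment via toDisplayTraversableFunctions.
Seq(("carol", 27), ("dave", 40)).display("name", "age")

// The html/img64/img helpers only prepend the matching display-system directive.
println(html("<h3>done</h3>"))
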