Repository: zeppelin Updated Branches: refs/heads/branch-0.7 8956d682f -> 835e9e21f
ZEPPELIN-1933. Set pig job name and allow to set pig property in pig interpreter setting ### What is this PR for? Two improvements for pig interpreter. * Set job name via paragraph title if it exists, otherwise use the last line of pig script * Allow to set any pig property in interpreter setting ### What type of PR is it? [ Improvement] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-1933 ### How should this be tested? Unit tested and manually tested. ### Screenshots (if appropriate)  ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #1885 from zjffdu/ZEPPELIN-1933 and squashes the following commits: d2e1cd4 [Jeff Zhang] address comments 9cee380 [Jeff Zhang] ZEPPELIN-1933. Set pig job name and allow to set pig property in pig interpreter setting (cherry picked from commit 41f9fd921d77f5112107ba76c8794213cf3af929) Signed-off-by: Felix Cheung <felixche...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/835e9e21 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/835e9e21 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/835e9e21 Branch: refs/heads/branch-0.7 Commit: 835e9e21f7a94066313af98c44de9b793b1ba54d Parents: 8956d68 Author: Jeff Zhang <zjf...@apache.org> Authored: Thu Jan 12 15:39:17 2017 +0800 Committer: Felix Cheung <felixche...@apache.org> Committed: Sun Jan 15 11:09:34 2017 -0800 ---------------------------------------------------------------------- docs/interpreter/pig.md | 12 ++++++++++ .../apache/zeppelin/pig/BasePigInterpreter.java | 24 ++++++++++++++++++++ .../org/apache/zeppelin/pig/PigInterpreter.java | 8 +++++++ .../zeppelin/pig/PigQueryInterpreter.java | 1 + .../zeppelin/pig/PigInterpreterTezTest.java | 5 ++++ 5 files changed, 50 insertions(+) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/835e9e21/docs/interpreter/pig.md ---------------------------------------------------------------------- diff --git a/docs/interpreter/pig.md b/docs/interpreter/pig.md index a778169..ad2e80a 100644 --- a/docs/interpreter/pig.md +++ b/docs/interpreter/pig.md @@ -52,6 +52,8 @@ group: manual ### How to configure interpreter At the Interpreters menu, you have to create a new Pig interpreter. Pig interpreter has below properties by default. +You can also set any Pig property here; it will be passed to the Pig engine (e.g. tez.queue.name and mapred.job.queue.name). +In addition, the paragraph title is used as the job name if it exists; otherwise the last non-empty line of the Pig script is used. You can use this job name to find the application in the YARN ResourceManager UI. <table class="table-configuration"> <tr> @@ -74,6 +76,16 @@ At the Interpreters menu, you have to create a new Pig interpreter. Pig interpre <td>1000</td> <td>max row number displayed in <code>%pig.query</code></td> </tr> + <tr> + <td>tez.queue.name</td> + <td>default</td> + <td>queue name for tez engine</td> + </tr> + <tr> + <td>mapred.job.queue.name</td> + <td>default</td> + <td>queue name for mapreduce engine</td> + </tr> </table> ### Example http://git-wip-us.apache.org/repos/asf/zeppelin/blob/835e9e21/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java ---------------------------------------------------------------------- diff --git a/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java index 0aa8a20..a9bb2ce 100644 --- a/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java +++ b/pig/src/main/java/org/apache/zeppelin/pig/BasePigInterpreter.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.pig; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.pig.PigServer; import 
org.apache.pig.backend.BackendException; @@ -97,4 +98,27 @@ public abstract class BasePigInterpreter extends Interpreter { } public abstract PigServer getPigServer(); + + /** + * Use paragraph title if it exists, else use the last line of pig script. + * @param cmd + * @param context + * @return + */ + protected String createJobName(String cmd, InterpreterContext context) { + String pTitle = context.getParagraphTitle(); + if (!StringUtils.isBlank(pTitle)) { + return pTitle; + } else { + // use the last non-empty line of pig script as the job name. + String[] lines = cmd.split("\n"); + for (int i = lines.length - 1; i >= 0; --i) { + if (!StringUtils.isBlank(lines[i])) { + return lines[i]; + } + } + // in case all the lines are empty, but usually it is almost impossible + return "empty_job"; + } + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/835e9e21/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java ---------------------------------------------------------------------- diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java index 8cd1efc..0b50c67 100644 --- a/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigInterpreter.java @@ -18,6 +18,7 @@ package org.apache.zeppelin.pig; import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.pig.PigServer; import org.apache.pig.impl.logicalLayer.FrontendException; @@ -58,6 +59,12 @@ public class PigInterpreter extends BasePigInterpreter { } try { pigServer = new PigServer(execType); + for (Map.Entry entry : getProperty().entrySet()) { + if (!entry.getKey().toString().startsWith("zeppelin.")) { + pigServer.getPigContext().getProperties().setProperty(entry.getKey().toString(), + entry.getValue().toString()); + } + } } catch (IOException e) { 
LOGGER.error("Fail to initialize PigServer", e); throw new RuntimeException("Fail to initialize PigServer", e); @@ -78,6 +85,7 @@ public class PigInterpreter extends BasePigInterpreter { ByteArrayOutputStream bytesOutput = new ByteArrayOutputStream(); File tmpFile = null; try { + pigServer.setJobName(createJobName(cmd, contextInterpreter)); tmpFile = PigUtils.createTempPigScript(cmd); System.setOut(new PrintStream(bytesOutput)); // each thread should its own ScriptState & PigStats http://git-wip-us.apache.org/repos/asf/zeppelin/blob/835e9e21/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java ---------------------------------------------------------------------- diff --git a/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java index 77632a7..5e968d6 100644 --- a/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java +++ b/pig/src/main/java/org/apache/zeppelin/pig/PigQueryInterpreter.java @@ -78,6 +78,7 @@ public class PigQueryInterpreter extends BasePigInterpreter { StringBuilder resultBuilder = new StringBuilder("%table "); try { + pigServer.setJobName(createJobName(st, context)); File tmpScriptFile = PigUtils.createTempPigScript(queries); // each thread should its own ScriptState & PigStats ScriptState.start(pigServer.getPigContext().getExecutionEngine().instantiateScriptState()); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/835e9e21/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTezTest.java ---------------------------------------------------------------------- diff --git a/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTezTest.java b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTezTest.java index e742fd8..964b31c 100644 --- a/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTezTest.java +++ b/pig/src/test/java/org/apache/zeppelin/pig/PigInterpreterTezTest.java @@ -45,6 +45,7 @@ public class 
PigInterpreterTezTest { Properties properties = new Properties(); properties.put("zeppelin.pig.execType", "tez_local"); properties.put("zeppelin.pig.includeJobStats", includeJobStats + ""); + properties.put("tez.queue.name", "test"); pigInterpreter = new PigInterpreter(properties); pigInterpreter.open(); context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null, null, @@ -60,6 +61,10 @@ public class PigInterpreterTezTest { public void testBasics() throws IOException { setUpTez(false); + assertEquals("test", + pigInterpreter.getPigServer().getPigContext().getProperties() + .getProperty("tez.queue.name")); + String content = "1\tandy\n" + "2\tpeter\n"; File tmpFile = File.createTempFile("zeppelin", "test");