Repository: zeppelin
Updated Branches:
  refs/heads/master 9e8d7eb90 -> 1135fb61d
[ZEPPELIN-1965] Livy SQL Interpreter: Should use df.show(1000, false) to display results

### What is this PR for?
The Livy SQL interpreter truncates result strings longer than 20 characters. In some cases we want to see the full string, so this PR adds an interpreter property, **zeppelin.livy.spark.sql.field.truncate**, to control whether strings are truncated. By default, **zeppelin.livy.spark.sql.field.truncate** is set to **true**.

### What type of PR is it?
Improvement

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1965

### How should this be tested?
Set zeppelin.livy.spark.sql.field.truncate to true or false, then run a SQL query that produces string values longer than 20 characters. Depending on the value of zeppelin.livy.spark.sql.field.truncate, the strings will either be truncated or shown in full.

### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
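For context (not part of this change), here is a minimal Scala sketch of the Spark behavior the new property toggles, assuming a spark-shell or notebook session where a SparkSession named `spark` is already available:

```scala
// Build a one-row DataFrame whose string column exceeds 20 characters.
val df = spark.createDataFrame(Seq(("12characters12characters", 20)))
  .toDF("col_1", "col_2")

// truncate = true (Spark's default): cells longer than 20 characters are
// shortened, e.g. "12characters12cha...".
df.show(1000, true)

// truncate = false: the full field value is printed.
df.show(1000, false)
```

With `zeppelin.livy.spark.sql.field.truncate=false`, the interpreter passes `false` as the second argument to `show`, which is what the PR title refers to.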
Author: Benoy Antony <be...@apache.org>

Closes #2201 from benoyantony/master and squashes the following commits:

bb006c0 [Benoy Antony] changed field name and description
9eae68b [Benoy Antony] added a null check to avoid testcase failures, another nullcheck for backward compatibility and added two new testcases
ab1ead2 [Benoy Antony] documented zeppelin.livy.spark.sql.truncate
b6252be [Benoy Antony] [ZEPPELIN-1965] Livy SQL Interpreter: Should use df.show(1000, false) to display results


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/1135fb61
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/1135fb61
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/1135fb61

Branch: refs/heads/master
Commit: 1135fb61d24e4a7baa82d1442e683eecb5de33d2
Parents: 9e8d7eb
Author: Benoy Antony <be...@apache.org>
Authored: Fri Mar 31 01:04:08 2017 -0700
Committer: Felix Cheung <felixche...@apache.org>
Committed: Sun Apr 2 11:53:49 2017 -0700

----------------------------------------------------------------------
 docs/interpreter/livy.md                        |   7 +-
 .../zeppelin/livy/BaseLivyInterprereter.java    |   4 +-
 .../zeppelin/livy/LivySparkSQLInterpreter.java  |  19 ++-
 .../src/main/resources/interpreter-setting.json |   5 +
 .../apache/zeppelin/livy/LivyInterpreterIT.java | 127 ++++++++++++++++++-
 5 files changed, 155 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1135fb61/docs/interpreter/livy.md
----------------------------------------------------------------------
diff --git a/docs/interpreter/livy.md b/docs/interpreter/livy.md
index ce1c34a..a7b776c 100644
--- a/docs/interpreter/livy.md
+++ b/docs/interpreter/livy.md
@@ -56,11 +56,16 @@ Example: `spark.driver.memory` to `livy.spark.driver.memory`
     <td>URL where livy server is running</td>
   </tr>
   <tr>
-    <td>zeppelin.livy.spark.maxResult</td>
+    <td>zeppelin.livy.spark.sql.maxResult</td>
     <td>1000</td>
     <td>Max number of Spark SQL result to display.</td>
   </tr>
   <tr>
+    <td>zeppelin.livy.spark.sql.field.truncate</td>
+    <td>true</td>
+    <td>Whether to truncate field values longer than 20 characters or not</td>
+  </tr>
+  <tr>
     <td>zeppelin.livy.session.create_timeout</td>
     <td>120</td>
     <td>Timeout in seconds for session creation</td>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1135fb61/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
index d29d20c..26342a2 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterprereter.java
@@ -272,7 +272,9 @@ public abstract class BaseLivyInterprereter extends Interpreter {
         throw new LivyException(e);
       }
       stmtInfo = getStatementInfo(stmtInfo.id);
-      paragraphId2StmtProgressMap.put(paragraphId, (int) (stmtInfo.progress * 100));
+      if (paragraphId != null) {
+        paragraphId2StmtProgressMap.put(paragraphId, (int) (stmtInfo.progress * 100));
+      }
     }
     if (appendSessionExpired) {
       return appendSessionExpire(getResultFromStatementInfo(stmtInfo, displayAppInfo),

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1135fb61/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
----------------------------------------------------------------------
diff --git a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
index 48e4967..2eaf79c 100644
--- a/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
+++ b/livy/src/main/java/org/apache/zeppelin/livy/LivySparkSQLInterpreter.java
@@ -32,14 +32,25 @@ import java.util.Properties;
  */
 public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
 
+  public static final String ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE =
+      "zeppelin.livy.spark.sql.field.truncate";
+
+  public static final String ZEPPELIN_LIVY_SPARK_SQL_MAX_RESULT =
+      "zeppelin.livy.spark.sql.maxResult";
+
   private LivySparkInterpreter sparkInterpreter;
   private boolean isSpark2 = false;
   private int maxResult = 1000;
+  private boolean truncate = true;
 
   public LivySparkSQLInterpreter(Properties property) {
     super(property);
-    this.maxResult = Integer.parseInt(property.getProperty("zeppelin.livy.spark.sql.maxResult"));
+    this.maxResult = Integer.parseInt(property.getProperty(ZEPPELIN_LIVY_SPARK_SQL_MAX_RESULT));
+    if (property.getProperty(ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE) != null) {
+      this.truncate =
+          Boolean.parseBoolean(property.getProperty(ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE));
+    }
   }
 
   @Override
@@ -111,9 +122,11 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
       // use triple quote so that we don't need to do string escape.
       String sqlQuery = null;
       if (isSpark2) {
-        sqlQuery = "spark.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")";
+        sqlQuery = "spark.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ", " +
+            truncate + ")";
       } else {
-        sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")";
+        sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ", " +
+            truncate + ")";
       }
       InterpreterResult result = sparkInterpreter.interpret(sqlQuery, context.getParagraphId(),
           this.displayAppInfo, true);
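To make the effect of the change above concrete, this is the Scala statement the interpreter now builds and submits to Livy for a paragraph containing `select * from df` (a reconstruction from the string-building code above with the default `maxResult` of 1000, not output captured from a live session):

```scala
// Spark 2.x path with zeppelin.livy.spark.sql.field.truncate=true (default):
spark.sql("""select * from df""").show(1000, true)

// ... and with zeppelin.livy.spark.sql.field.truncate=false:
spark.sql("""select * from df""").show(1000, false)

// Spark 1.x path uses sqlContext instead of spark:
sqlContext.sql("""select * from df""").show(1000, false)
```

Before this change the generated statement was `spark.sql("""...""").show(1000)`, which always truncated long fields.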
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1135fb61/livy/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/livy/src/main/resources/interpreter-setting.json b/livy/src/main/resources/interpreter-setting.json
index 42f64cf..8d3dea0 100644
--- a/livy/src/main/resources/interpreter-setting.json
+++ b/livy/src/main/resources/interpreter-setting.json
@@ -118,6 +118,11 @@
       "defaultValue": "1000",
       "description": "Max number of Spark SQL result to display."
     },
+    "zeppelin.livy.spark.sql.field.truncate": {
+      "propertyName": "zeppelin.livy.spark.sql.field.truncate",
+      "defaultValue": "true",
+      "description": "If true, truncate field values longer than 20 characters."
+    },
     "zeppelin.livy.concurrentSQL": {
       "propertyName": "zeppelin.livy.concurrentSQL",
       "defaultValue": "false",
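The integration tests below assert the exact truncated and untruncated cell values. As a reference for the truncated form, Spark's default rendering keeps the first 17 characters of an over-long cell and appends "..."; this is a rough Scala reconstruction of that rule (assuming the default width of 20; not the actual Spark source):

```scala
// Approximate reconstruction of Spark's default cell truncation rule.
def truncateCell(value: String, width: Int = 20): String =
  if (value.length > width) value.substring(0, width - 3) + "..." else value

// "12characters12characters" (24 chars) -> "12characters12cha..."
println(truncateCell("12characters12characters"))
```

This matches the `12characters12cha...` value asserted in `testStringWithTruncation` below.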
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1135fb61/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
----------------------------------------------------------------------
diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
index 1a8e8df..3da908c 100644
--- a/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
+++ b/livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
@@ -20,7 +20,6 @@ package org.apache.zeppelin.livy;
 
 import com.cloudera.livy.test.framework.Cluster;
 import com.cloudera.livy.test.framework.Cluster$;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.user.AuthenticationInfo;
@@ -33,7 +32,6 @@ import java.util.ArrayList;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class LivyInterpreterIT {
@@ -309,6 +307,131 @@ public class LivyInterpreterIT {
   }
 
   @Test
+  public void testStringWithTruncation() {
+    if (!checkPreCondition()) {
+      return;
+    }
+    InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
+    interpreterGroup.put("session_1", new ArrayList<Interpreter>());
+    LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
+    sparkInterpreter.setInterpreterGroup(interpreterGroup);
+    interpreterGroup.get("session_1").add(sparkInterpreter);
+    AuthenticationInfo authInfo = new AuthenticationInfo("user1");
+    MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
+    InterpreterOutput output = new InterpreterOutput(outputListener);
+    InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
+        "title", "text", authInfo, null, null, null, null, null, output);
+    sparkInterpreter.open();
+
+    LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
+    interpreterGroup.get("session_1").add(sqlInterpreter);
+    sqlInterpreter.setInterpreterGroup(interpreterGroup);
+    sqlInterpreter.open();
+
+    try {
+      // detect spark version
+      InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertEquals(1, result.message().size());
+
+      boolean isSpark2 = isSpark2(sparkInterpreter, context);
+
+      // test DataFrame api
+      if (!isSpark2) {
+        result = sparkInterpreter.interpret(
+            "val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+            + "df.collect()", context);
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertEquals(1, result.message().size());
+        assertTrue(result.message().get(0).getData()
+            .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
+      } else {
+        result = sparkInterpreter.interpret(
+            "val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+            + "df.collect()", context);
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertEquals(1, result.message().size());
+        assertTrue(result.message().get(0).getData()
+            .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
+      }
+      sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
+      // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
+      result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'", context);
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+      assertEquals("col_1\tcol_2\n12characters12cha...\t20", result.message().get(0).getData());
+    } finally {
+      sparkInterpreter.close();
+      sqlInterpreter.close();
+    }
+  }
+
+  @Test
+  public void testStringWithoutTruncation() {
+    if (!checkPreCondition()) {
+      return;
+    }
+    InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
+    interpreterGroup.put("session_1", new ArrayList<Interpreter>());
+    Properties newProps = new Properties();
+    for (Object name: properties.keySet()) {
+      newProps.put(name, properties.get(name));
+    }
+    newProps.put(LivySparkSQLInterpreter.ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE, "false");
+    LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(newProps);
+    sparkInterpreter.setInterpreterGroup(interpreterGroup);
+    interpreterGroup.get("session_1").add(sparkInterpreter);
+    AuthenticationInfo authInfo = new AuthenticationInfo("user1");
+    MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
+    InterpreterOutput output = new InterpreterOutput(outputListener);
+    InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
+        "title", "text", authInfo, null, null, null, null, null, output);
+    sparkInterpreter.open();
+
+    LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(newProps);
+    interpreterGroup.get("session_1").add(sqlInterpreter);
+    sqlInterpreter.setInterpreterGroup(interpreterGroup);
+    sqlInterpreter.open();
+
+    try {
+      // detect spark version
+      InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertEquals(1, result.message().size());
+
+      boolean isSpark2 = isSpark2(sparkInterpreter, context);
+
+      // test DataFrame api
+      if (!isSpark2) {
+        result = sparkInterpreter.interpret(
+            "val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+            + "df.collect()", context);
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertEquals(1, result.message().size());
+        assertTrue(result.message().get(0).getData()
+            .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
+      } else {
+        result = sparkInterpreter.interpret(
+            "val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+            + "df.collect()", context);
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+        assertEquals(1, result.message().size());
+        assertTrue(result.message().get(0).getData()
+            .contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
+      }
+      sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
+      // test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
+      result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'", context);
+      assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+      assertEquals("col_1\tcol_2\n12characters12characters\t20", result.message().get(0).getData());
+    } finally {
+      sparkInterpreter.close();
+      sqlInterpreter.close();
+    }
+  }
+
+  @Test
   public void testPySparkInterpreter() {
     if (!checkPreCondition()) {
       return;