This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 496477e [ZEPPELIN-4611]. Fetching rows with newline character (\n) breaks entire table 496477e is described below commit 496477e0ce1e35d4065171520b1546a96eaafa91 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Thu Feb 13 15:33:34 2020 +0800 [ZEPPELIN-4611]. Fetching rows with newline character (\n) breaks entire table ### What is this PR for? This PR would replace all the special characters(\t, \n, \r\n) in table content with white space. so that it won't break the table format display. ### What type of PR is it? [Bug Fix] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4611 ### How should this be tested? * Unit test is added ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3643 from zjffdu/ZEPPELIN-4611 and squashes the following commits: d12eea943 [Jeff Zhang] [ZEPPELIN-4611]. 
Fetching rows with newline character (\n) breaks entire table --- .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 9 ++-- .../src/main/resources/python/zeppelin_context.py | 15 +++--- .../zeppelin/python/BasePythonInterpreterTest.java | 5 +- .../zeppelin/spark/SparkSqlInterpreterTest.java | 11 ++-- .../org/apache/zeppelin/spark/Spark1Shims.java | 5 +- .../org/apache/zeppelin/spark/Spark2Shims.java | 5 +- .../org/apache/zeppelin/spark/Spark3Shims.java | 5 +- .../apache/zeppelin/tabledata/TableDataUtils.java | 59 ++++++++++++++++++++++ .../zeppelin/tabledata/TableDataUtilsTest.java | 45 +++++++++++++++++ 9 files changed, 138 insertions(+), 21 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 045a291..b3ff47e 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -34,6 +34,7 @@ import org.apache.hadoop.security.alias.CredentialProvider; import org.apache.hadoop.security.alias.CredentialProviderFactory; import org.apache.zeppelin.interpreter.BaseZeppelinContext; import org.apache.zeppelin.interpreter.util.SqlSplitter; +import org.apache.zeppelin.tabledata.TableDataUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -566,9 +567,11 @@ public class JDBCInterpreter extends KerberosInterpreter { msg.append(TAB); } if (StringUtils.isNotEmpty(md.getColumnLabel(i))) { - msg.append(removeTablePrefix(replaceReservedChars(md.getColumnLabel(i)))); + msg.append(removeTablePrefix(replaceReservedChars( + TableDataUtils.normalizeColumn(md.getColumnLabel(i))))); } else { - msg.append(removeTablePrefix(replaceReservedChars(md.getColumnName(i)))); + msg.append(removeTablePrefix(replaceReservedChars( + TableDataUtils.normalizeColumn(md.getColumnName(i))))); } } msg.append(NEWLINE); @@ -588,7 +591,7 @@ public class JDBCInterpreter extends KerberosInterpreter { } 
else { resultValue = resultSet.getString(i); } - msg.append(replaceReservedChars(resultValue)); + msg.append(replaceReservedChars(TableDataUtils.normalizeColumn(resultValue))); if (i != md.getColumnCount()) { msg.append(TAB); } diff --git a/python/src/main/resources/python/zeppelin_context.py b/python/src/main/resources/python/zeppelin_context.py index b0cdadc..6d4263e 100644 --- a/python/src/main/resources/python/zeppelin_context.py +++ b/python/src/main/resources/python/zeppelin_context.py @@ -185,7 +185,10 @@ class PyZeppelinContext(object): self.show_dataframe(p, **kwargs) else: print(str(p)) - + + def normalizeColumn(self, column): + return column.replace("\t", " ").replace("\r\n", " ").replace("\n", " ") + def show_dataframe(self, df, show_index=False, **kwargs): """Pretty prints DF using Table Display System """ @@ -193,11 +196,11 @@ class PyZeppelinContext(object): header_buf = StringIO("") if show_index: idx_name = str(df.index.name) if df.index.name is not None else "" - header_buf.write(idx_name + "\t") - header_buf.write(str(df.columns[0])) + header_buf.write(self.normalizeColumn(idx_name) + "\t") + header_buf.write(self.normalizeColumn(str(df.columns[0]))) for col in df.columns[1:]: header_buf.write("\t") - header_buf.write(str(col)) + header_buf.write(self.normalizeColumn(str(col))) header_buf.write("\n") body_buf = StringIO("") @@ -208,10 +211,10 @@ class PyZeppelinContext(object): if show_index: body_buf.write("%html <strong>{}</strong>".format(idx)) body_buf.write("\t") - body_buf.write(str(row[0])) + body_buf.write(self.normalizeColumn(str(row[0]))) for cell in row[1:]: body_buf.write("\t") - body_buf.write(str(cell)) + body_buf.write(self.normalizeColumn(str(cell))) # don't print '\n' after the last row if idx != (rowNumber - 1): body_buf.write("\n") diff --git a/python/src/test/java/org/apache/zeppelin/python/BasePythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/BasePythonInterpreterTest.java index 51a3f31..27336a8 
100644 --- a/python/src/test/java/org/apache/zeppelin/python/BasePythonInterpreterTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/BasePythonInterpreterTest.java @@ -301,13 +301,14 @@ public abstract class BasePythonInterpreterTest extends ConcurrentTestCase { // Pandas DataFrame context = getInterpreterContext(); result = interpreter.interpret("import pandas as pd\n" + - "df = pd.DataFrame({'id':[1,2,3], 'name':['a','b','c']})\nz.show(df)", context); + "df = pd.DataFrame({'id':[1,2,3], 'name':['a\ta','b\\nb','c\\r\\nc']})\nz.show(df)", + context); assertEquals(context.out.toInterpreterResultMessage().toString(), InterpreterResult.Code.SUCCESS, result.code()); interpreterResultMessages = context.out.toInterpreterResultMessage(); assertEquals(1, interpreterResultMessages.size()); assertEquals(InterpreterResult.Type.TABLE, interpreterResultMessages.get(0).getType()); - assertEquals("id\tname\n1\ta\n2\tb\n3\tc\n", interpreterResultMessages.get(0).getData()); + assertEquals("id\tname\n1\ta a\n2\tb b\n3\tc c\n", interpreterResultMessages.get(0).getData()); context = getInterpreterContext(); result = interpreter.interpret("import pandas as pd\n" + diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java index 1ccfbc5..dcab8d3 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java @@ -89,14 +89,17 @@ public class SparkSqlInterpreterTest { @Test public void test() throws InterpreterException { - sparkInterpreter.interpret("case class Test(name:String, age:Int)", context); - sparkInterpreter.interpret("val test = sc.parallelize(Seq(Test(\"moon\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\", 34)))", context); - sparkInterpreter.interpret("test.toDF.registerTempTable(\"test\")", 
context); + InterpreterResult result = sparkInterpreter.interpret("case class Test(name:String, age:Int)", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + result = sparkInterpreter.interpret("val test = sc.parallelize(Seq(Test(\"moon\\t1\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\\n1\", 34)))", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + result = sparkInterpreter.interpret("test.toDF.registerTempTable(\"test\")", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); InterpreterResult ret = sqlInterpreter.interpret("select name, age from test where age < 40", context); assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); assertEquals(Type.TABLE, ret.message().get(0).getType()); - assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message().get(0).getData()); + assertEquals("name\tage\nmoon 1\t33\npark 1\t34\n", ret.message().get(0).getData()); ret = sqlInterpreter.interpret("select wrong syntax", context); assertEquals(InterpreterResult.Code.ERROR, ret.code()); diff --git a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java index 6119647..ad0efe4 100644 --- a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java +++ b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java @@ -29,6 +29,7 @@ import org.apache.spark.sql.types.StructType; import org.apache.spark.ui.jobs.JobProgressListener; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.ResultMessages; +import org.apache.zeppelin.tabledata.TableDataUtils; import java.util.ArrayList; import java.util.List; @@ -71,7 +72,7 @@ public class Spark1Shims extends SparkShims { List<Row> rows = df.takeAsList(maxResult + 1); StringBuilder msg = new StringBuilder(); msg.append("\n%table "); - msg.append(StringUtils.join(columns, 
"\t")); + msg.append(StringUtils.join(TableDataUtils.normalizeColumns(columns), "\t")); msg.append("\n"); boolean isLargerThanMaxResult = rows.size() > maxResult; if (isLargerThanMaxResult) { @@ -79,7 +80,7 @@ public class Spark1Shims extends SparkShims { } for (Row row : rows) { for (int i = 0; i < row.size(); ++i) { - msg.append(row.get(i)); + msg.append(TableDataUtils.normalizeColumn(row.get(i))); if (i != row.size() - 1) { msg.append("\t"); } diff --git a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java index b7b1cf9..3adba1c 100644 --- a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java +++ b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow; import org.apache.spark.sql.types.StructType; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.ResultMessages; +import org.apache.zeppelin.tabledata.TableDataUtils; import java.util.ArrayList; import java.util.List; @@ -72,7 +73,7 @@ public class Spark2Shims extends SparkShims { List<Row> rows = df.takeAsList(maxResult + 1); StringBuilder msg = new StringBuilder(); msg.append("\n%table "); - msg.append(StringUtils.join(columns, "\t")); + msg.append(StringUtils.join(TableDataUtils.normalizeColumns(columns), "\t")); msg.append("\n"); boolean isLargerThanMaxResult = rows.size() > maxResult; if (isLargerThanMaxResult) { @@ -80,7 +81,7 @@ public class Spark2Shims extends SparkShims { } for (Row row : rows) { for (int i = 0; i < row.size(); ++i) { - msg.append(row.get(i)); + msg.append(TableDataUtils.normalizeColumn(row.get(i))); if (i != row.size() -1) { msg.append("\t"); } diff --git a/spark/spark3-shims/src/main/scala/org/apache/zeppelin/spark/Spark3Shims.java b/spark/spark3-shims/src/main/scala/org/apache/zeppelin/spark/Spark3Shims.java 
index 911c9eb..b213041 100644 --- a/spark/spark3-shims/src/main/scala/org/apache/zeppelin/spark/Spark3Shims.java +++ b/spark/spark3-shims/src/main/scala/org/apache/zeppelin/spark/Spark3Shims.java @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow; import org.apache.spark.sql.types.StructType; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.ResultMessages; +import org.apache.zeppelin.tabledata.TableDataUtils; import java.util.ArrayList; import java.util.List; @@ -72,7 +73,7 @@ public class Spark3Shims extends SparkShims { List<Row> rows = df.takeAsList(maxResult + 1); StringBuilder msg = new StringBuilder(); msg.append("%table "); - msg.append(StringUtils.join(columns, "\t")); + msg.append(StringUtils.join(TableDataUtils.normalizeColumns(columns), "\t")); msg.append("\n"); boolean isLargerThanMaxResult = rows.size() > maxResult; if (isLargerThanMaxResult) { @@ -80,7 +81,7 @@ public class Spark3Shims extends SparkShims { } for (Row row : rows) { for (int i = 0; i < row.size(); ++i) { - msg.append(row.get(i)); + msg.append(TableDataUtils.normalizeColumn(row.get(i))); if (i != row.size() -1) { msg.append("\t"); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataUtils.java new file mode 100644 index 0000000..d587d19 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/tabledata/TableDataUtils.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.tabledata; + + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class TableDataUtils { + + /** + * Replace '\t','\r\n','\n' which represent field delimiter and row delimiter with white space. + * @param column + * @return the column text with tab and newline characters replaced by spaces + */ + public static String normalizeColumn(String column) { + if (column == null) { + return "null"; + } + return column.replace("\t", " ").replace("\r\n", " ").replace("\n", " "); + } + + /** + * Convert obj to String first; if obj is null, the string "null" is used instead. + * @param obj + * @return the normalized string form of obj + */ + public static String normalizeColumn(Object obj) { + return normalizeColumn(obj == null ? "null" : obj.toString()); + } + + public static List<String> normalizeColumns(List<Object> columns) { + return columns.stream() + .map(TableDataUtils::normalizeColumn) + .collect(Collectors.toList()); + } + + public static List<String> normalizeColumns(Object[] columns) { + return Arrays.stream(columns) + .map(TableDataUtils::normalizeColumn) + .collect(Collectors.toList()); + } +} diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/tabledata/TableDataUtilsTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/tabledata/TableDataUtilsTest.java new file mode 100644 index 0000000..bc6d9a0 --- /dev/null +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/tabledata/TableDataUtilsTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.tabledata; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TableDataUtilsTest { + + @Test + public void testColumn() { + assertEquals("hello world", TableDataUtils.normalizeColumn("hello\tworld")); + assertEquals("hello world", TableDataUtils.normalizeColumn("hello\nworld")); + assertEquals("hello world", TableDataUtils.normalizeColumn("hello\r\nworld")); + assertEquals("hello world", TableDataUtils.normalizeColumn("hello\t\nworld")); + + assertEquals("null", TableDataUtils.normalizeColumn(null)); + } + + @Test + public void testColumns() { + assertEquals(Lists.newArrayList("hello world", "hello world"), + TableDataUtils.normalizeColumns(new Object[]{"hello\tworld", "hello\nworld"})); + + assertEquals(Lists.newArrayList("hello world", "null"), + TableDataUtils.normalizeColumns(new String[]{"hello\tworld", null})); + } +}