This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new 88234f5 [ZEPPELIN-4780]. Use angular instead of html for flink streaming output 88234f5 is described below commit 88234f5f96249bc0a398a8bd548951f8ab1a0b5b Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Wed Apr 29 15:11:36 2020 +0800 [ZEPPELIN-4780]. Use angular instead of html for flink streaming output ### What is this PR for? This PR further addresses the paragraph shaking issue. It uses Angular instead of HTML for the output, so that on each refresh we only update the Angular objects instead of rewriting the whole result. This fully resolves the paragraph shaking issue. ### What type of PR is it? [Improvement] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4780 ### How should this be tested? * CI ### Screenshots (if appropriate) ### Questions: * Do the license files need to be updated? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3754 from zjffdu/ZEPPELIN-4780 and squashes the following commits: a901df5fc [Jeff Zhang] [ZEPPELIN-4780]. 
Use angular instead of html for flink streaming output (cherry picked from commit cb47d73fe378fd86afccb05f2112c56fd43c2ccd) Signed-off-by: Jeff Zhang <zjf...@apache.org> --- .../java/org/apache/zeppelin/flink/JobManager.java | 23 ++++---- .../zeppelin/flink/sql/SingleRowStreamSqlJob.java | 27 ++++++---- .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 63 ++++++++++++---------- 3 files changed, 66 insertions(+), 47 deletions(-) diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java index 3fdb825..3ba36d1 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java @@ -160,6 +160,7 @@ public class JobManager { private boolean isStreamingInsertInto; private int progress; private AtomicBoolean running = new AtomicBoolean(true); + private boolean isFirstPoll = true; FlinkJobProgressPoller(String flinkWebUI, JobID jobId, InterpreterContext context) { this.flinkWebUI = flinkWebUI; @@ -196,15 +197,19 @@ public class JobManager { running.wait(1000); } if (isStreamingInsertInto) { - StringBuilder builder = new StringBuilder("%html "); - builder.append("<h1>Duration: " + - rootNode.getObject().getLong("duration") / 1000 + - " seconds"); - builder.append("\n%text "); - context.out.clear(false); - sendFlinkJobUrl(context); - context.out.write(builder.toString()); - context.out.flush(); + if (isFirstPoll) { + StringBuilder builder = new StringBuilder("%angular "); + builder.append("<h1>Duration: {{duration}} seconds"); + builder.append("\n%text "); + context.out.clear(false); + context.out.write(builder.toString()); + context.out.flush(); + isFirstPoll = false; + } + context.getAngularObjectRegistry().add("duration", + rootNode.getObject().getLong("duration") / 1000, + context.getNoteId(), + context.getParagraphId()); } } catch (Exception e) { LOGGER.error("Fail to poll flink job progress via rest api", e); diff 
--git a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java index 20bad29..3c3125f 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java @@ -20,9 +20,7 @@ package org.apache.zeppelin.flink.sql; import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment; import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.scala.StreamTableEnvironment; import org.apache.flink.types.Row; -import org.apache.flink.util.StringUtils; import org.apache.zeppelin.flink.JobManager; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.tabledata.TableDataUtils; @@ -36,6 +34,7 @@ public class SingleRowStreamSqlJob extends AbstractStreamSqlJob { private Row latestRow; private String template; + private boolean isFirstRefresh = true; public SingleRowStreamSqlJob(StreamExecutionEnvironment senv, TableEnvironment stenv, @@ -64,11 +63,10 @@ public class SingleRowStreamSqlJob extends AbstractStreamSqlJob { @Override protected String buildResult() { StringBuilder builder = new StringBuilder(); - builder.append("%html\n"); + builder.append("%angular "); String outputText = template; for (int i = 0; i < latestRow.getArity(); ++i) { - outputText = outputText.replace("{" + i + "}", - TableDataUtils.normalizeColumn(StringUtils.arrayAwareToString(latestRow.getField(i)))); + outputText = outputText.replace("{" + i + "}", "{{value_" + i + "}}"); } builder.append(outputText); return builder.toString(); @@ -80,10 +78,19 @@ public class SingleRowStreamSqlJob extends AbstractStreamSqlJob { LOGGER.warn("Skip RefreshTask as no data available"); return; } - context.out().clear(false); - String output = buildResult(); - context.out.write(output); - LOGGER.debug("Refresh Output: " + output); - context.out.flush(); + if 
(isFirstRefresh) { + context.out().clear(false); + String output = buildResult(); + context.out.write(output); + context.out.flush(); + isFirstRefresh = false; + } + + for (int i = 0; i < latestRow.getArity(); ++i) { + context.getAngularObjectRegistry().add("value_" + i, + TableDataUtils.normalizeColumn(latestRow.getField(i)), + context.getNoteId(), + context.getParagraphId()); + } } } diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 2cea70c..dda5413 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -161,6 +161,7 @@ public class JDBCInterpreter extends KerberosInterpreter { private SqlSplitter sqlSplitter; private Map<String, ScheduledExecutorService> refreshExecutorServices = new HashMap<>(); + private Map<String, Boolean> isFirstRefreshMap = new HashMap<>(); private Map<String, Boolean> paragraphCancelMap = new HashMap<>(); public JDBCInterpreter(Properties property) { @@ -577,32 +578,10 @@ public class JDBCInterpreter extends KerberosInterpreter { return null; } - private String getResults(ResultSet resultSet, - boolean isTableType, - String template) + private String getResults(ResultSet resultSet, boolean isTableType) throws SQLException { ResultSetMetaData md = resultSet.getMetaData(); - - /** - * If html template is provided, only fetch the first row. 
- */ - if (template != null) { - resultSet.next(); - String result = "%html " + template + "\n"; - for (int i = 1; i <= md.getColumnCount(); ++i) { - Object columnObject = resultSet.getObject(i); - String columnValue = null; - if (columnObject == null) { - columnValue = "null"; - } else { - columnValue = resultSet.getString(i); - } - result = result.replace("{" + (i - 1) + "}", columnValue); - } - return result; - } - StringBuilder msg; if (isTableType) { msg = new StringBuilder(TABLE_MAGIC_TAG); @@ -759,11 +738,38 @@ public class JDBCInterpreter extends KerberosInterpreter { resultSet.getMetaData().getColumnCount())) { context.out.write("%text Query executed successfully.\n"); } else { - String results = getResults(resultSet, - !containsIgnoreCase(sqlToExecute, EXPLAIN_PREDICATE), - context.getLocalProperties().get("template")); - context.out.write(results); - context.out.write("\n%text "); + String template = context.getLocalProperties().get("template"); + if (!StringUtils.isBlank(template)) { + resultSet.next(); + ResultSetMetaData md = resultSet.getMetaData(); + if (isFirstRefreshMap.get(context.getParagraphId())) { + String angularTemplate = template; + for (int j = 0; j < md.getColumnCount(); ++j) { + angularTemplate = angularTemplate.replace("{" + j + "}", "{{value_" + j + "}}"); + } + context.out.write("%angular " + angularTemplate); + context.out.write("\n%text "); + context.out.flush(); + isFirstRefreshMap.put(context.getParagraphId(), false); + } + for (int j = 1; j <= md.getColumnCount(); ++j) { + Object columnObject = resultSet.getObject(j); + String columnValue = null; + if (columnObject == null) { + columnValue = "null"; + } else { + columnValue = resultSet.getString(j); + } + context.getAngularObjectRegistry().add("value_" + (j - 1), + columnValue, context.getNoteId(), context.getParagraphId()); + } + } else { + String results = getResults(resultSet, + !containsIgnoreCase(sqlToExecute, EXPLAIN_PREDICATE)); + context.out.write(results); + 
context.out.write("\n%text "); + context.out.flush(); + } } } else { // Response contains either an update count or there are no results. @@ -851,6 +857,7 @@ public class JDBCInterpreter extends KerberosInterpreter { paragraphCancelMap.put(context.getParagraphId(), false); ScheduledExecutorService refreshExecutor = Executors.newSingleThreadScheduledExecutor(); refreshExecutorServices.put(context.getParagraphId(), refreshExecutor); + isFirstRefreshMap.put(context.getParagraphId(), true); final AtomicReference<InterpreterResult> interpreterResultRef = new AtomicReference(); refreshExecutor.scheduleAtFixedRate(() -> { context.out.clear(false);