This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new cb47d73 [ZEPPELIN-4780]. Use angular instead of html for flink streaming output cb47d73 is described below commit cb47d73fe378fd86afccb05f2112c56fd43c2ccd Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Wed Apr 29 15:11:36 2020 +0800 [ZEPPELIN-4780]. Use angular instead of html for flink streaming output ### What is this PR for? This PR is to resolve the paragraph shaking issue further. This PR uses angular instead of html, so that each time we only update the angular object instead of the whole result. This can totally resolve the paragraph shaking issue. ### What type of PR is it? [ Improvement] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4780 ### How should this be tested? * CI ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3754 from zjffdu/ZEPPELIN-4780 and squashes the following commits: a901df5fc [Jeff Zhang] [ZEPPELIN-4780]. 
Use angular instead of html for flink streaming output --- .../java/org/apache/zeppelin/flink/JobManager.java | 23 ++++---- .../zeppelin/flink/sql/SingleRowStreamSqlJob.java | 27 ++++++---- .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 63 ++++++++++++---------- 3 files changed, 66 insertions(+), 47 deletions(-) diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java index 235676d..471ae75 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java @@ -160,6 +160,7 @@ public class JobManager { private boolean isStreamingInsertInto; private int progress; private AtomicBoolean running = new AtomicBoolean(true); + private boolean isFirstPoll = true; FlinkJobProgressPoller(String flinkWebUI, JobID jobId, InterpreterContext context) { this.flinkWebUI = flinkWebUI; @@ -197,15 +198,19 @@ public class JobManager { running.wait(1000); } if (isStreamingInsertInto) { - StringBuilder builder = new StringBuilder("%html "); - builder.append("<h1>Duration: " + - rootNode.getObject().getLong("duration") / 1000 + - " seconds"); - builder.append("\n%text "); - context.out.clear(false); - sendFlinkJobUrl(context); - context.out.write(builder.toString()); - context.out.flush(); + if (isFirstPoll) { + StringBuilder builder = new StringBuilder("%angular "); + builder.append("<h1>Duration: {{duration}} seconds"); + builder.append("\n%text "); + context.out.clear(false); + context.out.write(builder.toString()); + context.out.flush(); + isFirstPoll = false; + } + context.getAngularObjectRegistry().add("duration", + rootNode.getObject().getLong("duration") / 1000, + context.getNoteId(), + context.getParagraphId()); } } catch (Exception e) { LOGGER.error("Fail to poll flink job progress via rest api, rest api: " + rootNode, e); diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java 
b/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java index 20bad29..3c3125f 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java @@ -20,9 +20,7 @@ package org.apache.zeppelin.flink.sql; import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment; import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.scala.StreamTableEnvironment; import org.apache.flink.types.Row; -import org.apache.flink.util.StringUtils; import org.apache.zeppelin.flink.JobManager; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.tabledata.TableDataUtils; @@ -36,6 +34,7 @@ public class SingleRowStreamSqlJob extends AbstractStreamSqlJob { private Row latestRow; private String template; + private boolean isFirstRefresh = true; public SingleRowStreamSqlJob(StreamExecutionEnvironment senv, TableEnvironment stenv, @@ -64,11 +63,10 @@ public class SingleRowStreamSqlJob extends AbstractStreamSqlJob { @Override protected String buildResult() { StringBuilder builder = new StringBuilder(); - builder.append("%html\n"); + builder.append("%angular "); String outputText = template; for (int i = 0; i < latestRow.getArity(); ++i) { - outputText = outputText.replace("{" + i + "}", - TableDataUtils.normalizeColumn(StringUtils.arrayAwareToString(latestRow.getField(i)))); + outputText = outputText.replace("{" + i + "}", "{{value_" + i + "}}"); } builder.append(outputText); return builder.toString(); @@ -80,10 +78,19 @@ public class SingleRowStreamSqlJob extends AbstractStreamSqlJob { LOGGER.warn("Skip RefreshTask as no data available"); return; } - context.out().clear(false); - String output = buildResult(); - context.out.write(output); - LOGGER.debug("Refresh Output: " + output); - context.out.flush(); + if (isFirstRefresh) { + context.out().clear(false); + String output = buildResult(); + 
context.out.write(output); + context.out.flush(); + isFirstRefresh = false; + } + + for (int i = 0; i < latestRow.getArity(); ++i) { + context.getAngularObjectRegistry().add("value_" + i, + TableDataUtils.normalizeColumn(latestRow.getField(i)), + context.getNoteId(), + context.getParagraphId()); + } } } diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 2cea70c..dda5413 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -161,6 +161,7 @@ public class JDBCInterpreter extends KerberosInterpreter { private SqlSplitter sqlSplitter; private Map<String, ScheduledExecutorService> refreshExecutorServices = new HashMap<>(); + private Map<String, Boolean> isFirstRefreshMap = new HashMap<>(); private Map<String, Boolean> paragraphCancelMap = new HashMap<>(); public JDBCInterpreter(Properties property) { @@ -577,32 +578,10 @@ public class JDBCInterpreter extends KerberosInterpreter { return null; } - private String getResults(ResultSet resultSet, - boolean isTableType, - String template) + private String getResults(ResultSet resultSet, boolean isTableType) throws SQLException { ResultSetMetaData md = resultSet.getMetaData(); - - /** - * If html template is provided, only fetch the first row. 
- */ - if (template != null) { - resultSet.next(); - String result = "%html " + template + "\n"; - for (int i = 1; i <= md.getColumnCount(); ++i) { - Object columnObject = resultSet.getObject(i); - String columnValue = null; - if (columnObject == null) { - columnValue = "null"; - } else { - columnValue = resultSet.getString(i); - } - result = result.replace("{" + (i - 1) + "}", columnValue); - } - return result; - } - StringBuilder msg; if (isTableType) { msg = new StringBuilder(TABLE_MAGIC_TAG); @@ -759,11 +738,38 @@ public class JDBCInterpreter extends KerberosInterpreter { resultSet.getMetaData().getColumnCount())) { context.out.write("%text Query executed successfully.\n"); } else { - String results = getResults(resultSet, - !containsIgnoreCase(sqlToExecute, EXPLAIN_PREDICATE), - context.getLocalProperties().get("template")); - context.out.write(results); - context.out.write("\n%text "); + String template = context.getLocalProperties().get("template"); + if (!StringUtils.isBlank(template)) { + resultSet.next(); + ResultSetMetaData md = resultSet.getMetaData(); + if (isFirstRefreshMap.get(context.getParagraphId())) { + String angularTemplate = template; + for (int j = 0; j < md.getColumnCount(); ++j) { + angularTemplate = angularTemplate.replace("{" + j + "}", "{{value_" + j + "}}"); + } + context.out.write("%angular " + angularTemplate); + context.out.write("\n%text "); + context.out.flush(); + isFirstRefreshMap.put(context.getParagraphId(), false); + } + for (int j = 1; j <= md.getColumnCount(); ++j) { + Object columnObject = resultSet.getObject(j); + String columnValue = null; + if (columnObject == null) { + columnValue = "null"; + } else { + columnValue = resultSet.getString(j); + } + context.getAngularObjectRegistry().add("value_" + (j - 1), + columnValue, context.getNoteId(), context.getParagraphId()); + } + } else { + String results = getResults(resultSet, + !containsIgnoreCase(sqlToExecute, EXPLAIN_PREDICATE)); + context.out.write(results); + 
context.out.write("\n%text "); + context.out.flush(); + } } } else { // Response contains either an update count or there are no results. @@ -851,6 +857,7 @@ public class JDBCInterpreter extends KerberosInterpreter { paragraphCancelMap.put(context.getParagraphId(), false); ScheduledExecutorService refreshExecutor = Executors.newSingleThreadScheduledExecutor(); refreshExecutorServices.put(context.getParagraphId(), refreshExecutor); + isFirstRefreshMap.put(context.getParagraphId(), true); final AtomicReference<InterpreterResult> interpreterResultRef = new AtomicReference(); refreshExecutor.scheduleAtFixedRate(() -> { context.out.clear(false);