This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new cb47d73  [ZEPPELIN-4780]. Use angular instead of html for flink streaming output
cb47d73 is described below

commit cb47d73fe378fd86afccb05f2112c56fd43c2ccd
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Wed Apr 29 15:11:36 2020 +0800

    [ZEPPELIN-4780]. Use angular instead of html for flink streaming output
    
    ### What is this PR for?
    
    This PR further addresses the paragraph shaking issue. It uses angular
    instead of html, so that on each refresh we only update the angular
    object instead of rewriting the whole paragraph result. This resolves
    the paragraph shaking issue completely.
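
    The idea, in short: the first poll/refresh writes an %angular template
    once, and every later poll only pushes new values into the angular
    object registry. A minimal sketch of the pattern (simplified from the
    diff below; durationInSeconds is a stand-in for the value read from
    the Flink REST API):

        if (isFirstPoll) {
          // Render the %angular template once; {{duration}} binds to an
          // angular object in the registry.
          context.out.clear(false);
          context.out.write("%angular <h1>Duration: {{duration}} seconds</h1>");
          context.out.flush();
          isFirstPoll = false;
        }
        // Later polls only push the new value; the front end re-renders the
        // binding in place, so the whole paragraph result is never replaced.
        context.getAngularObjectRegistry().add("duration",
                durationInSeconds, context.getNoteId(), context.getParagraphId());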
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4780
    
    ### How should this be tested?
    * CI
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3754 from zjffdu/ZEPPELIN-4780 and squashes the following commits:
    
    a901df5fc [Jeff Zhang] [ZEPPELIN-4780]. Use angular instead of html for flink streaming output
---
 .../java/org/apache/zeppelin/flink/JobManager.java | 23 ++++----
 .../zeppelin/flink/sql/SingleRowStreamSqlJob.java  | 27 ++++++----
 .../org/apache/zeppelin/jdbc/JDBCInterpreter.java  | 63 ++++++++++++----------
 3 files changed, 66 insertions(+), 47 deletions(-)

diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java
index 235676d..471ae75 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -160,6 +160,7 @@ public class JobManager {
     private boolean isStreamingInsertInto;
     private int progress;
     private AtomicBoolean running = new AtomicBoolean(true);
+    private boolean isFirstPoll = true;
 
      FlinkJobProgressPoller(String flinkWebUI, JobID jobId, InterpreterContext context) {
       this.flinkWebUI = flinkWebUI;
@@ -197,15 +198,19 @@ public class JobManager {
             running.wait(1000);
           }
           if (isStreamingInsertInto) {
-            StringBuilder builder = new StringBuilder("%html ");
-            builder.append("<h1>Duration: " +
-                    rootNode.getObject().getLong("duration") / 1000 +
-                    " seconds");
-            builder.append("\n%text ");
-            context.out.clear(false);
-            sendFlinkJobUrl(context);
-            context.out.write(builder.toString());
-            context.out.flush();
+            if (isFirstPoll) {
+              StringBuilder builder = new StringBuilder("%angular ");
+              builder.append("<h1>Duration: {{duration}} seconds");
+              builder.append("\n%text ");
+              context.out.clear(false);
+              context.out.write(builder.toString());
+              context.out.flush();
+              isFirstPoll = false;
+            }
+            context.getAngularObjectRegistry().add("duration",
+                    rootNode.getObject().getLong("duration") / 1000,
+                    context.getNoteId(),
+                    context.getParagraphId());
           }
         } catch (Exception e) {
          LOGGER.error("Fail to poll flink job progress via rest api, rest api: " + rootNode, e);
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
index 20bad29..3c3125f 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
@@ -20,9 +20,7 @@ package org.apache.zeppelin.flink.sql;
 
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.scala.StreamTableEnvironment;
 import org.apache.flink.types.Row;
-import org.apache.flink.util.StringUtils;
 import org.apache.zeppelin.flink.JobManager;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.tabledata.TableDataUtils;
@@ -36,6 +34,7 @@ public class SingleRowStreamSqlJob extends AbstractStreamSqlJob {
 
   private Row latestRow;
   private String template;
+  private boolean isFirstRefresh = true;
 
   public SingleRowStreamSqlJob(StreamExecutionEnvironment senv,
                                TableEnvironment stenv,
@@ -64,11 +63,10 @@ public class SingleRowStreamSqlJob extends AbstractStreamSqlJob {
   @Override
   protected String buildResult() {
     StringBuilder builder = new StringBuilder();
-    builder.append("%html\n");
+    builder.append("%angular ");
     String outputText = template;
     for (int i = 0; i < latestRow.getArity(); ++i) {
-      outputText = outputText.replace("{" + i + "}",
-              TableDataUtils.normalizeColumn(StringUtils.arrayAwareToString(latestRow.getField(i))));
+      outputText = outputText.replace("{" + i + "}", "{{value_" + i + "}}");
     }
     builder.append(outputText);
     return builder.toString();
@@ -80,10 +78,19 @@ public class SingleRowStreamSqlJob extends AbstractStreamSqlJob {
       LOGGER.warn("Skip RefreshTask as no data available");
       return;
     }
-    context.out().clear(false);
-    String output = buildResult();
-    context.out.write(output);
-    LOGGER.debug("Refresh Output: " + output);
-    context.out.flush();
+    if (isFirstRefresh) {
+      context.out().clear(false);
+      String output = buildResult();
+      context.out.write(output);
+      context.out.flush();
+      isFirstRefresh = false;
+    }
+
+    for (int i = 0; i < latestRow.getArity(); ++i) {
+      context.getAngularObjectRegistry().add("value_" + i,
+              TableDataUtils.normalizeColumn(latestRow.getField(i)),
+              context.getNoteId(),
+              context.getParagraphId());
+    }
   }
 }
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
index 2cea70c..dda5413 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
@@ -161,6 +161,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
   private SqlSplitter sqlSplitter;
 
  private Map<String, ScheduledExecutorService> refreshExecutorServices = new HashMap<>();
+  private Map<String, Boolean> isFirstRefreshMap = new HashMap<>();
   private Map<String, Boolean> paragraphCancelMap = new HashMap<>();
 
   public JDBCInterpreter(Properties property) {
@@ -577,32 +578,10 @@ public class JDBCInterpreter extends KerberosInterpreter {
     return null;
   }
 
-  private String getResults(ResultSet resultSet,
-                            boolean isTableType,
-                            String template)
+  private String getResults(ResultSet resultSet, boolean isTableType)
       throws SQLException {
 
     ResultSetMetaData md = resultSet.getMetaData();
-
-    /**
-     * If html template is provided, only fetch the first row.
-     */
-    if (template != null) {
-      resultSet.next();
-      String result = "%html " + template + "\n";
-      for (int i = 1; i <= md.getColumnCount(); ++i) {
-        Object columnObject = resultSet.getObject(i);
-        String columnValue = null;
-        if (columnObject == null) {
-          columnValue = "null";
-        } else {
-          columnValue = resultSet.getString(i);
-        }
-        result = result.replace("{" + (i - 1) + "}", columnValue);
-      }
-      return result;
-    }
-
     StringBuilder msg;
     if (isTableType) {
       msg = new StringBuilder(TABLE_MAGIC_TAG);
@@ -759,11 +738,38 @@ public class JDBCInterpreter extends KerberosInterpreter {
                 resultSet.getMetaData().getColumnCount())) {
               context.out.write("%text Query executed successfully.\n");
             } else {
-              String results = getResults(resultSet,
-                  !containsIgnoreCase(sqlToExecute, EXPLAIN_PREDICATE),
-                      context.getLocalProperties().get("template"));
-              context.out.write(results);
-              context.out.write("\n%text ");
+              String template = context.getLocalProperties().get("template");
+              if (!StringUtils.isBlank(template)) {
+                resultSet.next();
+                ResultSetMetaData md = resultSet.getMetaData();
+                if (isFirstRefreshMap.get(context.getParagraphId())) {
+                  String angularTemplate = template;
+                  for (int j = 0; j < md.getColumnCount(); ++j) {
+                    angularTemplate = angularTemplate.replace("{" + j + "}", "{{value_" + j + "}}");
+                  }
+                  context.out.write("%angular " + angularTemplate);
+                  context.out.write("\n%text ");
+                  context.out.flush();
+                  isFirstRefreshMap.put(context.getParagraphId(), false);
+                }
+                for (int j = 1; j <= md.getColumnCount(); ++j) {
+                  Object columnObject = resultSet.getObject(j);
+                  String columnValue = null;
+                  if (columnObject == null) {
+                    columnValue = "null";
+                  } else {
+                    columnValue = resultSet.getString(j);
+                  }
+                  context.getAngularObjectRegistry().add("value_" + (j - 1),
+                          columnValue, context.getNoteId(), context.getParagraphId());
+                }
+              } else {
+                String results = getResults(resultSet,
+                        !containsIgnoreCase(sqlToExecute, EXPLAIN_PREDICATE));
+                context.out.write(results);
+                context.out.write("\n%text ");
+                context.out.flush();
+              }
             }
           } else {
            // Response contains either an update count or there are no results.
@@ -851,6 +857,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
       paragraphCancelMap.put(context.getParagraphId(), false);
      ScheduledExecutorService refreshExecutor = Executors.newSingleThreadScheduledExecutor();
       refreshExecutorServices.put(context.getParagraphId(), refreshExecutor);
+      isFirstRefreshMap.put(context.getParagraphId(), true);
      final AtomicReference<InterpreterResult> interpreterResultRef = new AtomicReference();
       refreshExecutor.scheduleAtFixedRate(() -> {
         context.out.clear(false);
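
For reference, the JDBC changes above only take effect when a paragraph
supplies a "template" local property (confirmed by the diff, which reads
context.getLocalProperties().get("template") and rewrites the {0}-style
placeholders). A hypothetical refresh paragraph might look like the
following, where the refreshInterval property name and the table are
illustrative assumptions, not confirmed by this commit:

    %jdbc(refreshInterval=2000, template=<h1>Total: {0}</h1>)
    select count(1) from events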
