This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 88234f5  [ZEPPELIN-4780]. Use angular instead of html for flink 
streaming output
88234f5 is described below

commit 88234f5f96249bc0a398a8bd548951f8ab1a0b5b
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Wed Apr 29 15:11:36 2020 +0800

    [ZEPPELIN-4780]. Use angular instead of html for flink streaming output
    
    ### What is this PR for?
    
    This PR further addresses the paragraph shaking issue. It uses 
angular instead of html output, so that each refresh only updates the angular 
objects instead of the whole result. This fully resolves the paragraph shaking 
issue.
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4780
    
    ### How should this be tested?
    * CI
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3754 from zjffdu/ZEPPELIN-4780 and squashes the following commits:
    
    a901df5fc [Jeff Zhang] [ZEPPELIN-4780]. Use angular instead of html for 
flink streaming output
    
    (cherry picked from commit cb47d73fe378fd86afccb05f2112c56fd43c2ccd)
    Signed-off-by: Jeff Zhang <zjf...@apache.org>
---
 .../java/org/apache/zeppelin/flink/JobManager.java | 23 ++++----
 .../zeppelin/flink/sql/SingleRowStreamSqlJob.java  | 27 ++++++----
 .../org/apache/zeppelin/jdbc/JDBCInterpreter.java  | 63 ++++++++++++----------
 3 files changed, 66 insertions(+), 47 deletions(-)

diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java 
b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java
index 3fdb825..3ba36d1 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -160,6 +160,7 @@ public class JobManager {
     private boolean isStreamingInsertInto;
     private int progress;
     private AtomicBoolean running = new AtomicBoolean(true);
+    private boolean isFirstPoll = true;
 
     FlinkJobProgressPoller(String flinkWebUI, JobID jobId, InterpreterContext 
context) {
       this.flinkWebUI = flinkWebUI;
@@ -196,15 +197,19 @@ public class JobManager {
             running.wait(1000);
           }
           if (isStreamingInsertInto) {
-            StringBuilder builder = new StringBuilder("%html ");
-            builder.append("<h1>Duration: " +
-                    rootNode.getObject().getLong("duration") / 1000 +
-                    " seconds");
-            builder.append("\n%text ");
-            context.out.clear(false);
-            sendFlinkJobUrl(context);
-            context.out.write(builder.toString());
-            context.out.flush();
+            if (isFirstPoll) {
+              StringBuilder builder = new StringBuilder("%angular ");
+              builder.append("<h1>Duration: {{duration}} seconds");
+              builder.append("\n%text ");
+              context.out.clear(false);
+              context.out.write(builder.toString());
+              context.out.flush();
+              isFirstPoll = false;
+            }
+            context.getAngularObjectRegistry().add("duration",
+                    rootNode.getObject().getLong("duration") / 1000,
+                    context.getNoteId(),
+                    context.getParagraphId());
           }
         } catch (Exception e) {
           LOGGER.error("Fail to poll flink job progress via rest api", e);
diff --git 
a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java 
b/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
index 20bad29..3c3125f 100644
--- 
a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
+++ 
b/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java
@@ -20,9 +20,7 @@ package org.apache.zeppelin.flink.sql;
 
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.scala.StreamTableEnvironment;
 import org.apache.flink.types.Row;
-import org.apache.flink.util.StringUtils;
 import org.apache.zeppelin.flink.JobManager;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.tabledata.TableDataUtils;
@@ -36,6 +34,7 @@ public class SingleRowStreamSqlJob extends 
AbstractStreamSqlJob {
 
   private Row latestRow;
   private String template;
+  private boolean isFirstRefresh = true;
 
   public SingleRowStreamSqlJob(StreamExecutionEnvironment senv,
                                TableEnvironment stenv,
@@ -64,11 +63,10 @@ public class SingleRowStreamSqlJob extends 
AbstractStreamSqlJob {
   @Override
   protected String buildResult() {
     StringBuilder builder = new StringBuilder();
-    builder.append("%html\n");
+    builder.append("%angular ");
     String outputText = template;
     for (int i = 0; i < latestRow.getArity(); ++i) {
-      outputText = outputText.replace("{" + i + "}",
-              
TableDataUtils.normalizeColumn(StringUtils.arrayAwareToString(latestRow.getField(i))));
+      outputText = outputText.replace("{" + i + "}", "{{value_" + i + "}}");
     }
     builder.append(outputText);
     return builder.toString();
@@ -80,10 +78,19 @@ public class SingleRowStreamSqlJob extends 
AbstractStreamSqlJob {
       LOGGER.warn("Skip RefreshTask as no data available");
       return;
     }
-    context.out().clear(false);
-    String output = buildResult();
-    context.out.write(output);
-    LOGGER.debug("Refresh Output: " + output);
-    context.out.flush();
+    if (isFirstRefresh) {
+      context.out().clear(false);
+      String output = buildResult();
+      context.out.write(output);
+      context.out.flush();
+      isFirstRefresh = false;
+    }
+
+    for (int i = 0; i < latestRow.getArity(); ++i) {
+      context.getAngularObjectRegistry().add("value_" + i,
+              TableDataUtils.normalizeColumn(latestRow.getField(i)),
+              context.getNoteId(),
+              context.getParagraphId());
+    }
   }
 }
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java 
b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
index 2cea70c..dda5413 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
@@ -161,6 +161,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
   private SqlSplitter sqlSplitter;
 
   private Map<String, ScheduledExecutorService> refreshExecutorServices = new 
HashMap<>();
+  private Map<String, Boolean> isFirstRefreshMap = new HashMap<>();
   private Map<String, Boolean> paragraphCancelMap = new HashMap<>();
 
   public JDBCInterpreter(Properties property) {
@@ -577,32 +578,10 @@ public class JDBCInterpreter extends KerberosInterpreter {
     return null;
   }
 
-  private String getResults(ResultSet resultSet,
-                            boolean isTableType,
-                            String template)
+  private String getResults(ResultSet resultSet, boolean isTableType)
       throws SQLException {
 
     ResultSetMetaData md = resultSet.getMetaData();
-
-    /**
-     * If html template is provided, only fetch the first row.
-     */
-    if (template != null) {
-      resultSet.next();
-      String result = "%html " + template + "\n";
-      for (int i = 1; i <= md.getColumnCount(); ++i) {
-        Object columnObject = resultSet.getObject(i);
-        String columnValue = null;
-        if (columnObject == null) {
-          columnValue = "null";
-        } else {
-          columnValue = resultSet.getString(i);
-        }
-        result = result.replace("{" + (i - 1) + "}", columnValue);
-      }
-      return result;
-    }
-
     StringBuilder msg;
     if (isTableType) {
       msg = new StringBuilder(TABLE_MAGIC_TAG);
@@ -759,11 +738,38 @@ public class JDBCInterpreter extends KerberosInterpreter {
                 resultSet.getMetaData().getColumnCount())) {
               context.out.write("%text Query executed successfully.\n");
             } else {
-              String results = getResults(resultSet,
-                  !containsIgnoreCase(sqlToExecute, EXPLAIN_PREDICATE),
-                      context.getLocalProperties().get("template"));
-              context.out.write(results);
-              context.out.write("\n%text ");
+              String template = context.getLocalProperties().get("template");
+              if (!StringUtils.isBlank(template)) {
+                resultSet.next();
+                ResultSetMetaData md = resultSet.getMetaData();
+                if (isFirstRefreshMap.get(context.getParagraphId())) {
+                  String angularTemplate = template;
+                  for (int j = 0; j < md.getColumnCount(); ++j) {
+                    angularTemplate = angularTemplate.replace("{" + j + "}", 
"{{value_" + j + "}}");
+                  }
+                  context.out.write("%angular " + angularTemplate);
+                  context.out.write("\n%text ");
+                  context.out.flush();
+                  isFirstRefreshMap.put(context.getParagraphId(), false);
+                }
+                for (int j = 1; j <= md.getColumnCount(); ++j) {
+                  Object columnObject = resultSet.getObject(j);
+                  String columnValue = null;
+                  if (columnObject == null) {
+                    columnValue = "null";
+                  } else {
+                    columnValue = resultSet.getString(j);
+                  }
+                  context.getAngularObjectRegistry().add("value_" + (j - 1),
+                          columnValue, context.getNoteId(), 
context.getParagraphId());
+                }
+              } else {
+                String results = getResults(resultSet,
+                        !containsIgnoreCase(sqlToExecute, EXPLAIN_PREDICATE));
+                context.out.write(results);
+                context.out.write("\n%text ");
+                context.out.flush();
+              }
             }
           } else {
             // Response contains either an update count or there are no 
results.
@@ -851,6 +857,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
       paragraphCancelMap.put(context.getParagraphId(), false);
       ScheduledExecutorService refreshExecutor = 
Executors.newSingleThreadScheduledExecutor();
       refreshExecutorServices.put(context.getParagraphId(), refreshExecutor);
+      isFirstRefreshMap.put(context.getParagraphId(), true);
       final AtomicReference<InterpreterResult> interpreterResultRef = new 
AtomicReference();
       refreshExecutor.scheduleAtFixedRate(() -> {
         context.out.clear(false);

Reply via email to