This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new dca7b09 [ZEPPELIN-4759] Paragraph refreshing make the other paragraph shaking dca7b09 is described below commit dca7b094633502117bc748979910b8dd895c850b Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Sat Apr 18 23:04:45 2020 +0800 [ZEPPELIN-4759] Paragraph refreshing make the other paragraph shaking ### What is this PR for? This PR just resolve the paragraph shaking issue. Before this PR, I will refresh paragraph output by 2 steps: step 1. clear previous output, step 2. update paragraph with new output. After this PR, I will just make it as just one step: update paragraph with new output. The main change is o method InterpreterOutput#clear. I add one flag to indicate whether it should send the update to frontend to refresh the output. ### What type of PR is it? [ Improvement ] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4759 ### How should this be tested? * CI pass ### Screenshots (if appropriate) * Before  * After  ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3741 from zjffdu/ZEPPELIN-4759 and squashes the following commits: b8787df24 [Jeff Zhang] [ZEPPELIN-4759]. Paragraph refreshing make the other paragraph shaking (cherry picked from commit ad338074287d29689e6b892b2e4cc45df3f3c69e) Signed-off-by: Jeff Zhang <zjf...@apache.org> --- .../main/java/org/apache/zeppelin/flink/JobManager.java | 4 ++-- .../org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java | 2 +- .../apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java | 2 +- .../org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java | 2 +- .../org/apache/zeppelin/interpreter/InterpreterOutput.java | 14 ++++++++++++-- .../interpreter/InterpreterResultMessageOutput.java | 10 +++++++++- 6 files changed, 26 insertions(+), 8 deletions(-) diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java index bb044c3..3fdb825 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java @@ -198,10 +198,10 @@ public class JobManager { if (isStreamingInsertInto) { StringBuilder builder = new StringBuilder("%html "); builder.append("<h1>Duration: " + - Integer.parseInt(rootNode.getObject().getString("duration")) / 1000 + + rootNode.getObject().getLong("duration") / 1000 + " seconds"); builder.append("\n%text "); - context.out.clear(); + context.out.clear(false); sendFlinkJobUrl(context); context.out.write(builder.toString()); context.out.flush(); diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java index fea0841..83a95c8 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java @@ -122,7 +122,7 @@ public class AppendStreamSqlJob extends AbstractStreamSqlJob { @Override protected void refresh(InterpreterContext context) { - context.out().clear(); + context.out().clear(false); try { jobManager.sendFlinkJobUrl(context); String result = buildResult(); diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java index 8dd2bec..7ef274a 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java @@ -80,7 +80,7 @@ public class SingleRowStreamSqlJob extends AbstractStreamSqlJob { LOGGER.warn("Skip RefreshTask as no data available"); return; } - context.out().clear(); + context.out().clear(false); String output = buildResult(); context.out.write(output); jobManager.sendFlinkJobUrl(context); diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java index 9c69616..a9cc3c0 100644 --- a/flink/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java +++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java @@ -104,7 +104,7 @@ public class UpdateStreamSqlJob extends AbstractStreamSqlJob { @Override protected void refresh(InterpreterContext context) { - context.out().clear(); + context.out().clear(false); try { jobManager.sendFlinkJobUrl(context); String result = buildResult(); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java index a9d9243..ef1aafb 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java @@ -139,6 +139,14 @@ public class InterpreterOutput extends OutputStream { } public void clear() { + clear(true); + } + + /** + * + * @param sendUpdateToFrontend Whether send empty result to frontend to clear the paragraph output + */ + public void clear(boolean sendUpdateToFrontend) { size = 0; lastCRIndex = -1; truncated = false; @@ -146,7 +154,7 @@ public class InterpreterOutput extends OutputStream { synchronized (resultMessageOutputs) { for (InterpreterResultMessageOutput out : resultMessageOutputs) { - out.clear(); + out.clear(sendUpdateToFrontend); try { out.close(); } catch (IOException e) { @@ -159,7 +167,9 @@ public class InterpreterOutput extends OutputStream { currentOut = null; startOfTheNewLine = true; firstCharIsPercentSign = false; - updateAllResultMessages(); + if (sendUpdateToFrontend) { + updateAllResultMessages(); + } } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessageOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessageOutput.java index 8758c98..85f476d 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessageOutput.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResultMessageOutput.java @@ -74,6 +74,14 @@ public class InterpreterResultMessageOutput extends OutputStream { } public void clear() { + clear(true); + } + + /** + * + * @param sendUpdateToFrontend Whether send empty result to frontend to clear the paragraph output + */ + public void clear(boolean sendUpdateToFrontend) { synchronized (outList) { buffer.reset(); outList.clear(); @@ -81,7 +89,7 @@ public class InterpreterResultMessageOutput extends OutputStream { watcher.clear(); } - if (flushListener != null) { + if (flushListener != null && sendUpdateToFrontend) { flushListener.onUpdate(this); } }