This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 2128166 [ZEPPELIN-5154]. Add option to disable broadcast Paragraph status/progress to frontend 2128166 is described below commit 2128166dc1ece228b8f7174a38e65c6c5c2dcd9c Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Wed Dec 9 14:25:09 2020 +0800 [ZEPPELIN-5154]. Add option to disable broadcast Paragraph status/progress to frontend ### What is this PR for? Broadcasting paragraph status/progress is a very heavy operation if there are many jobs running, so I added an option to disable it. When Zeppelin is used as a job server instead of an interactive notebook, it is not necessary to broadcast paragraph status/progress in real time. ### What type of PR is it? [Improvement] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5154 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3991 from zjffdu/ZEPPELIN-5154 and squashes the following commits: b5f66be2d [Jeff Zhang] [ZEPPELIN-5154]. 
Add option to disable broadcast Paragraph status/progress via websocket --- .../org/apache/zeppelin/conf/ZeppelinConfiguration.java | 1 + .../java/org/apache/zeppelin/socket/NotebookServer.java | 15 +++++++++++++++ 2 files changed, 16 insertions(+) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 0492fac..71198a7 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -1057,6 +1057,7 @@ public class ZeppelinConfiguration extends XMLConfiguration { ZEPPELIN_CREDENTIALS_PERSIST("zeppelin.credentials.persist", true), ZEPPELIN_CREDENTIALS_ENCRYPT_KEY("zeppelin.credentials.encryptKey", null), ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE("zeppelin.websocket.max.text.message.size", "10240000"), + ZEPPELIN_WEBSOCKET_PARAGRAPH_STATUS_PROGRESS("zeppelin.websocket.paragraph_status_progress.enable", true), ZEPPELIN_SERVER_DEFAULT_DIR_ALLOWED("zeppelin.server.default.dir.allowed", false), ZEPPELIN_SERVER_XFRAME_OPTIONS("zeppelin.server.xframe.options", "SAMEORIGIN"), ZEPPELIN_SERVER_JETTY_NAME("zeppelin.server.jetty.name", " "), diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 2c7ece1..c3f0021 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -142,6 +142,8 @@ public class NotebookServer extends WebSocketServlet private static AtomicReference<NotebookServer> self = new AtomicReference<>(); private ExecutorService executorService = Executors.newFixedThreadPool(10); + private boolean sendParagraphStatusToFrontend = ZeppelinConfiguration.create().getBoolean( 
+ ZeppelinConfiguration.ConfVars.ZEPPELIN_WEBSOCKET_PARAGRAPH_STATUS_PROGRESS); private Provider<Notebook> notebookProvider; private Provider<NotebookService> notebookServiceProvider; @@ -1658,6 +1660,9 @@ public class NotebookServer extends WebSocketServlet */ @Override public void onOutputAppend(String noteId, String paragraphId, int index, String output) { + if (!sendParagraphStatusToFrontend) { + return; + } Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT).put("noteId", noteId) .put("paragraphId", paragraphId).put("index", index).put("data", output); getConnectionManager().broadcast(noteId, msg); @@ -1671,6 +1676,9 @@ public class NotebookServer extends WebSocketServlet @Override public void onOutputUpdated(String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) { + if (!sendParagraphStatusToFrontend) { + return; + } Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT).put("noteId", noteId) .put("paragraphId", paragraphId).put("index", index).put("type", type).put("data", output); try { @@ -1699,6 +1707,10 @@ public class NotebookServer extends WebSocketServlet */ @Override public void onOutputClear(String noteId, String paragraphId) { + if (!sendParagraphStatusToFrontend) { + return; + } + try { final Note note = getNotebook().getNote(noteId); if (note == null) { @@ -1894,6 +1906,9 @@ public class NotebookServer extends WebSocketServlet @Override public void onProgressUpdate(Paragraph p, int progress) { + if (!sendParagraphStatusToFrontend) { + return; + } getConnectionManager().broadcast(p.getNote().getId(), new Message(OP.PROGRESS).put("id", p.getId()).put("progress", progress)); }