This is an automated email from the ASF dual-hosted git repository. moon pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new 1fa5a20 [ZEPPELIN-4985] Instant visual feedback on high latency network environment 1fa5a20 is described below commit 1fa5a2032aec1c8396c4e5d436ec4ee517dea48e Author: Lee moon soo <leemoon...@gmail.com> AuthorDate: Thu Aug 6 14:07:22 2020 -0700 [ZEPPELIN-4985] Instant visual feedback on high latency network environment ### What is this PR for? This PR provides instant visual feedback to improve the user experience in high-latency network environments. It focuses on improving two frequent user actions: - Run paragraph(s) - Switch visualization On a high-latency network, the user may feel a short 'freeze' between an action (click) and any visual change in the notebook, because the paragraph is updated only after a response is received from the server. After this change, the paragraph is updated immediately after a user action, without waiting for the server response. #### Before  - Some delay before the status becomes PENDING after clicking run - Some delay after switching visualization (the screen recording is not long enough to show the actual change after the delay) #### After  - Instant status update to PENDING after clicking run - Instant visualization switch ### What type of PR is it? Improvement ### Todos * [x] - Instant visual feedback on paragraph run * [x] - Instant visual feedback on visualization switch * [x] - Instant visual feedback on paragraph run (new ui) * [x] - Instant visual feedback on visualization switch (new ui) ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-4985 ### How should this be tested? 
A high-latency network can be simulated by adding a sleep in the NotebookSocket.send() method: ``` --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java @@ -67,6 +67,9 @@ public class NotebookSocket extends WebSocketAdapter { } public synchronized void send(String serializeMessage) throws IOException { + try { + Thread.sleep(1000); + } catch (InterruptedException e) {} connection.getRemote().sendString(serializeMessage); } ``` The attached screen recordings were made this way. ### Questions: * Do the license files need an update? no * Are there breaking changes for older versions? no * Does this need documentation? no Author: Lee moon soo <leemoon...@gmail.com> Closes #3873 from Leemoonsoo/ZEPPELIN-4985 and squashes the following commits: b064cf414 [Lee moon soo] update test c44c1002f [Lee moon soo] implement paragraph update short circuit in new ui 5952cf77c [Lee moon soo] instant update on switching visualization c38002972 [Lee moon soo] try implement short circuit logic in new ui 97c832e0f [Lee moon soo] skip update paragraph to READY when submit 00135e149 [Lee moon soo] short circuit status update on paragraph run (cherry picked from commit 1d01972c0318fae43321ee74b2cb3f224f7d61fb) Signed-off-by: Lee moon soo <m...@apache.org> --- .../zeppelin/integration/ParagraphActionsIT.java | 2 +- .../zeppelin/cluster/event/ClusterMessage.java | 9 ++++ .../org/apache/zeppelin/rest/NotebookRestApi.java | 4 +- .../apache/zeppelin/socket/ConnectionManager.java | 4 +- .../org/apache/zeppelin/socket/NotebookServer.java | 53 ++++++++++---------- .../projects/zeppelin-sdk/package.json | 2 +- .../interfaces/message-data-type-map.interface.ts | 2 + .../src/interfaces/message-operator.interface.ts | 8 ++++ .../src/interfaces/message-paragraph.interface.ts | 5 ++ .../src/interfaces/websocket-message.interface.ts | 1 + .../projects/zeppelin-sdk/src/message.ts | 56 ++++++++++++++++++---- 
.../src/app/core/paragraph-base/paragraph-base.ts | 8 ++++ .../app/notebook/paragraph/paragraph.controller.js | 7 +++ .../notebook/paragraph/result/result.controller.js | 17 +++---- .../websocket/websocket-event.factory.js | 18 ++++++- .../websocket/websocket-message.service.js | 16 +++++++ .../org/apache/zeppelin/notebook/Paragraph.java | 1 - .../apache/zeppelin/notebook/socket/Message.java | 12 +++++ 18 files changed, 178 insertions(+), 47 deletions(-) diff --git a/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/ParagraphActionsIT.java b/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/ParagraphActionsIT.java index 13b96ae..c6a11eb 100644 --- a/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/ParagraphActionsIT.java +++ b/zeppelin-integration/src/test/java/org/apache/zeppelin/integration/ParagraphActionsIT.java @@ -226,7 +226,7 @@ public class ParagraphActionsIT extends AbstractZeppelinIT { ZeppelinITUtils.sleep(2000, false); collector.checkThat("Paragraph status is ", - getParagraphStatus(1), CoreMatchers.equalTo("READY") + getParagraphStatus(1), CoreMatchers.equalTo("PENDING") ); driver.navigate().refresh(); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java index cd999c4..24e2f22 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/event/ClusterMessage.java @@ -26,6 +26,7 @@ import java.util.Map; public class ClusterMessage { public ClusterEvent clusterEvent; private Map<String, String> data = new HashMap<>(); + private String msgId; private static Gson gson = new GsonBuilder() .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ") @@ -54,6 +55,14 @@ public class ClusterMessage { return data; } + public String getMsgId() { + return msgId; + } + + public void 
setMsgId(String msgId) { + this.msgId = msgId; + } + public static ClusterMessage deserializeMessage(String msg) { return gson.fromJson(msg, ClusterMessage.class); } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java index e6dcfee..52f4acf 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -74,6 +74,8 @@ import org.quartz.CronExpression; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.zeppelin.notebook.socket.Message.MSG_ID_NOT_DEFINED; + /** * Rest api endpoint for the notebook. */ @@ -574,7 +576,7 @@ public class NotebookRestApi extends AbstractRestApi { AuthenticationInfo subject = new AuthenticationInfo(user); notebook.saveNote(note, subject); - notebookServer.broadcastParagraph(note, p); + notebookServer.broadcastParagraph(note, p, MSG_ID_NOT_DEFINED); return new JsonResponse<>(Status.OK, "").build(); } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java index d44c656..a36f895 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java @@ -343,7 +343,7 @@ public class ConnectionManager { broadcastToWatchers(StringUtils.EMPTY, StringUtils.EMPTY, m); } - public void unicastParagraph(Note note, Paragraph p, String user) { + public void unicastParagraph(Note note, Paragraph p, String user, String msgId) { if (!note.isPersonalizedMode() || p == null || user == null) { return; } @@ -354,7 +354,7 @@ public class ConnectionManager { } for (NotebookSocket conn : userSocketMap.get(user)) { - Message m = new Message(Message.OP.PARAGRAPH).put("paragraph", p); + 
Message m = new Message(Message.OP.PARAGRAPH).withMsgId(msgId).put("paragraph", p); unicast(m, conn); } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 41f10f8..2db1d26 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -96,6 +96,8 @@ import org.glassfish.hk2.api.ServiceLocator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.zeppelin.notebook.socket.Message.MSG_ID_NOT_DEFINED; + /** * Zeppelin websocket service. This class used setter injection because all servlet should have * no-parameter constructor @@ -578,7 +580,7 @@ public class NotebookServer extends WebSocketServlet public void broadcastNote(Note note) { inlineBroadcastNote(note); - broadcastClusterEvent(ClusterEvent.BROADCAST_NOTE, note); + broadcastClusterEvent(ClusterEvent.BROADCAST_NOTE, MSG_ID_NOT_DEFINED, note); } private void inlineBroadcastNote(Note note) { @@ -586,36 +588,38 @@ public class NotebookServer extends WebSocketServlet getConnectionManager().broadcast(note.getId(), message); } - private void inlineBroadcastParagraph(Note note, Paragraph p) { + private void inlineBroadcastParagraph(Note note, Paragraph p, String msgId) { broadcastNoteForms(note); if (note.isPersonalizedMode()) { - broadcastParagraphs(p.getUserParagraphMap(), p); + broadcastParagraphs(p.getUserParagraphMap(), p, msgId); } else { - Message message = new Message(OP.PARAGRAPH).put("paragraph", p); + Message message = new Message(OP.PARAGRAPH).withMsgId(msgId).put("paragraph", p); getConnectionManager().broadcast(note.getId(), message); } } - public void broadcastParagraph(Note note, Paragraph p) { - inlineBroadcastParagraph(note, p); - broadcastClusterEvent(ClusterEvent.BROADCAST_PARAGRAPH, note, p); + public void broadcastParagraph(Note 
note, Paragraph p, String msgId) { + inlineBroadcastParagraph(note, p, msgId); + broadcastClusterEvent(ClusterEvent.BROADCAST_PARAGRAPH, msgId, note, p); } private void inlineBroadcastParagraphs(Map<String, Paragraph> userParagraphMap, - Paragraph defaultParagraph) { + Paragraph defaultParagraph, + String msgId) { if (null != userParagraphMap) { for (String user : userParagraphMap.keySet()) { - Message message = new Message(OP.PARAGRAPH).put("paragraph", userParagraphMap.get(user)); + Message message = new Message(OP.PARAGRAPH).withMsgId(msgId).put("paragraph", userParagraphMap.get(user)); getConnectionManager().multicastToUser(user, message); } } } private void broadcastParagraphs(Map<String, Paragraph> userParagraphMap, - Paragraph defaultParagraph) { - inlineBroadcastParagraphs(userParagraphMap, defaultParagraph); - broadcastClusterEvent(ClusterEvent.BROADCAST_PARAGRAPHS, userParagraphMap, defaultParagraph); + Paragraph defaultParagraph, + String msgId) { + inlineBroadcastParagraphs(userParagraphMap, defaultParagraph, msgId); + broadcastClusterEvent(ClusterEvent.BROADCAST_PARAGRAPHS, msgId, userParagraphMap, defaultParagraph); } private void inlineBroadcastNewParagraph(Note note, Paragraph para) { @@ -628,7 +632,7 @@ public class NotebookServer extends WebSocketServlet private void broadcastNewParagraph(Note note, Paragraph para) { inlineBroadcastNewParagraph(note, para); - broadcastClusterEvent(ClusterEvent.BROADCAST_NEW_PARAGRAPH, note, para); + broadcastClusterEvent(ClusterEvent.BROADCAST_NEW_PARAGRAPH, MSG_ID_NOT_DEFINED, note, para); } public void inlineBroadcastNoteList(AuthenticationInfo subject, Set<String> userAndRoles) { @@ -647,17 +651,18 @@ public class NotebookServer extends WebSocketServlet public void broadcastNoteList(AuthenticationInfo subject, Set<String> userAndRoles) { inlineBroadcastNoteList(subject, userAndRoles); - broadcastClusterEvent(ClusterEvent.BROADCAST_NOTE_LIST, subject, userAndRoles); + 
broadcastClusterEvent(ClusterEvent.BROADCAST_NOTE_LIST, MSG_ID_NOT_DEFINED, subject, userAndRoles); } // broadcast ClusterEvent - private void broadcastClusterEvent(ClusterEvent event, Object... objects) { + private void broadcastClusterEvent(ClusterEvent event, String msgId, Object... objects) { ZeppelinConfiguration conf = ZeppelinConfiguration.create(); if (!conf.isClusterMode()) { return; } ClusterMessage clusterMessage = new ClusterMessage(event); + clusterMessage.setMsgId(msgId); for(Object object : objects) { String json = ""; @@ -739,10 +744,10 @@ public class NotebookServer extends WebSocketServlet } break; case BROADCAST_PARAGRAPH: - inlineBroadcastParagraph(note, paragraph); + inlineBroadcastParagraph(note, paragraph, message.getMsgId()); break; case BROADCAST_PARAGRAPHS: - inlineBroadcastParagraphs(userParagraphMap, paragraph); + inlineBroadcastParagraphs(userParagraphMap, paragraph, message.getMsgId()); break; case BROADCAST_NEW_PARAGRAPH: inlineBroadcastNewParagraph(note, paragraph); @@ -1113,9 +1118,9 @@ public class NotebookServer extends WebSocketServlet if (p.getNote().isPersonalizedMode()) { Map<String, Paragraph> userParagraphMap = p.getNote().getParagraph(paragraphId).getUserParagraphMap(); - broadcastParagraphs(userParagraphMap, p); + broadcastParagraphs(userParagraphMap, p, fromMessage.msgId); } else { - broadcastParagraph(p.getNote(), p); + broadcastParagraph(p.getNote(), p, fromMessage.msgId); } } }); @@ -1253,9 +1258,9 @@ public class NotebookServer extends WebSocketServlet public void onSuccess(Paragraph p, ServiceContext context) throws IOException { super.onSuccess(p, context); if (p.getNote().isPersonalizedMode()) { - getConnectionManager().unicastParagraph(p.getNote(), p, context.getAutheInfo().getUser()); + getConnectionManager().unicastParagraph(p.getNote(), p, context.getAutheInfo().getUser(), fromMessage.msgId); } else { - broadcastParagraph(p.getNote(), p); + broadcastParagraph(p.getNote(), p, fromMessage.msgId); } } }); @@ 
-1520,7 +1525,7 @@ public class NotebookServer extends WebSocketServlet if (p.getNote().isPersonalizedMode()) { Paragraph p2 = p.getNote().clearPersonalizedParagraphOutput(paragraphId, context.getAutheInfo().getUser()); - getConnectionManager().unicastParagraph(p.getNote(), p2, context.getAutheInfo().getUser()); + getConnectionManager().unicastParagraph(p.getNote(), p2, context.getAutheInfo().getUser(), fromMessage.msgId); } // if it's the last paragraph and not empty, let's add a new one @@ -1698,7 +1703,7 @@ public class NotebookServer extends WebSocketServlet } else { note.clearParagraphOutput(paragraphId); Paragraph paragraph = note.getParagraph(paragraphId); - broadcastParagraph(note, paragraph); + broadcastParagraph(note, paragraph, MSG_ID_NOT_DEFINED); } } catch (IOException e) { LOG.warn("Fail to call onOutputClear", e); @@ -1920,7 +1925,7 @@ public class NotebookServer extends WebSocketServlet } p.setStatusToUserParagraph(p.getStatus()); - broadcastParagraph(p.getNote(), p); + broadcastParagraph(p.getNote(), p, MSG_ID_NOT_DEFINED); try { broadcastUpdateNoteJobInfo(p.getNote(), System.currentTimeMillis() - 5000); } catch (IOException e) { diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/package.json b/zeppelin-web-angular/projects/zeppelin-sdk/package.json index 9be3b66..4bb41ea 100644 --- a/zeppelin-web-angular/projects/zeppelin-sdk/package.json +++ b/zeppelin-web-angular/projects/zeppelin-sdk/package.json @@ -5,4 +5,4 @@ "@angular/common": "^8.2.9", "@angular/core": "^8.2.9" } -} \ No newline at end of file +} diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-data-type-map.interface.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-data-type-map.interface.ts index ddf934e..fa8bddf 100644 --- a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-data-type-map.interface.ts +++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-data-type-map.interface.ts @@ -64,6 +64,7 @@ 
import { ParagraphClearOutput, ParagraphRemove, ParagraphRemoved, + ParagraphStatus, ParasInfo, PatchParagraphReceived, PatchParagraphSend, @@ -102,6 +103,7 @@ export interface MessageReceiveDataTypeMap { [OP.PARAGRAPH_REMOVED]: ParagraphRemoved; [OP.EDITOR_SETTING]: EditorSettingReceived; [OP.PROGRESS]: Progress; + [OP.PARAGRAPH_STATUS]: ParagraphStatus; [OP.PARAGRAPH_MOVED]: ParagraphMoved; [OP.AUTH_INFO]: AuthInfo; [OP.NOTE_UPDATED]: NoteUpdated; diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-operator.interface.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-operator.interface.ts index d3ce82b..1019330 100644 --- a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-operator.interface.ts +++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-operator.interface.ts @@ -51,6 +51,14 @@ export enum OP { PROGRESS = 'PROGRESS', /** + * [short circuit] + * paragraph status update + * @param id paragraph id + * @param progress percentage progress + */ + PARAGRAPH_STATUS = 'PARAGRAPH_STATUS', + + /** * [c-s] * create new notebook */ diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-paragraph.interface.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-paragraph.interface.ts index 2e04b22..1ec0cfb 100644 --- a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-paragraph.interface.ts +++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-paragraph.interface.ts @@ -299,6 +299,11 @@ export interface Progress { progress: number; } +export interface ParagraphStatus { + id: string; + status: string; +} + interface GraphConfigSetting { table?: VisualizationTable; lineChart?: VisualizationLineChart; diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/websocket-message.interface.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/websocket-message.interface.ts index bdc71e1..fcfd896 
100644 --- a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/websocket-message.interface.ts +++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/websocket-message.interface.ts @@ -18,4 +18,5 @@ export interface WebSocketMessage<K extends keyof MixMessageDataTypeMap> { ticket?: string; // default 'anonymous' principal?: string; // default 'anonymous' roles?: string; // default '[]' + msgId?: string; } diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts index 4505e36..921876a 100644 --- a/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts +++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts @@ -10,20 +10,20 @@ * limitations under the License. */ -import { interval, Observable, Subject, Subscription } from 'rxjs'; -import { delay, filter, map, mergeMap, retryWhen, take } from 'rxjs/operators'; -import { webSocket, WebSocketSubject } from 'rxjs/webSocket'; +import {interval, Observable, Subject, Subscription} from 'rxjs'; +import {delay, filter, map, mergeMap, retryWhen, take} from 'rxjs/operators'; +import {webSocket, WebSocketSubject} from 'rxjs/webSocket'; -import { Ticket } from './interfaces/message-common.interface'; +import {Ticket} from './interfaces/message-common.interface'; import { MessageReceiveDataTypeMap, MessageSendDataTypeMap, MixMessageDataTypeMap } from './interfaces/message-data-type-map.interface'; -import { NoteConfig, PersonalizedMode, SendNote } from './interfaces/message-notebook.interface'; -import { OP } from './interfaces/message-operator.interface'; -import { ParagraphConfig, ParagraphParams, SendParagraph } from './interfaces/message-paragraph.interface'; -import { WebSocketMessage } from './interfaces/websocket-message.interface'; +import {NoteConfig, PersonalizedMode, SendNote} from './interfaces/message-notebook.interface'; +import {OP} from './interfaces/message-operator.interface'; +import {ParagraphConfig, 
ParagraphParams, SendParagraph} from './interfaces/message-paragraph.interface'; +import {WebSocketMessage} from './interfaces/websocket-message.interface'; export type ArgumentsType<T> = T extends (...args: infer U) => void ? U : never; @@ -46,6 +46,8 @@ export class Message { private pingIntervalSubscription = new Subscription(); private wsUrl: string; private ticket: Ticket; + private uniqueClientId = Math.random().toString(36).substring(2, 7); + private lastMsgIdSeqSent = 0; constructor() { this.open$.subscribe(() => { @@ -140,6 +142,7 @@ export class Message { const [op, data] = args; const message: WebSocketMessage<K> = { op, + msgId: `${this.uniqueClientId}-${++this.lastMsgIdSeqSent}`, data: data as MixMessageDataTypeMap[K], ...this.ticket }; @@ -152,10 +155,37 @@ export class Message { receive<K extends keyof MessageReceiveDataTypeMap>(op: K): Observable<Record<K, MessageReceiveDataTypeMap[K]>[K]> { return this.received$.pipe( filter(message => message.op === op), + filter(message => { + if (!message.msgId) { + // when msgId is not specified, it is not response to client request. 
+ // always process them + return true; + } + const uniqueClientId = message.msgId.split('-')[0]; + const msgIdSeqReceived = parseInt(message.msgId.split('-')[1], 10); + const isResponseForRequestFromThisClient = uniqueClientId === this.uniqueClientId; + + if (message.op === OP.PARAGRAPH) { + if (isResponseForRequestFromThisClient && + this.lastMsgIdSeqSent > msgIdSeqReceived + ) { + console.log('PARAPGRAPH is already updated by shortcircuit'); + return false; + } else { + return true; + } + } else { + return true; + } + }), map(message => message.data) ) as Observable<Record<K, MessageReceiveDataTypeMap[K]>[K]>; } + shortCircuit(message: WebSocketMessage<keyof MessageReceiveDataTypeMap>) { + this.received$.next(this.interceptReceived(message)); + } + destroy(): void { this.ws.complete(); this.ws = null; @@ -352,6 +382,16 @@ export class Message { paragraphConfig: ParagraphConfig, paragraphParams: ParagraphParams ): void { + // short circuit update status without waiting for server response + this.shortCircuit({ + op: OP.PARAGRAPH_STATUS, + data: { + id: paragraphId, + status: "PENDING" + } + }) + + // send message to server this.send<OP.RUN_PARAGRAPH>(OP.RUN_PARAGRAPH, { id: paragraphId, title: paragraphTitle, diff --git a/zeppelin-web-angular/src/app/core/paragraph-base/paragraph-base.ts b/zeppelin-web-angular/src/app/core/paragraph-base/paragraph-base.ts index f8f9a1b..7fcb35c 100644 --- a/zeppelin-web-angular/src/app/core/paragraph-base/paragraph-base.ts +++ b/zeppelin-web-angular/src/app/core/paragraph-base/paragraph-base.ts @@ -69,6 +69,14 @@ export abstract class ParagraphBase extends MessageListenersManager { } } + @MessageListener(OP.PARAGRAPH_STATUS) + onParagraphStatus(data: MessageReceiveDataTypeMap[OP.PARAGRAPH_STATUS]) { + if (data.id === this.paragraph.id) { + this.paragraph.status = data.status; + this.cdr.markForCheck(); + } + } + @MessageListener(OP.NOTE_RUNNING_STATUS) noteRunningStatusChange(data: 
MessageReceiveDataTypeMap[OP.NOTE_RUNNING_STATUS]) { this.isEntireNoteRunning = data.status; diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js index 6ccff79..21efb20 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js @@ -459,6 +459,7 @@ function ParagraphCtrl($scope, $rootScope, $route, $window, $routeParams, $locat if (!paragraphText || $scope.isRunning($scope.paragraph)) { return; } + const magic = SpellResult.extractMagic(paragraphText); if (heliumService.getSpellByMagic(magic)) { @@ -1632,6 +1633,12 @@ function ParagraphCtrl($scope, $rootScope, $route, $window, $routeParams, $locat } }); + $scope.$on('updateStatus', function(event, data) { + if (data.id === $scope.paragraph.id) { + $scope.paragraph.status = data.status; + } + }); + $scope.$on('appendParagraphOutput', function(event, data) { if (data.paragraphId === $scope.paragraph.id) { if (!$scope.paragraph.results) { diff --git a/zeppelin-web/src/app/notebook/paragraph/result/result.controller.js b/zeppelin-web/src/app/notebook/paragraph/result/result.controller.js index 4aabc5c..ec74cb9 100644 --- a/zeppelin-web/src/app/notebook/paragraph/result/result.controller.js +++ b/zeppelin-web/src/app/notebook/paragraph/result/result.controller.js @@ -762,14 +762,15 @@ function ResultCtrl($scope, $rootScope, $route, $window, $routeParams, $location let newParagraphConfig = angular.copy(paragraph.config); newParagraphConfig.results = newParagraphConfig.results || []; newParagraphConfig.results[resultIndex] = config; - if ($scope.revisionView === true) { - // local update without commit - updateData({ - type: $scope.type, - data: data, - }, newParagraphConfig.results[resultIndex], paragraph, resultIndex); - renderResult($scope.type, true); - } else { + + // local update without commit + updateData({ + type: $scope.type, + data: 
data, + }, newParagraphConfig.results[resultIndex], paragraph, resultIndex); + renderResult($scope.type, true); + + if ($scope.revisionView !== true) { if (! $scope.viewOnly) { return websocketMsgSrv.commitParagraph(paragraph.id, title, text, newParagraphConfig, params); } diff --git a/zeppelin-web/src/components/websocket/websocket-event.factory.js b/zeppelin-web/src/components/websocket/websocket-event.factory.js index f1b8cd6..8cbf34c 100644 --- a/zeppelin-web/src/components/websocket/websocket-event.factory.js +++ b/zeppelin-web/src/components/websocket/websocket-event.factory.js @@ -19,6 +19,8 @@ function WebsocketEventFactory($rootScope, $websocket, $location, baseUrlSrv, sa let websocketCalls = {}; let pingIntervalId; + const uniqueClientId = Math.random().toString(36).substring(2, 7); + let lastMsgIdSeqSent = 0; websocketCalls.ws = $websocket(baseUrlSrv.getWebsocketUrl()); websocketCalls.ws.reconnectIfNotNormalClose = true; @@ -41,6 +43,8 @@ function WebsocketEventFactory($rootScope, $websocket, $location, baseUrlSrv, sa data.ticket = ''; data.roles = ''; } + + data.msgId = uniqueClientId + '-' + ++lastMsgIdSeqSent; console.log('Send >> %o, %o, %o, %o, %o', data.op, data.principal, data.ticket, data.roles, data); return websocketCalls.ws.send(JSON.stringify(data)); }; @@ -59,6 +63,11 @@ function WebsocketEventFactory($rootScope, $websocket, $location, baseUrlSrv, sa let op = payload.op; let data = payload.data; + let msgId = payload.msgId; + const uniqueClientId = msgId ? msgId.split('-')[0] : undefined; + const msgIdSeqReceived = msgId ? 
parseInt(msgId.split('-')[1]) : undefined; + const isResponseForRequestFromThisClient = uniqueClientId === uniqueClientId; + if (op === 'NOTE') { $rootScope.$broadcast('setNoteContent', data.note); } else if (op === 'NEW_NOTE') { @@ -111,7 +120,14 @@ function WebsocketEventFactory($rootScope, $websocket, $location, baseUrlSrv, sa buttons: btn, }); } else if (op === 'PARAGRAPH') { - $rootScope.$broadcast('updateParagraph', data); + if (isResponseForRequestFromThisClient && + lastMsgIdSeqSent > msgIdSeqReceived + ) { + // paragraph is already updated by short circuit. + console.log('PARAPGRAPH is already updated by shortcircuit'); + } else { + $rootScope.$broadcast('updateParagraph', data); + } } else if (op === 'PATCH_PARAGRAPH') { $rootScope.$broadcast('patchReceived', data); } else if (op === 'COLLABORATIVE_MODE_STATUS') { diff --git a/zeppelin-web/src/components/websocket/websocket-message.service.js b/zeppelin-web/src/components/websocket/websocket-message.service.js index a959070..ee2f773 100644 --- a/zeppelin-web/src/components/websocket/websocket-message.service.js +++ b/zeppelin-web/src/components/websocket/websocket-message.service.js @@ -188,6 +188,13 @@ function WebsocketMessageService($rootScope, websocketEvents) { }, runParagraph: function(paragraphId, paragraphTitle, paragraphData, paragraphConfig, paragraphParams) { + // short circuit update paragraph status for immediate visual feedback without waiting for server response + $rootScope.$broadcast('updateStatus', { + id: paragraphId, + status: 'PENDING', + }); + + // send message to server websocketEvents.sendNewEvent({ op: 'RUN_PARAGRAPH', data: { @@ -201,6 +208,15 @@ function WebsocketMessageService($rootScope, websocketEvents) { }, runAllParagraphs: function(noteId, paragraphs) { + // short circuit update paragraph status for immediate visual feedback without waiting for server response + paragraphs.forEach((p) => { + $rootScope.$broadcast('updateStatus', { + id: p.id, + status: 'PENDING', + }); + 
}); + + // send message to server websocketEvents.sendNewEvent({ op: 'RUN_ALL_PARAGRAPHS', data: { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index ce80523..336be44 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -338,7 +338,6 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen setStatus(Job.Status.FINISHED); return true; } - setStatus(Status.READY); if (getConfig().get("enabled") == null || (Boolean) getConfig().get("enabled")) { setAuthenticationInfo(getAuthenticationInfo()); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java index 2b9f339..9d6803e 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java @@ -225,10 +225,22 @@ public class Message implements JsonSerializable { public String principal = "anonymous"; public String roles = ""; + // Unique id generated from client side. to identify message. + // When message from server is response to the client request + // includes the msgId in response message, client can pair request and response message. + // When server send message that is not response to the client request, set null; + public String msgId = MSG_ID_NOT_DEFINED; + public static String MSG_ID_NOT_DEFINED = null; + public Message(OP op) { this.op = op; } + public Message withMsgId(String msgId) { + this.msgId = msgId; + return this; + } + public Message put(String k, Object v) { data.put(k, v); return this;