This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5565-564ccdbfd51089f31dcfdcbdba9f5a872a2f8256 in repository https://gitbox.apache.org/repos/asf/texera.git
commit ad10b7f2b3c8110ce36379e2f538a8f6554fb494 Author: Kunwoo (Chris) <[email protected]> AuthorDate: Tue Jun 9 04:15:13 2026 -0700 fix(frontend): clean up websocket state when returning to the dashboard (#5565) ### What changes were proposed in this PR? Websocket-derived front-end state (the connection itself, plus the execution status, console output, and results built from its events) lives in singletons outside the workspace. It was never torn down in two cases, so stale state carried over: 1. **Returning to the dashboard** and re-entering a workflow reused the previous socket. The connection-tracking fields (`currentConnectedWid` / `currentConnectedCuid`) also survived, so the reconnect guard saw them unchanged, skipped reconnecting, and reused the stale socket. (#3120 — the case #3093 did not cover.) 2. **Switching computing units** inside the workspace left the previous unit's console, results, and execution status on screen. This PR clears that state at both points. **Workspace exit**: `WorkspaceComponent.ngOnDestroy()` now tears everything down: | Call *(new)* | Resets | | --- | --- | | `ComputingUnitStatusService.disconnect()` | closes the socket, clears operator status, stops the unit poll, resets the connection-tracking fields and the selected unit | | `ExecuteWorkflowService.resetExecutionAndWorkers()` | execution status and worker assignments | | `WorkflowConsoleService.clearConsoleMessages()` | console output | | `WorkflowResultService.clearResults()` | result caches and table stats | **Unit switch**: `ComputingUnitStatusService` emits a reset signal when it reconnects to a different unit, and `WorkspaceComponent` clears the same execution / console / result state in response. As a result, switching units now discards the previous unit's results and console instead of leaving them on screen. The remaining websocket-event consumers need no teardown: `OperatorReuseCacheStatusService` is stateless, and `udf-debug.service`'s state lives in the `TexeraGraph`, already reset by `clearWorkflow()`. ### Any related issues, documentation, discussions? Closes #3120. Related: #3093 (earlier partial fix for the in-canvas socket re-open). ### How was this PR tested? Test with this workflow [Untitled workflow (14).json](https://github.com/user-attachments/files/28696700/Untitled.workflow.14.json) https://github.com/user-attachments/assets/060fe1ac-39cf-45e5-b423-5aa27fe17aed ### Was this PR authored or co-authored using generative AI tooling? Generated-by: Claude Code (Claude Opus 4.8) --------- Co-authored-by: Xinyuan Lin <[email protected]> --- .../computing-unit-status.service.spec.ts | 146 +++++++++++++++++++++ .../computing-unit-status.service.ts | 34 +++++ .../result-panel/result-panel.component.spec.ts | 21 +++ .../result-panel/result-panel.component.ts | 17 +++ .../component/workspace.component.spec.ts | 41 +++++- .../app/workspace/component/workspace.component.ts | 28 +++- .../execute-workflow.service.spec.ts | 15 +++ .../execute-workflow/execute-workflow.service.ts | 10 ++ .../workflow-console.service.spec.ts | 13 ++ .../workflow-console/workflow-console.service.ts | 9 ++ .../workflow-result.service.spec.ts | 27 ++++ .../workflow-result/workflow-result.service.ts | 22 ++++ .../workflow-websocket.service.spec.ts | 8 ++ .../workflow-websocket.service.ts | 2 + 14 files changed, 391 insertions(+), 2 deletions(-) diff --git a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts new file mode 100644 index 0000000000..d9c3319cf3 --- /dev/null +++ b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { TestBed } from "@angular/core/testing"; +import { HttpClientTestingModule } from "@angular/common/http/testing"; +import { of } from "rxjs"; +import { ComputingUnitStatusService } from "./computing-unit-status.service"; +import { WorkflowComputingUnitManagingService } from "../workflow-computing-unit/workflow-computing-unit-managing.service"; +import { WorkflowWebsocketService } from "../../../../workspace/service/workflow-websocket/workflow-websocket.service"; +import { WorkflowStatusService } from "../../../../workspace/service/workflow-status/workflow-status.service"; +import { UserService } from "../../user/user.service"; +import { StubUserService } from "../../user/stub-user.service"; +import { AuthService } from "../../user/auth.service"; +import { StubAuthService } from "../../user/stub-auth.service"; +import { DashboardWorkflowComputingUnit } from "../../../type/workflow-computing-unit"; +import { commonTestProviders } from "../../../testing/test-utils"; + +describe("ComputingUnitStatusService", () => { + let service: ComputingUnitStatusService; + let websocketService: WorkflowWebsocketService; + + const mockUnit = (cuid: number) => ({ computingUnit: { cuid } }) as unknown as DashboardWorkflowComputingUnit; + + beforeEach(() => { + const managingStub = { + listComputingUnits: () => of([]), + getComputingUnit: (cuid: number) => of(mockUnit(cuid)), + terminateComputingUnit: () => of(undefined), + }; + + TestBed.configureTestingModule({ + imports: [HttpClientTestingModule], + providers: [ + ComputingUnitStatusService, + WorkflowWebsocketService, + WorkflowStatusService, + { provide: WorkflowComputingUnitManagingService, useValue: managingStub }, + { provide: UserService, useClass: StubUserService }, + { provide: AuthService, useClass: StubAuthService }, + ...commonTestProviders, + ], + }); + + service = TestBed.inject(ComputingUnitStatusService); + websocketService = TestBed.inject(WorkflowWebsocketService); + }); + + afterEach(() => { + // tear down the interval poll started by selectComputingUnit() so it can't outlive the test + service.ngOnDestroy(); + }); + + it("should be created", () => { + expect(service).toBeTruthy(); + }); + + it("reconnects when re-selecting the same workflow after disconnect (regression #3120)", () => { + const openSpy = vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => {}); + const closeSpy = vi.spyOn(websocketService, "closeWebsocket"); + (service as any).allComputingUnitsSubject.next([mockUnit(7)]); + + // Enter workflow 5 on computing unit 7 → opens the websocket once. + service.selectComputingUnit(5, 7); + expect(openSpy).toHaveBeenCalledTimes(1); + + // User returns to the dashboard. + service.disconnect(); + expect(closeSpy).toHaveBeenCalled(); + + // Re-enter the SAME workflow (the `wid -> null -> wid` pattern): without the + // cleanup, the retained currentConnectedWid/Cuid would suppress the reconnect. + service.selectComputingUnit(5, 7); + expect(openSpy).toHaveBeenCalledTimes(2); + }); + + it("disconnect() clears the selected computing unit", () => { + vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => {}); + (service as any).allComputingUnitsSubject.next([mockUnit(7)]); + service.selectComputingUnit(5, 7); + + let latest: DashboardWorkflowComputingUnit | null = mockUnit(7); + service.getSelectedComputingUnit().subscribe(unit => (latest = unit)); + expect(latest).not.toBeNull(); + + service.disconnect(); + expect(latest).toBeNull(); + }); + + it("emits a connection-reset signal when switching to a different computing unit (issue #3120)", () => { + let connected = false; + vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => { + connected = true; + }); + vi.spyOn(websocketService, "closeWebsocket").mockImplementation(() => { + connected = false; + }); + vi.spyOn(websocketService, "isConnected", "get").mockImplementation(() => connected); + (service as any).allComputingUnitsSubject.next([mockUnit(7), mockUnit(8)]); + + let resetCount = 0; + service.getConnectionResetStream().subscribe(() => resetCount++); + + // First connection on unit 7: nothing to tear down yet → no signal. + service.selectComputingUnit(5, 7); + expect(resetCount).toBe(0); + + // Switch to a different unit while connected → tear-down signal fires once. + service.selectComputingUnit(5, 8); + expect(resetCount).toBe(1); + }); + + it("emits a connection-reset signal when switching units even if the socket already dropped (issue #3120)", () => { + vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => {}); + vi.spyOn(websocketService, "closeWebsocket").mockImplementation(() => {}); + // socket reports disconnected throughout, e.g. the previous unit was terminated + vi.spyOn(websocketService, "isConnected", "get").mockReturnValue(false); + (service as any).allComputingUnitsSubject.next([mockUnit(7), mockUnit(8)]); + + let resetCount = 0; + service.getConnectionResetStream().subscribe(() => resetCount++); + + // First connection on unit 7: nothing to tear down yet → no signal. + service.selectComputingUnit(5, 7); + expect(resetCount).toBe(0); + + // Switch units while disconnected: unit 7's stale state must still be cleared. + service.selectComputingUnit(5, 8); + expect(resetCount).toBe(1); + }); +}); diff --git a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts index 831263183b..e5b6c44388 100644 --- a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts +++ b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts @@ -47,6 +47,10 @@ export class ComputingUnitStatusService implements OnDestroy { private readonly refreshComputingUnitListSignal = new Subject<void>(); + // Emits when the active connection is torn down to switch computing units, so + // session consumers can clear their websocket-derived state. + private readonly connectionResetSubject = new Subject<void>(); + // Refresh interval in milliseconds private readonly REFRESH_INTERVAL_MS = 2000; private refreshSubscription: Subscription | null = null; @@ -158,9 +162,16 @@ export class ComputingUnitStatusService implements OnDestroy { // open websocket if needed const shouldReconnect = this.currentConnectedCuid !== cuid || this.currentConnectedWid !== wid; if (isDefined(wid) && shouldReconnect) { + // Tear down stale state on switch even if the socket already dropped + // (e.g. the prior unit was terminated), not just while still connected. + const hadPreviousConnection = isDefined(this.currentConnectedWid) || isDefined(this.currentConnectedCuid); if (this.workflowWebsocketService.isConnected) { this.workflowWebsocketService.closeWebsocket(); + } + if (hadPreviousConnection) { this.workflowStatusService.clearStatus(); + // switching units: signal consumers to clear their stale state + this.connectionResetSubject.next(); } this.workflowWebsocketService.openWebsocket(wid, this.userService.getCurrentUser()?.uid, cuid); @@ -225,6 +236,28 @@ export class ComputingUnitStatusService implements OnDestroy { ); } + /** + * Emits when the connection is reset to switch computing units. Consumers + * subscribe to clear their websocket-derived session state. + */ + public getConnectionResetStream(): Observable<void> { + return this.connectionResetSubject.asObservable(); + } + + /** + * Tear down all websocket connection state when leaving the workspace, so + * re-entering a workflow starts from a clean connection instead of reusing + * the previous one. + */ + public disconnect(): void { + this.workflowWebsocketService.closeWebsocket(); + this.workflowStatusService.clearStatus(); + this.stopPollingSelectedUnit(); + this.currentConnectedCuid = undefined; + this.currentConnectedWid = undefined; + this.selectedUnitSubject.next(null); + } + // Clean up on service destroy ngOnDestroy(): void { this.refreshSubscription?.unsubscribe(); @@ -232,6 +265,7 @@ export class ComputingUnitStatusService implements OnDestroy { this.selectedUnitSubject.complete(); this.allComputingUnitsSubject.complete(); + this.connectionResetSubject.complete(); } /** diff --git a/frontend/src/app/workspace/component/result-panel/result-panel.component.spec.ts b/frontend/src/app/workspace/component/result-panel/result-panel.component.spec.ts index 1017d759b5..edfbcbde61 100644 --- a/frontend/src/app/workspace/component/result-panel/result-panel.component.spec.ts +++ b/frontend/src/app/workspace/component/result-panel/result-panel.component.spec.ts @@ -21,6 +21,7 @@ import { ComponentFixture, TestBed } from "@angular/core/testing"; import { ResultPanelComponent } from "./result-panel.component"; import { ExecuteWorkflowService } from "../../service/execute-workflow/execute-workflow.service"; +import { WorkflowResultService } from "../../service/workflow-result/workflow-result.service"; import { WorkflowActionService } from "../../service/workflow-graph/model/workflow-action.service"; import { OperatorMetadataService } from "../../service/operator-metadata/operator-metadata.service"; import { StubOperatorMetadataService } from "../../service/operator-metadata/stub-operator-metadata.service"; @@ -38,6 +39,7 @@ describe("ResultPanelComponent", () => { let fixture: ComponentFixture<ResultPanelComponent>; let executeWorkflowService: ExecuteWorkflowService; let workflowActionService: WorkflowActionService; + let workflowResultService: WorkflowResultService; beforeEach(async () => { await TestBed.configureTestingModule({ @@ -60,6 +62,7 @@ describe("ResultPanelComponent", () => { component = fixture.componentInstance; executeWorkflowService = TestBed.inject(ExecuteWorkflowService); workflowActionService = TestBed.inject(WorkflowActionService); + workflowResultService = TestBed.inject(WorkflowResultService); fixture.detectChanges(); }); @@ -82,4 +85,22 @@ describe("ResultPanelComponent", () => { const resultPanelHtmlElement: HTMLElement = resultPanelDiv.nativeElement; expect(resultPanelHtmlElement).toBeTruthy(); }); + + it("wipes the panel and operator selection when results are cleared, e.g. on a computing-unit switch (#3120)", () => { + // Simulate a result frame on screen for a currently-highlighted operator. + // ResultPanelComponent stands in as a throwaway frame component; it's cleared before it renders. + component.currentOperatorId = "op1"; + component.operatorTitle = "Operator 1"; + component.frameComponentConfigs.set("Result", { component: ResultPanelComponent, componentInputs: {} }); + expect(component.frameComponentConfigs.size).toBe(1); + + // A unit switch drops the cached results and emits on the cleared stream. The operator + // stays highlighted, so the normal rerender path won't tear the frame down — only this + // handler does, which is the part that actually fixes the lingering-stale-frame bug. + workflowResultService.clearResults(); + + expect(component.frameComponentConfigs.size).toBe(0); + expect(component.currentOperatorId).toBeUndefined(); + expect(component.operatorTitle).toBe(""); + }); }); diff --git a/frontend/src/app/workspace/component/result-panel/result-panel.component.ts b/frontend/src/app/workspace/component/result-panel/result-panel.component.ts index 3260afb290..b5b0c045da 100644 --- a/frontend/src/app/workspace/component/result-panel/result-panel.component.ts +++ b/frontend/src/app/workspace/component/result-panel/result-panel.component.ts @@ -136,6 +136,7 @@ export class ResultPanelComponent implements OnInit, OnDestroy { this.updateReturnPosition(DEFAULT_HEIGHT, this.height); this.registerAutoRerenderResultPanel(); this.registerAutoOpenResultPanel(); + this.registerResultClearedHandler(); this.handleResultPanelForVersionPreview(); this.panelService.closePanelStream.pipe(untilDestroyed(this)).subscribe(() => this.closePanel()); this.panelService.resetPanelStream.pipe(untilDestroyed(this)).subscribe(() => { @@ -218,6 +219,22 @@ export class ResultPanelComponent implements OnInit, OnDestroy { }); } + /** + * Wipe the panel when results are dropped (e.g. switching computing units): a + * still-highlighted operator isn't re-rendered, so its stale frames would linger. + */ + registerResultClearedHandler() { + this.workflowResultService + .getResultClearedStream() + .pipe(untilDestroyed(this)) + .subscribe(() => { + this.clearResultPanel(); + this.currentOperatorId = undefined; + this.operatorTitle = ""; + this.changeDetectorRef.detectChanges(); + }); + } + registerAutoRerenderResultPanel() { merge( this.executeWorkflowService diff --git a/frontend/src/app/workspace/component/workspace.component.spec.ts b/frontend/src/app/workspace/component/workspace.component.spec.ts index 7659f346f3..f85294e42a 100644 --- a/frontend/src/app/workspace/component/workspace.component.spec.ts +++ b/frontend/src/app/workspace/component/workspace.component.spec.ts @@ -40,8 +40,11 @@ import { WorkflowCompilingService } from "../service/compile-workflow/workflow-c import { OperatorMetadataService } from "../service/operator-metadata/operator-metadata.service"; import { UndoRedoService } from "../service/undo-redo/undo-redo.service"; import { WorkflowConsoleService } from "../service/workflow-console/workflow-console.service"; +import { ExecuteWorkflowService } from "../service/execute-workflow/execute-workflow.service"; +import { WorkflowResultService } from "../service/workflow-result/workflow-result.service"; import { WorkflowActionService } from "../service/workflow-graph/model/workflow-action.service"; import { OperatorReuseCacheStatusService } from "../service/workflow-status/operator-reuse-cache-status.service"; +import { ComputingUnitStatusService } from "../../common/service/computing-unit/computing-unit-status/computing-unit-status.service"; import { EntityType, HubService } from "../../hub/service/hub.service"; import { commonTestProviders } from "../../common/testing/test-utils"; import { WorkspaceComponent } from "./workspace.component"; @@ -62,6 +65,11 @@ describe("WorkspaceComponent", () => { let messageService: any; let routerMock: any; let locationMock: any; + let computingUnitStatusService: any; + let executeWorkflowService: any; + let workflowConsoleService: any; + let workflowResultService: any; + let connectionResetSubject: Subject<void>; let metadataChangedSubject: Subject<void>; let stubGraph: { triggerCenterEvent: ReturnType<typeof vi.fn>; hasElementWithID: ReturnType<typeof vi.fn> }; @@ -136,6 +144,14 @@ describe("WorkspaceComponent", () => { routerMock = { navigate: vi.fn() }; locationMock = { go: vi.fn() }; + connectionResetSubject = new Subject<void>(); + computingUnitStatusService = { + disconnect: vi.fn(), + getConnectionResetStream: () => connectionResetSubject.asObservable(), + }; + executeWorkflowService = { resetExecutionAndWorkers: vi.fn() }; + workflowConsoleService = { clearConsoleMessages: vi.fn() }; + workflowResultService = { clearResults: vi.fn() }; // Drop the standalone component's child imports and allow unknown elements via // CUSTOM_ELEMENTS_SCHEMA. The template still renders, so `<ng-template #codeEditor>` @@ -167,8 +183,11 @@ describe("WorkspaceComponent", () => { // The three services listed in the constructor only to force their // initialization aren't exercised by any test here; provide stubs. { provide: WorkflowCompilingService, useValue: {} }, - { provide: WorkflowConsoleService, useValue: {} }, + { provide: WorkflowConsoleService, useValue: workflowConsoleService }, { provide: OperatorReuseCacheStatusService, useValue: {} }, + { provide: ComputingUnitStatusService, useValue: computingUnitStatusService }, + { provide: ExecuteWorkflowService, useValue: executeWorkflowService }, + { provide: WorkflowResultService, useValue: workflowResultService }, ...commonTestProviders, ], schemas: [NO_ERRORS_SCHEMA], @@ -415,6 +434,26 @@ describe("WorkspaceComponent", () => { // Cleanup of the workflow state still happens regardless. expect(workflowActionService.clearWorkflow).toHaveBeenCalled(); }); + + it("tears down every piece of websocket-derived state when leaving the workspace (issue #3120)", async () => { + await createFixture(); + fixture.detectChanges(); + component.ngOnDestroy(); + expect(computingUnitStatusService.disconnect).toHaveBeenCalled(); + expect(executeWorkflowService.resetExecutionAndWorkers).toHaveBeenCalled(); + expect(workflowConsoleService.clearConsoleMessages).toHaveBeenCalled(); + expect(workflowResultService.clearResults).toHaveBeenCalled(); + }); + + it("clears the workflow session state when the computing unit is switched in-canvas (issue #3120)", async () => { + await createFixture(); + fixture.detectChanges(); + // Switching to a different unit emits on the connection-reset stream. + connectionResetSubject.next(); + expect(executeWorkflowService.resetExecutionAndWorkers).toHaveBeenCalled(); + expect(workflowConsoleService.clearConsoleMessages).toHaveBeenCalled(); + expect(workflowResultService.clearResults).toHaveBeenCalled(); + }); }); describe("copilotEnabled", () => { diff --git a/frontend/src/app/workspace/component/workspace.component.ts b/frontend/src/app/workspace/component/workspace.component.ts index da220feaab..e7bfcb7bf4 100644 --- a/frontend/src/app/workspace/component/workspace.component.ts +++ b/frontend/src/app/workspace/component/workspace.component.ts @@ -51,6 +51,9 @@ import { THROTTLE_TIME_MS } from "../../hub/component/workflow/detail/hub-workfl import { WorkflowCompilingService } from "../service/compile-workflow/workflow-compiling.service"; import { USER_WORKSPACE } from "../../app-routing.constant"; import { GuiConfigService } from "../../common/service/gui-config.service"; +import { ComputingUnitStatusService } from "../../common/service/computing-unit/computing-unit-status/computing-unit-status.service"; +import { ExecuteWorkflowService } from "../service/execute-workflow/execute-workflow.service"; +import { WorkflowResultService } from "../service/workflow-result/workflow-result.service"; import { checkIfWorkflowBroken } from "../../common/util/workflow-check"; import { NzSpinComponent } from "ng-zorro-antd/spin"; import { ResultPanelComponent } from "./result-panel/result-panel.component"; @@ -126,7 +129,10 @@ export class WorkspaceComponent implements AfterViewInit, OnInit, OnDestroy { private hubService: HubService, private codeEditorService: CodeEditorService, private config: GuiConfigService, - private changeDetectorRef: ChangeDetectorRef + private changeDetectorRef: ChangeDetectorRef, + private computingUnitStatusService: ComputingUnitStatusService, + private executeWorkflowService: ExecuteWorkflowService, + private workflowResultService: WorkflowResultService ) {} ngOnInit() { @@ -144,6 +150,12 @@ export class WorkspaceComponent implements AfterViewInit, OnInit, OnDestroy { */ this.pid = parseInt(this.route.snapshot.queryParams.pid) || undefined; this.workflowActionService.setHighlightingEnabled(true); + // Clear session state when the user switches computing units in-canvas, so + // the previous unit's status/console/results don't linger. + this.computingUnitStatusService + .getConnectionResetStream() + .pipe(untilDestroyed(this)) + .subscribe(() => this.resetWorkflowSessionState()); } ngAfterViewInit(): void { @@ -184,6 +196,20 @@ export class WorkspaceComponent implements AfterViewInit, OnInit, OnDestroy { this.codeEditorViewRef.clear(); this.workflowActionService.clearWorkflow(); + // Tear down the connection and all websocket-derived session state so a + // re-entered workflow starts clean instead of reusing the previous one. + this.computingUnitStatusService.disconnect(); + this.resetWorkflowSessionState(); + } + + /** + * Clear websocket-derived session state (execution status, console, results). + * Shared by workspace teardown and in-canvas unit switches. + */ + private resetWorkflowSessionState(): void { + this.executeWorkflowService.resetExecutionAndWorkers(); + this.workflowConsoleService.clearConsoleMessages(); + this.workflowResultService.clearResults(); } registerAutoPersistWorkflow(): void { diff --git a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts index b8d2779c0c..e1ff418abb 100644 --- a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts +++ b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts @@ -86,6 +86,21 @@ describe("ExecuteWorkflowService", () => { expect(injectedService).toBeTruthy(); })); + it("resetExecutionAndWorkers() clears the execution state and worker assignments", () => { + (service as any).currentState = { state: ExecutionState.Running }; + (service as any).assignedWorkerIds.set("op1", ["w1", "w2"]); + + const emittedStates: ExecutionState[] = []; + service.getExecutionStateStream().subscribe(event => emittedStates.push(event.current.state)); + + service.resetExecutionAndWorkers(); + + expect(service.getExecutionState().state).toBe(ExecutionState.Uninitialized); + expect(service.getWorkerIds("op1")).toEqual([]); + // must broadcast on the stream so subscribers (menu, result panel) drop stale status + expect(emittedStates).toContain(ExecutionState.Uninitialized); + }); + it("should generate a logical plan request based on the workflow graph that is passed to the function", () => { const newLogicalPlan: LogicalPlan = ExecuteWorkflowService.getLogicalPlanRequest(mockWorkflowPlan_scan_result); expect(newLogicalPlan).toEqual(mockLogicalPlan_scan_result); diff --git a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts index eb86194e7c..c2ab3eac0d 100644 --- a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts +++ b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts @@ -354,6 +354,16 @@ export class ExecuteWorkflowService { }; } + /** + * Reset execution status and worker assignments. Unlike resetExecutionState(), + * this also clears worker assignments and broadcasts the reset on + * executionStateStream so subscribers drop the previous unit's status. + */ + public resetExecutionAndWorkers(): void { + this.updateExecutionState({ state: ExecutionState.Uninitialized }); + this.assignedWorkerIds.clear(); + } + private updateExecutionState(stateInfo: ExecutionStateInfo): void { if (isEqual(this.currentState, stateInfo)) { return; diff --git a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.spec.ts b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.spec.ts index 9be036cdcc..a211eea52b 100644 --- a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.spec.ts +++ b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.spec.ts @@ -34,4 +34,17 @@ describe("WorkflowConsoleService", () => { it("should be created", () => { expect(service).toBeTruthy(); }); + + it("clearConsoleMessages() removes all messages and notifies subscribers", () => { + (service as any).consoleMessages.set("op1", []); + expect(service.hasConsoleMessages("op1")).toBe(true); + + let notified = false; + service.getConsoleMessageUpdateStream().subscribe(() => (notified = true)); + + service.clearConsoleMessages(); + + expect(service.hasConsoleMessages("op1")).toBe(false); + expect(notified).toBe(true); + }); }); diff --git a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts index b7d60868ae..5f88a22ba9 100644 --- a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts +++ b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts @@ -73,4 +73,13 @@ export class WorkflowConsoleService { getConsoleMessageUpdateStream(): Observable<void> { return this.consoleMessagesUpdateStream.asObservable(); } + + /** + * Clear all console messages so a re-entered workflow doesn't show the + * previous session's output. + */ + public clearConsoleMessages(): void { + this.consoleMessages.clear(); + this.consoleMessagesUpdateStream.next(); + } } diff --git a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts index 8406a55c75..031de9bfa0 100644 --- a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts +++ b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts @@ -37,6 +37,33 @@ describe("WorkflowResultService", () => { it("should be created", () => { expect(service).toBeTruthy(); }); + + it("clearResults() drops cached operator results", () => { + (service as any).operatorResultServices.set("op1", {}); + (service as any).paginatedResultServices.set("op2", {}); + expect(service.hasAnyResult("op1")).toBe(true); + expect(service.hasAnyResult("op2")).toBe(true); + + service.clearResults(); + + expect(service.hasAnyResult("op1")).toBe(false); + expect(service.hasAnyResult("op2")).toBe(false); + }); + + it("clearResults() resets table stats to empty for subscribers", () => { + const pairs: [unknown, unknown][] = []; + service.getResultTableStats().subscribe(p => pairs.push(p)); + (service as any).resultTableStats.next({ op1: {} }); + service.clearResults(); + expect(pairs[pairs.length - 1][1]).toEqual({}); + }); + + it("clearResults() emits on the cleared stream so the UI tears down stale frames", () => { + let clearedCount = 0; + service.getResultClearedStream().subscribe(() => clearedCount++); + service.clearResults(); + expect(clearedCount).toBe(1); + }); }); describe("OperatorPaginationResultService", () => { diff --git a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts index 9fd18e0f16..ffd9b43b91 100644 --- a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts +++ b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts @@ -49,6 +49,8 @@ export class WorkflowResultService { private resultUpdateStream = new Subject<Record<string, WebResultUpdate | undefined>>(); private resultTableStats = new ReplaySubject<Record<string, Record<string, Record<string, number>>>>(1); private resultInitiateStream = new Subject<string>(); + // emits when clearResults() drops cached results, so the UI can drop stale frames + private resultClearedStream = new Subject<void>(); constructor(private wsService: WorkflowWebsocketService) { this.wsService.subscribeToEvent("WebResultUpdateEvent").subscribe(event => { @@ -87,6 +89,14 @@ export class WorkflowResultService { return this.resultInitiateStream.asObservable(); } + /** + * Emits when clearResults() drops cached results, so consumers can tear down + * stale frames (clearing the caches alone won't re-render a displayed operator). + */ + public getResultClearedStream(): Observable<void> { + return this.resultClearedStream.asObservable(); + } + public getPaginatedResultService(operatorID: string): OperatorPaginationResultService | undefined { return this.paginatedResultServices.get(operatorID); } @@ -95,6 +105,18 @@ export class WorkflowResultService { return this.operatorResultServices.get(operatorID); } + /** + * Drop cached results and reset table stats so a re-entered workflow doesn't show + * stale results (resultTableStats is a ReplaySubject, so push an empty snapshot). + * Emits resultClearedStream so subscribers tear down already-displayed frames. + */ + public clearResults(): void { + this.operatorResultServices.clear(); + this.paginatedResultServices.clear(); + this.resultTableStats.next({}); + this.resultClearedStream.next(); + } + private handleCleanResultCache(event: WorkflowAvailableResultEvent): void { const removedOrInvalidatedOperators = new Set<string>(); // remove operators that no longer have results diff --git a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts index f2862d18d7..4fb0fe1d10 100644 --- a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts +++ b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts @@ -94,4 +94,12 @@ describe("WorkflowWebsocketService", () => { window.WebSocket = originalWebSocket; } }); + + it("should reset the cached worker count when the websocket is closed", () => { + // numWorkers is populated from ClusterStatusUpdateEvent on the live connection; + // once the socket is closed the count is stale and must reset. + service.numWorkers = 5; + service.closeWebsocket(); + expect(service.numWorkers).toBe(-1); + }); }); diff --git a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts index 017e43ee6a..4e52d3fa09 100644 --- a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts +++ b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts @@ -93,6 +93,8 @@ export class WorkflowWebsocketService { this.wsWithReconnectSubscription?.unsubscribe(); this.statusUpdateSubscription?.unsubscribe(); this.websocket?.complete(); + // the worker count comes from the live connection; reset it once the socket is gone + this.numWorkers = -1; this.updateConnectionStatus(false); }
