This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-5565-564ccdbfd51089f31dcfdcbdba9f5a872a2f8256
in repository https://gitbox.apache.org/repos/asf/texera.git

commit ad10b7f2b3c8110ce36379e2f538a8f6554fb494
Author: Kunwoo (Chris) <[email protected]>
AuthorDate: Tue Jun 9 04:15:13 2026 -0700

    fix(frontend): clean up websocket state when returning to the dashboard (#5565)
    
    ### What changes were proposed in this PR?
    
    Websocket-derived front-end state (the connection itself, plus the
    execution status, console output, and results built from its events)
    lives in singletons outside the workspace. It was never torn down in two
    cases, so stale state carried over:
    
    1. **Returning to the dashboard** and re-entering a workflow reused the
    previous socket. The connection-tracking fields (`currentConnectedWid` /
    `currentConnectedCuid`) also survived, so the reconnect guard saw them
    unchanged, skipped reconnecting, and reused the stale socket. (#3120 —
    the case #3093 did not cover.)
    2. **Switching computing units** inside the workspace left the previous
    unit's console, results, and execution status on screen.
    
    This PR clears that state at both points.
    
    **Workspace exit**: `WorkspaceComponent.ngOnDestroy()` now tears
    everything down:
    
    | Call *(new)* | Resets |
    | --- | --- |
    | `ComputingUnitStatusService.disconnect()` | closes the socket, clears operator status, stops the unit poll, resets the connection-tracking fields and the selected unit |
    | `ExecuteWorkflowService.resetExecutionAndWorkers()` | execution status and worker assignments |
    | `WorkflowConsoleService.clearConsoleMessages()` | console output |
    | `WorkflowResultService.clearResults()` | result caches and table stats |
    
    **Unit switch**: `ComputingUnitStatusService` emits a reset signal when
    it reconnects to a different unit, and `WorkspaceComponent` clears the
    same execution / console / result state in response. As a result,
    switching units now discards the previous unit's results and console
    instead of leaving them on screen.
    
    The remaining websocket-event consumers need no teardown:
    `OperatorReuseCacheStatusService` is stateless, and
    `udf-debug.service`'s state lives in the `TexeraGraph`, already reset by
    `clearWorkflow()`.
    
    ### Any related issues, documentation, discussions?
    
    Closes #3120. Related: #3093 (earlier partial fix for the in-canvas
    socket re-open).
    
    ### How was this PR tested?
    
    Tested with this workflow:
    [Untitled workflow (14).json](https://github.com/user-attachments/files/28696700/Untitled.workflow.14.json)
    
    
    
https://github.com/user-attachments/assets/060fe1ac-39cf-45e5-b423-5aa27fe17aed
    
    
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.8)
    
    ---------
    
    Co-authored-by: Xinyuan Lin <[email protected]>
---
 .../computing-unit-status.service.spec.ts          | 146 +++++++++++++++++++++
 .../computing-unit-status.service.ts               |  34 +++++
 .../result-panel/result-panel.component.spec.ts    |  21 +++
 .../result-panel/result-panel.component.ts         |  17 +++
 .../component/workspace.component.spec.ts          |  41 +++++-
 .../app/workspace/component/workspace.component.ts |  28 +++-
 .../execute-workflow.service.spec.ts               |  15 +++
 .../execute-workflow/execute-workflow.service.ts   |  10 ++
 .../workflow-console.service.spec.ts               |  13 ++
 .../workflow-console/workflow-console.service.ts   |   9 ++
 .../workflow-result.service.spec.ts                |  27 ++++
 .../workflow-result/workflow-result.service.ts     |  22 ++++
 .../workflow-websocket.service.spec.ts             |   8 ++
 .../workflow-websocket.service.ts                  |   2 +
 14 files changed, 391 insertions(+), 2 deletions(-)

diff --git 
a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts
 
b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts
new file mode 100644
index 0000000000..d9c3319cf3
--- /dev/null
+++ 
b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { TestBed } from "@angular/core/testing";
+import { HttpClientTestingModule } from "@angular/common/http/testing";
+import { of } from "rxjs";
+import { ComputingUnitStatusService } from "./computing-unit-status.service";
+import { WorkflowComputingUnitManagingService } from 
"../workflow-computing-unit/workflow-computing-unit-managing.service";
+import { WorkflowWebsocketService } from 
"../../../../workspace/service/workflow-websocket/workflow-websocket.service";
+import { WorkflowStatusService } from 
"../../../../workspace/service/workflow-status/workflow-status.service";
+import { UserService } from "../../user/user.service";
+import { StubUserService } from "../../user/stub-user.service";
+import { AuthService } from "../../user/auth.service";
+import { StubAuthService } from "../../user/stub-auth.service";
+import { DashboardWorkflowComputingUnit } from 
"../../../type/workflow-computing-unit";
+import { commonTestProviders } from "../../../testing/test-utils";
+
+describe("ComputingUnitStatusService", () => {
+  let service: ComputingUnitStatusService;
+  let websocketService: WorkflowWebsocketService;
+
+  const mockUnit = (cuid: number) => ({ computingUnit: { cuid } }) as unknown 
as DashboardWorkflowComputingUnit;
+
+  beforeEach(() => {
+    const managingStub = {
+      listComputingUnits: () => of([]),
+      getComputingUnit: (cuid: number) => of(mockUnit(cuid)),
+      terminateComputingUnit: () => of(undefined),
+    };
+
+    TestBed.configureTestingModule({
+      imports: [HttpClientTestingModule],
+      providers: [
+        ComputingUnitStatusService,
+        WorkflowWebsocketService,
+        WorkflowStatusService,
+        { provide: WorkflowComputingUnitManagingService, useValue: 
managingStub },
+        { provide: UserService, useClass: StubUserService },
+        { provide: AuthService, useClass: StubAuthService },
+        ...commonTestProviders,
+      ],
+    });
+
+    service = TestBed.inject(ComputingUnitStatusService);
+    websocketService = TestBed.inject(WorkflowWebsocketService);
+  });
+
+  afterEach(() => {
+    // tear down the interval poll started by selectComputingUnit() so it 
can't outlive the test
+    service.ngOnDestroy();
+  });
+
+  it("should be created", () => {
+    expect(service).toBeTruthy();
+  });
+
+  it("reconnects when re-selecting the same workflow after disconnect 
(regression #3120)", () => {
+    const openSpy = vi.spyOn(websocketService, 
"openWebsocket").mockImplementation(() => {});
+    const closeSpy = vi.spyOn(websocketService, "closeWebsocket");
+    (service as any).allComputingUnitsSubject.next([mockUnit(7)]);
+
+    // Enter workflow 5 on computing unit 7 → opens the websocket once.
+    service.selectComputingUnit(5, 7);
+    expect(openSpy).toHaveBeenCalledTimes(1);
+
+    // User returns to the dashboard.
+    service.disconnect();
+    expect(closeSpy).toHaveBeenCalled();
+
+    // Re-enter the SAME workflow (the `wid -> null -> wid` pattern): without 
the
+    // cleanup, the retained currentConnectedWid/Cuid would suppress the 
reconnect.
+    service.selectComputingUnit(5, 7);
+    expect(openSpy).toHaveBeenCalledTimes(2);
+  });
+
+  it("disconnect() clears the selected computing unit", () => {
+    vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => {});
+    (service as any).allComputingUnitsSubject.next([mockUnit(7)]);
+    service.selectComputingUnit(5, 7);
+
+    let latest: DashboardWorkflowComputingUnit | null = mockUnit(7);
+    service.getSelectedComputingUnit().subscribe(unit => (latest = unit));
+    expect(latest).not.toBeNull();
+
+    service.disconnect();
+    expect(latest).toBeNull();
+  });
+
+  it("emits a connection-reset signal when switching to a different computing 
unit (issue #3120)", () => {
+    let connected = false;
+    vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => {
+      connected = true;
+    });
+    vi.spyOn(websocketService, "closeWebsocket").mockImplementation(() => {
+      connected = false;
+    });
+    vi.spyOn(websocketService, "isConnected", "get").mockImplementation(() => 
connected);
+    (service as any).allComputingUnitsSubject.next([mockUnit(7), mockUnit(8)]);
+
+    let resetCount = 0;
+    service.getConnectionResetStream().subscribe(() => resetCount++);
+
+    // First connection on unit 7: nothing to tear down yet → no signal.
+    service.selectComputingUnit(5, 7);
+    expect(resetCount).toBe(0);
+
+    // Switch to a different unit while connected → tear-down signal fires 
once.
+    service.selectComputingUnit(5, 8);
+    expect(resetCount).toBe(1);
+  });
+
+  it("emits a connection-reset signal when switching units even if the socket 
already dropped (issue #3120)", () => {
+    vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => {});
+    vi.spyOn(websocketService, "closeWebsocket").mockImplementation(() => {});
+    // socket reports disconnected throughout, e.g. the previous unit was 
terminated
+    vi.spyOn(websocketService, "isConnected", "get").mockReturnValue(false);
+    (service as any).allComputingUnitsSubject.next([mockUnit(7), mockUnit(8)]);
+
+    let resetCount = 0;
+    service.getConnectionResetStream().subscribe(() => resetCount++);
+
+    // First connection on unit 7: nothing to tear down yet → no signal.
+    service.selectComputingUnit(5, 7);
+    expect(resetCount).toBe(0);
+
+    // Switch units while disconnected: unit 7's stale state must still be 
cleared.
+    service.selectComputingUnit(5, 8);
+    expect(resetCount).toBe(1);
+  });
+});
diff --git 
a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts
 
b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts
index 831263183b..e5b6c44388 100644
--- 
a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts
+++ 
b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts
@@ -47,6 +47,10 @@ export class ComputingUnitStatusService implements OnDestroy 
{
 
   private readonly refreshComputingUnitListSignal = new Subject<void>();
 
+  // Emits when the active connection is torn down to switch computing units, 
so
+  // session consumers can clear their websocket-derived state.
+  private readonly connectionResetSubject = new Subject<void>();
+
   // Refresh interval in milliseconds
   private readonly REFRESH_INTERVAL_MS = 2000;
   private refreshSubscription: Subscription | null = null;
@@ -158,9 +162,16 @@ export class ComputingUnitStatusService implements 
OnDestroy {
       // open websocket if needed
       const shouldReconnect = this.currentConnectedCuid !== cuid || 
this.currentConnectedWid !== wid;
       if (isDefined(wid) && shouldReconnect) {
+        // Tear down stale state on switch even if the socket already dropped
+        // (e.g. the prior unit was terminated), not just while still 
connected.
+        const hadPreviousConnection = isDefined(this.currentConnectedWid) || 
isDefined(this.currentConnectedCuid);
         if (this.workflowWebsocketService.isConnected) {
           this.workflowWebsocketService.closeWebsocket();
+        }
+        if (hadPreviousConnection) {
           this.workflowStatusService.clearStatus();
+          // switching units: signal consumers to clear their stale state
+          this.connectionResetSubject.next();
         }
 
         this.workflowWebsocketService.openWebsocket(wid, 
this.userService.getCurrentUser()?.uid, cuid);
@@ -225,6 +236,28 @@ export class ComputingUnitStatusService implements 
OnDestroy {
     );
   }
 
+  /**
+   * Emits when the connection is reset to switch computing units. Consumers
+   * subscribe to clear their websocket-derived session state.
+   */
+  public getConnectionResetStream(): Observable<void> {
+    return this.connectionResetSubject.asObservable();
+  }
+
+  /**
+   * Tear down all websocket connection state when leaving the workspace, so
+   * re-entering a workflow starts from a clean connection instead of reusing
+   * the previous one.
+   */
+  public disconnect(): void {
+    this.workflowWebsocketService.closeWebsocket();
+    this.workflowStatusService.clearStatus();
+    this.stopPollingSelectedUnit();
+    this.currentConnectedCuid = undefined;
+    this.currentConnectedWid = undefined;
+    this.selectedUnitSubject.next(null);
+  }
+
   // Clean up on service destroy
   ngOnDestroy(): void {
     this.refreshSubscription?.unsubscribe();
@@ -232,6 +265,7 @@ export class ComputingUnitStatusService implements 
OnDestroy {
 
     this.selectedUnitSubject.complete();
     this.allComputingUnitsSubject.complete();
+    this.connectionResetSubject.complete();
   }
 
   /**
diff --git 
a/frontend/src/app/workspace/component/result-panel/result-panel.component.spec.ts
 
b/frontend/src/app/workspace/component/result-panel/result-panel.component.spec.ts
index 1017d759b5..edfbcbde61 100644
--- 
a/frontend/src/app/workspace/component/result-panel/result-panel.component.spec.ts
+++ 
b/frontend/src/app/workspace/component/result-panel/result-panel.component.spec.ts
@@ -21,6 +21,7 @@ import { ComponentFixture, TestBed } from 
"@angular/core/testing";
 
 import { ResultPanelComponent } from "./result-panel.component";
 import { ExecuteWorkflowService } from 
"../../service/execute-workflow/execute-workflow.service";
+import { WorkflowResultService } from 
"../../service/workflow-result/workflow-result.service";
 import { WorkflowActionService } from 
"../../service/workflow-graph/model/workflow-action.service";
 import { OperatorMetadataService } from 
"../../service/operator-metadata/operator-metadata.service";
 import { StubOperatorMetadataService } from 
"../../service/operator-metadata/stub-operator-metadata.service";
@@ -38,6 +39,7 @@ describe("ResultPanelComponent", () => {
   let fixture: ComponentFixture<ResultPanelComponent>;
   let executeWorkflowService: ExecuteWorkflowService;
   let workflowActionService: WorkflowActionService;
+  let workflowResultService: WorkflowResultService;
 
   beforeEach(async () => {
     await TestBed.configureTestingModule({
@@ -60,6 +62,7 @@ describe("ResultPanelComponent", () => {
     component = fixture.componentInstance;
     executeWorkflowService = TestBed.inject(ExecuteWorkflowService);
     workflowActionService = TestBed.inject(WorkflowActionService);
+    workflowResultService = TestBed.inject(WorkflowResultService);
     fixture.detectChanges();
   });
 
@@ -82,4 +85,22 @@ describe("ResultPanelComponent", () => {
     const resultPanelHtmlElement: HTMLElement = resultPanelDiv.nativeElement;
     expect(resultPanelHtmlElement).toBeTruthy();
   });
+
+  it("wipes the panel and operator selection when results are cleared, e.g. on 
a computing-unit switch (#3120)", () => {
+    // Simulate a result frame on screen for a currently-highlighted operator.
+    // ResultPanelComponent stands in as a throwaway frame component; it's 
cleared before it renders.
+    component.currentOperatorId = "op1";
+    component.operatorTitle = "Operator 1";
+    component.frameComponentConfigs.set("Result", { component: 
ResultPanelComponent, componentInputs: {} });
+    expect(component.frameComponentConfigs.size).toBe(1);
+
+    // A unit switch drops the cached results and emits on the cleared stream. 
The operator
+    // stays highlighted, so the normal rerender path won't tear the frame 
down — only this
+    // handler does, which is the part that actually fixes the 
lingering-stale-frame bug.
+    workflowResultService.clearResults();
+
+    expect(component.frameComponentConfigs.size).toBe(0);
+    expect(component.currentOperatorId).toBeUndefined();
+    expect(component.operatorTitle).toBe("");
+  });
 });
diff --git 
a/frontend/src/app/workspace/component/result-panel/result-panel.component.ts 
b/frontend/src/app/workspace/component/result-panel/result-panel.component.ts
index 3260afb290..b5b0c045da 100644
--- 
a/frontend/src/app/workspace/component/result-panel/result-panel.component.ts
+++ 
b/frontend/src/app/workspace/component/result-panel/result-panel.component.ts
@@ -136,6 +136,7 @@ export class ResultPanelComponent implements OnInit, 
OnDestroy {
     this.updateReturnPosition(DEFAULT_HEIGHT, this.height);
     this.registerAutoRerenderResultPanel();
     this.registerAutoOpenResultPanel();
+    this.registerResultClearedHandler();
     this.handleResultPanelForVersionPreview();
     this.panelService.closePanelStream.pipe(untilDestroyed(this)).subscribe(() 
=> this.closePanel());
     this.panelService.resetPanelStream.pipe(untilDestroyed(this)).subscribe(() 
=> {
@@ -218,6 +219,22 @@ export class ResultPanelComponent implements OnInit, 
OnDestroy {
       });
   }
 
+  /**
+   * Wipe the panel when results are dropped (e.g. switching computing units): 
a
+   * still-highlighted operator isn't re-rendered, so its stale frames would 
linger.
+   */
+  registerResultClearedHandler() {
+    this.workflowResultService
+      .getResultClearedStream()
+      .pipe(untilDestroyed(this))
+      .subscribe(() => {
+        this.clearResultPanel();
+        this.currentOperatorId = undefined;
+        this.operatorTitle = "";
+        this.changeDetectorRef.detectChanges();
+      });
+  }
+
   registerAutoRerenderResultPanel() {
     merge(
       this.executeWorkflowService
diff --git a/frontend/src/app/workspace/component/workspace.component.spec.ts 
b/frontend/src/app/workspace/component/workspace.component.spec.ts
index 7659f346f3..f85294e42a 100644
--- a/frontend/src/app/workspace/component/workspace.component.spec.ts
+++ b/frontend/src/app/workspace/component/workspace.component.spec.ts
@@ -40,8 +40,11 @@ import { WorkflowCompilingService } from 
"../service/compile-workflow/workflow-c
 import { OperatorMetadataService } from 
"../service/operator-metadata/operator-metadata.service";
 import { UndoRedoService } from "../service/undo-redo/undo-redo.service";
 import { WorkflowConsoleService } from 
"../service/workflow-console/workflow-console.service";
+import { ExecuteWorkflowService } from 
"../service/execute-workflow/execute-workflow.service";
+import { WorkflowResultService } from 
"../service/workflow-result/workflow-result.service";
 import { WorkflowActionService } from 
"../service/workflow-graph/model/workflow-action.service";
 import { OperatorReuseCacheStatusService } from 
"../service/workflow-status/operator-reuse-cache-status.service";
+import { ComputingUnitStatusService } from 
"../../common/service/computing-unit/computing-unit-status/computing-unit-status.service";
 import { EntityType, HubService } from "../../hub/service/hub.service";
 import { commonTestProviders } from "../../common/testing/test-utils";
 import { WorkspaceComponent } from "./workspace.component";
@@ -62,6 +65,11 @@ describe("WorkspaceComponent", () => {
   let messageService: any;
   let routerMock: any;
   let locationMock: any;
+  let computingUnitStatusService: any;
+  let executeWorkflowService: any;
+  let workflowConsoleService: any;
+  let workflowResultService: any;
+  let connectionResetSubject: Subject<void>;
   let metadataChangedSubject: Subject<void>;
   let stubGraph: { triggerCenterEvent: ReturnType<typeof vi.fn>; 
hasElementWithID: ReturnType<typeof vi.fn> };
 
@@ -136,6 +144,14 @@ describe("WorkspaceComponent", () => {
 
     routerMock = { navigate: vi.fn() };
     locationMock = { go: vi.fn() };
+    connectionResetSubject = new Subject<void>();
+    computingUnitStatusService = {
+      disconnect: vi.fn(),
+      getConnectionResetStream: () => connectionResetSubject.asObservable(),
+    };
+    executeWorkflowService = { resetExecutionAndWorkers: vi.fn() };
+    workflowConsoleService = { clearConsoleMessages: vi.fn() };
+    workflowResultService = { clearResults: vi.fn() };
 
     // Drop the standalone component's child imports and allow unknown 
elements via
     // CUSTOM_ELEMENTS_SCHEMA. The template still renders, so `<ng-template 
#codeEditor>`
@@ -167,8 +183,11 @@ describe("WorkspaceComponent", () => {
         // The three services listed in the constructor only to force their
         // initialization aren't exercised by any test here; provide stubs.
         { provide: WorkflowCompilingService, useValue: {} },
-        { provide: WorkflowConsoleService, useValue: {} },
+        { provide: WorkflowConsoleService, useValue: workflowConsoleService },
         { provide: OperatorReuseCacheStatusService, useValue: {} },
+        { provide: ComputingUnitStatusService, useValue: 
computingUnitStatusService },
+        { provide: ExecuteWorkflowService, useValue: executeWorkflowService },
+        { provide: WorkflowResultService, useValue: workflowResultService },
         ...commonTestProviders,
       ],
       schemas: [NO_ERRORS_SCHEMA],
@@ -415,6 +434,26 @@ describe("WorkspaceComponent", () => {
       // Cleanup of the workflow state still happens regardless.
       expect(workflowActionService.clearWorkflow).toHaveBeenCalled();
     });
+
+    it("tears down every piece of websocket-derived state when leaving the 
workspace (issue #3120)", async () => {
+      await createFixture();
+      fixture.detectChanges();
+      component.ngOnDestroy();
+      expect(computingUnitStatusService.disconnect).toHaveBeenCalled();
+      
expect(executeWorkflowService.resetExecutionAndWorkers).toHaveBeenCalled();
+      expect(workflowConsoleService.clearConsoleMessages).toHaveBeenCalled();
+      expect(workflowResultService.clearResults).toHaveBeenCalled();
+    });
+
+    it("clears the workflow session state when the computing unit is switched 
in-canvas (issue #3120)", async () => {
+      await createFixture();
+      fixture.detectChanges();
+      // Switching to a different unit emits on the connection-reset stream.
+      connectionResetSubject.next();
+      
expect(executeWorkflowService.resetExecutionAndWorkers).toHaveBeenCalled();
+      expect(workflowConsoleService.clearConsoleMessages).toHaveBeenCalled();
+      expect(workflowResultService.clearResults).toHaveBeenCalled();
+    });
   });
 
   describe("copilotEnabled", () => {
diff --git a/frontend/src/app/workspace/component/workspace.component.ts 
b/frontend/src/app/workspace/component/workspace.component.ts
index da220feaab..e7bfcb7bf4 100644
--- a/frontend/src/app/workspace/component/workspace.component.ts
+++ b/frontend/src/app/workspace/component/workspace.component.ts
@@ -51,6 +51,9 @@ import { THROTTLE_TIME_MS } from 
"../../hub/component/workflow/detail/hub-workfl
 import { WorkflowCompilingService } from 
"../service/compile-workflow/workflow-compiling.service";
 import { USER_WORKSPACE } from "../../app-routing.constant";
 import { GuiConfigService } from "../../common/service/gui-config.service";
+import { ComputingUnitStatusService } from 
"../../common/service/computing-unit/computing-unit-status/computing-unit-status.service";
+import { ExecuteWorkflowService } from 
"../service/execute-workflow/execute-workflow.service";
+import { WorkflowResultService } from 
"../service/workflow-result/workflow-result.service";
 import { checkIfWorkflowBroken } from "../../common/util/workflow-check";
 import { NzSpinComponent } from "ng-zorro-antd/spin";
 import { ResultPanelComponent } from "./result-panel/result-panel.component";
@@ -126,7 +129,10 @@ export class WorkspaceComponent implements AfterViewInit, 
OnInit, OnDestroy {
     private hubService: HubService,
     private codeEditorService: CodeEditorService,
     private config: GuiConfigService,
-    private changeDetectorRef: ChangeDetectorRef
+    private changeDetectorRef: ChangeDetectorRef,
+    private computingUnitStatusService: ComputingUnitStatusService,
+    private executeWorkflowService: ExecuteWorkflowService,
+    private workflowResultService: WorkflowResultService
   ) {}
 
   ngOnInit() {
@@ -144,6 +150,12 @@ export class WorkspaceComponent implements AfterViewInit, 
OnInit, OnDestroy {
      */
     this.pid = parseInt(this.route.snapshot.queryParams.pid) || undefined;
     this.workflowActionService.setHighlightingEnabled(true);
+    // Clear session state when the user switches computing units in-canvas, so
+    // the previous unit's status/console/results don't linger.
+    this.computingUnitStatusService
+      .getConnectionResetStream()
+      .pipe(untilDestroyed(this))
+      .subscribe(() => this.resetWorkflowSessionState());
   }
 
   ngAfterViewInit(): void {
@@ -184,6 +196,20 @@ export class WorkspaceComponent implements AfterViewInit, 
OnInit, OnDestroy {
 
     this.codeEditorViewRef.clear();
     this.workflowActionService.clearWorkflow();
+    // Tear down the connection and all websocket-derived session state so a
+    // re-entered workflow starts clean instead of reusing the previous one.
+    this.computingUnitStatusService.disconnect();
+    this.resetWorkflowSessionState();
+  }
+
+  /**
+   * Clear websocket-derived session state (execution status, console, 
results).
+   * Shared by workspace teardown and in-canvas unit switches.
+   */
+  private resetWorkflowSessionState(): void {
+    this.executeWorkflowService.resetExecutionAndWorkers();
+    this.workflowConsoleService.clearConsoleMessages();
+    this.workflowResultService.clearResults();
   }
 
   registerAutoPersistWorkflow(): void {
diff --git 
a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts
 
b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts
index b8d2779c0c..e1ff418abb 100644
--- 
a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts
+++ 
b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts
@@ -86,6 +86,21 @@ describe("ExecuteWorkflowService", () => {
     expect(injectedService).toBeTruthy();
   }));
 
+  it("resetExecutionAndWorkers() clears the execution state and worker 
assignments", () => {
+    (service as any).currentState = { state: ExecutionState.Running };
+    (service as any).assignedWorkerIds.set("op1", ["w1", "w2"]);
+
+    const emittedStates: ExecutionState[] = [];
+    service.getExecutionStateStream().subscribe(event => 
emittedStates.push(event.current.state));
+
+    service.resetExecutionAndWorkers();
+
+    
expect(service.getExecutionState().state).toBe(ExecutionState.Uninitialized);
+    expect(service.getWorkerIds("op1")).toEqual([]);
+    // must broadcast on the stream so subscribers (menu, result panel) drop 
stale status
+    expect(emittedStates).toContain(ExecutionState.Uninitialized);
+  });
+
   it("should generate a logical plan request based on the workflow graph that 
is passed to the function", () => {
     const newLogicalPlan: LogicalPlan = 
ExecuteWorkflowService.getLogicalPlanRequest(mockWorkflowPlan_scan_result);
     expect(newLogicalPlan).toEqual(mockLogicalPlan_scan_result);
diff --git 
a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts
 
b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts
index eb86194e7c..c2ab3eac0d 100644
--- 
a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts
+++ 
b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts
@@ -354,6 +354,16 @@ export class ExecuteWorkflowService {
     };
   }
 
+  /**
+   * Reset execution status and worker assignments. Unlike 
resetExecutionState(),
+   * this also clears worker assignments and broadcasts the reset on
+   * executionStateStream so subscribers drop the previous unit's status.
+   */
+  public resetExecutionAndWorkers(): void {
+    this.updateExecutionState({ state: ExecutionState.Uninitialized });
+    this.assignedWorkerIds.clear();
+  }
+
   private updateExecutionState(stateInfo: ExecutionStateInfo): void {
     if (isEqual(this.currentState, stateInfo)) {
       return;
diff --git 
a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.spec.ts
 
b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.spec.ts
index 9be036cdcc..a211eea52b 100644
--- 
a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.spec.ts
+++ 
b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.spec.ts
@@ -34,4 +34,17 @@ describe("WorkflowConsoleService", () => {
   it("should be created", () => {
     expect(service).toBeTruthy();
   });
+
+  it("clearConsoleMessages() removes all messages and notifies subscribers", 
() => {
+    (service as any).consoleMessages.set("op1", []);
+    expect(service.hasConsoleMessages("op1")).toBe(true);
+
+    let notified = false;
+    service.getConsoleMessageUpdateStream().subscribe(() => (notified = true));
+
+    service.clearConsoleMessages();
+
+    expect(service.hasConsoleMessages("op1")).toBe(false);
+    expect(notified).toBe(true);
+  });
 });
diff --git 
a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts
 
b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts
index b7d60868ae..5f88a22ba9 100644
--- 
a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts
+++ 
b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts
@@ -73,4 +73,13 @@ export class WorkflowConsoleService {
   getConsoleMessageUpdateStream(): Observable<void> {
     return this.consoleMessagesUpdateStream.asObservable();
   }
+
+  /**
+   * Clear all console messages so a re-entered workflow doesn't show the
+   * previous session's output.
+   */
+  public clearConsoleMessages(): void {
+    this.consoleMessages.clear();
+    this.consoleMessagesUpdateStream.next();
+  }
 }
diff --git 
a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts
 
b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts
index 8406a55c75..031de9bfa0 100644
--- 
a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts
+++ 
b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts
@@ -37,6 +37,33 @@ describe("WorkflowResultService", () => {
   it("should be created", () => {
     expect(service).toBeTruthy();
   });
+
+  it("clearResults() drops cached operator results", () => {
+    (service as any).operatorResultServices.set("op1", {});
+    (service as any).paginatedResultServices.set("op2", {});
+    expect(service.hasAnyResult("op1")).toBe(true);
+    expect(service.hasAnyResult("op2")).toBe(true);
+
+    service.clearResults();
+
+    expect(service.hasAnyResult("op1")).toBe(false);
+    expect(service.hasAnyResult("op2")).toBe(false);
+  });
+
+  it("clearResults() resets table stats to empty for subscribers", () => {
+    const pairs: [unknown, unknown][] = [];
+    service.getResultTableStats().subscribe(p => pairs.push(p));
+    (service as any).resultTableStats.next({ op1: {} });
+    service.clearResults();
+    expect(pairs[pairs.length - 1][1]).toEqual({});
+  });
+
+  it("clearResults() emits on the cleared stream so the UI tears down stale frames", () => {
+    let clearedCount = 0;
+    service.getResultClearedStream().subscribe(() => clearedCount++);
+    service.clearResults();
+    expect(clearedCount).toBe(1);
+  });
 });
 
 describe("OperatorPaginationResultService", () => {
diff --git a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts
index 9fd18e0f16..ffd9b43b91 100644
--- a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts
+++ b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts
@@ -49,6 +49,8 @@ export class WorkflowResultService {
   private resultUpdateStream = new Subject<Record<string, WebResultUpdate | undefined>>();
   private resultTableStats = new ReplaySubject<Record<string, Record<string, Record<string, number>>>>(1);
   private resultInitiateStream = new Subject<string>();
+  // emits when clearResults() drops cached results, so the UI can drop stale frames
+  private resultClearedStream = new Subject<void>();
 
   constructor(private wsService: WorkflowWebsocketService) {
     this.wsService.subscribeToEvent("WebResultUpdateEvent").subscribe(event => {
@@ -87,6 +89,14 @@ export class WorkflowResultService {
     return this.resultInitiateStream.asObservable();
   }
 
+  /**
+   * Emits when clearResults() drops cached results, so consumers can tear down
+   * stale frames (clearing the caches alone won't re-render a displayed operator).
+   */
+  public getResultClearedStream(): Observable<void> {
+    return this.resultClearedStream.asObservable();
+  }
+
   public getPaginatedResultService(operatorID: string): OperatorPaginationResultService | undefined {
     return this.paginatedResultServices.get(operatorID);
   }
@@ -95,6 +105,18 @@ export class WorkflowResultService {
     return this.operatorResultServices.get(operatorID);
   }
 
+  /**
+   * Drop cached results and reset table stats so a re-entered workflow doesn't show
+   * stale results (resultTableStats is a ReplaySubject, so push an empty snapshot).
+   * Emits resultClearedStream so subscribers tear down already-displayed frames.
+   */
+  public clearResults(): void {
+    this.operatorResultServices.clear();
+    this.paginatedResultServices.clear();
+    this.resultTableStats.next({});
+    this.resultClearedStream.next();
+  }
+
   private handleCleanResultCache(event: WorkflowAvailableResultEvent): void {
     const removedOrInvalidatedOperators = new Set<string>();
     // remove operators that no longer have results
diff --git a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts
index f2862d18d7..4fb0fe1d10 100644
--- a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts
+++ b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts
@@ -94,4 +94,12 @@ describe("WorkflowWebsocketService", () => {
       window.WebSocket = originalWebSocket;
     }
   });
+
+  it("should reset the cached worker count when the websocket is closed", () => {
+    // numWorkers is populated from ClusterStatusUpdateEvent on the live connection;
+    // once the socket is closed the count is stale and must reset.
+    service.numWorkers = 5;
+    service.closeWebsocket();
+    expect(service.numWorkers).toBe(-1);
+  });
 });
diff --git a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts
index 017e43ee6a..4e52d3fa09 100644
--- a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts
+++ b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts
@@ -93,6 +93,8 @@ export class WorkflowWebsocketService {
     this.wsWithReconnectSubscription?.unsubscribe();
     this.statusUpdateSubscription?.unsubscribe();
     this.websocket?.complete();
+    // the worker count comes from the live connection; reset it once the socket is gone
+    this.numWorkers = -1;
     this.updateConnectionStatus(false);
   }
 


Reply via email to