This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5712-94170aef8559473c67eea7351ce5948ce858126c in repository https://gitbox.apache.org/repos/asf/texera.git
commit dfa0434e70c256feb37a8d38c1d0bc134e789fdd Author: Xinyuan Lin <[email protected]> AuthorDate: Sun Jun 14 17:58:45 2026 -0700 refactor(test): extract shared run-and-read e2e harness into TestUtils (#5712) ### What changes were proposed in this PR? Extracts the repeated "run a workflow and read its materialized results" boilerplate from the amber e2e specs into two core helpers on `TestUtils`, plus thin convenience overloads for the common terminal-operator case: - `readMaterializedResults(executionId, operatorIds, extract)` — resolve + open each operator's external RESULT document and apply `extract` to the opened `VirtualDocument[Tuple]` (operators with no materialized output are skipped). - `runWorkflowAndReadResults(system, workflow, operatorIds, extract, completionTimeout)` — run a workflow to `COMPLETED` (a `FatalError` aborts and surfaces as the awaited exception), then read results via `readMaterializedResults`. - `readMaterializedResults(workflow)` / `runWorkflowAndReadTerminalResults(system, workflow, completionTimeout)` — convenience wrappers that read every terminal operator's result as a `List[Tuple]`. `DataProcessingSpec.executeWorkflow` now delegates to `runWorkflowAndReadTerminalResults` instead of keeping its own inline copy, and the remaining inline copy inside `TestUtils` now calls `readMaterializedResults(workflow)`. The helpers are loop/state-agnostic — they only use existing core APIs (`DocumentFactory`, `VirtualDocument[Tuple]`, `AmberClient`, `ExecutionStateUpdate`, `FatalError`), so other e2e specs can adopt them too. ### Any related issues, documentation, discussions? Resolves #5711 (sub-issue of #4442 "Introduce for loop"). Split out of #5700 to keep that PR reviewable, per @Xiao-zhen-Liu's [review](https://github.com/apache/texera/pull/4206#pullrequestreview-4482667715). ### How was this PR tested? Behavior-preserving refactor of existing e2e test infrastructure. `WorkflowExecutionService/Test/compile` and `WorkflowExecutionService/scalafmtCheckAll` pass locally. The `@IntegrationTest` specs that exercise the harness (e.g. `DataProcessingSpec`) run in CI — they spawn Python workers and can't run on Windows. ### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.8 in compliance with ASF. 
--- .../amber/engine/e2e/DataProcessingSpec.scala | 58 +--------- .../apache/texera/amber/engine/e2e/TestUtils.scala | 123 ++++++++++++++++----- 2 files changed, 101 insertions(+), 80 deletions(-) diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala index d070fefb27..2606d9d656 100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala @@ -22,10 +22,7 @@ package org.apache.texera.amber.engine.e2e import org.apache.pekko.actor.{ActorSystem, Props} import org.apache.pekko.testkit.{ImplicitSender, TestKit} import org.apache.pekko.util.Timeout -import com.twitter.util.{Await, Duration, Promise} import org.apache.texera.amber.clustering.SingleNodeListener -import org.apache.texera.amber.core.storage.DocumentFactory -import org.apache.texera.amber.core.storage.model.VirtualDocument import org.apache.texera.amber.core.tuple.{AttributeType, Tuple} import org.apache.texera.amber.core.virtualidentity.OperatorIdentity import org.apache.texera.amber.core.workflow.{ @@ -35,19 +32,16 @@ import org.apache.texera.amber.core.workflow.{ WorkflowSettings } import org.apache.texera.amber.engine.architecture.controller._ -import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmptyRequest -import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED import org.apache.texera.amber.engine.common.AmberRuntime -import org.apache.texera.amber.engine.common.client.AmberClient import org.apache.texera.amber.engine.e2e.TestUtils.{ buildWorkflow, cleanupWorkflowExecutionData, initiateTexeraDBForTestCases, + runWorkflowAndReadTerminalResults, setUpWorkflowExecutionData } import org.apache.texera.amber.operator.TestOperators import org.apache.texera.amber.operator.aggregate.AggregationFunction 
-import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId import org.apache.texera.workflow.LogicalLink import org.scalatest.flatspec.AnyFlatSpecLike import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries} @@ -101,54 +95,8 @@ class DataProcessingSpec TestKit.shutdownActorSystem(system) } - def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] = { - var results: Map[OperatorIdentity, List[Tuple]] = null - val client = new AmberClient( - system, - workflow.context, - workflow.physicalPlan, - ControllerConfig.default, - error => {} - ) - val completion = Promise[Unit]() - client.registerCallback[FatalError](evt => { - completion.setException(evt.e) - client.shutdown() - }) - - client - .registerCallback[ExecutionStateUpdate](evt => { - if (evt.state == COMPLETED) { - results = workflow.logicalPlan.getTerminalOperatorIds - .filter(terminalOpId => { - val uri = getResultUriByLogicalPortId( - workflowContext.executionId, - terminalOpId, - PortIdentity() - ) - uri.nonEmpty - }) - .map(terminalOpId => { - val uri = getResultUriByLogicalPortId( - workflowContext.executionId, - terminalOpId, - PortIdentity() - ).get - terminalOpId -> DocumentFactory - .openDocument(uri) - ._1 - .asInstanceOf[VirtualDocument[Tuple]] - .get() - .toList - }) - .toMap - completion.setDone() - } - }) - Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ())) - Await.result(completion, Duration.fromMinutes(1)) - results - } + def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] = + runWorkflowAndReadTerminalResults(system, workflow) "Engine" should "execute headerlessCsv workflow normally" in { val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc() diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala index 9021765fc8..2a87fe3490 
100644 --- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala +++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala @@ -19,18 +19,19 @@ package org.apache.texera.amber.engine.e2e -import com.twitter.util.{Await, Duration, Promise, Return} +import com.twitter.util.{Await, Duration, Promise, Return, Throw, Try} import org.apache.pekko.actor.ActorSystem import org.apache.texera.common.config.StorageConfig import org.apache.texera.amber.core.executor.OpExecInitInfo import org.apache.texera.amber.core.storage.DocumentFactory import org.apache.texera.amber.core.storage.model.VirtualDocument import org.apache.texera.amber.core.tuple.Tuple -import org.apache.texera.amber.core.virtualidentity.OperatorIdentity +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, OperatorIdentity} import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext} import org.apache.texera.amber.engine.architecture.controller.{ ControllerConfig, ExecutionStateUpdate, + FatalError, Workflow } import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ @@ -78,6 +79,100 @@ object TestUtils { ) } + /** + * Resolve and read each operator's external RESULT document at `executionId`, + * applying `extract` to the opened document. Operators with no external + * RESULT uri (e.g. one whose output wasn't materialized) are omitted. Shared + * by the e2e specs so the lookup-open-extract block doesn't drift between + * copies. 
+ */ + def readMaterializedResults[T]( + executionId: ExecutionIdentity, + operatorIds: Iterable[OperatorIdentity], + extract: VirtualDocument[Tuple] => T + ): Map[OperatorIdentity, T] = + operatorIds.flatMap { opId => + getResultUriByLogicalPortId(executionId, opId, PortIdentity()).map { uri => + opId -> extract( + DocumentFactory.openDocument(uri)._1.asInstanceOf[VirtualDocument[Tuple]] + ) + } + }.toMap + + /** + * Convenience over `readMaterializedResults` for the common case: read each + * terminal operator's result of `workflow` as a `List[Tuple]`. + */ + def readMaterializedResults(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] = + readMaterializedResults( + workflow.context.executionId, + workflow.logicalPlan.getTerminalOperatorIds, + _.get().toList + ) + + /** + * Run `workflow` to COMPLETED, then read the requested operators' materialized + * results via `readMaterializedResults`. A FatalError aborts the run and is + * surfaced as the exception from the completion await. Specs that drive the + * run differently (e.g. a pause/resume flow) read results directly inside + * their own completion callback instead. + */ + def runWorkflowAndReadResults[T]( + system: ActorSystem, + workflow: Workflow, + operatorIds: Iterable[OperatorIdentity], + extract: VirtualDocument[Tuple] => T, + completionTimeout: Duration = Duration.fromMinutes(1) + ): Map[OperatorIdentity, T] = { + // The Promise carries the result so completing the run and handing back the + // value are atomic. Every terminal path uses `updateIfEmpty`, so a second + // event (a late FatalError after COMPLETED, or a repeated state update) + // can't throw inside a callback and get swallowed -- which would otherwise + // mask the real failure as a timeout. A read failure inside the COMPLETED + // callback fails the Promise (via `Try`) instead of hanging, and + // `shutdown()` runs in a `finally` so a timeout or error can't leak the + // client's actors. 
+ val completion = Promise[Map[OperatorIdentity, T]]() + val client = new AmberClient( + system, + workflow.context, + workflow.physicalPlan, + ControllerConfig.default, + e => completion.updateIfEmpty(Throw(e)) + ) + try { + client.registerCallback[FatalError](evt => completion.updateIfEmpty(Throw(evt.e))) + client.registerCallback[ExecutionStateUpdate](evt => { + if (evt.state == COMPLETED) { + completion.updateIfEmpty( + Try(readMaterializedResults(workflow.context.executionId, operatorIds, extract)) + ) + } + }) + Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ())) + Await.result(completion, completionTimeout) + } finally { + client.shutdown() + } + } + + /** + * Convenience over `runWorkflowAndReadResults` for the common case: run + * `workflow` and read each terminal operator's result as a `List[Tuple]`. + */ + def runWorkflowAndReadTerminalResults( + system: ActorSystem, + workflow: Workflow, + completionTimeout: Duration = Duration.fromMinutes(1) + ): Map[OperatorIdentity, List[Tuple]] = + runWorkflowAndReadResults( + system, + workflow, + workflow.logicalPlan.getTerminalOperatorIds, + _.get().toList, + completionTimeout + ) + /** * If a test case accesses the user system through singleton resources that cache the DSLContext (e.g., executes a * workflow, which accesses WorkflowExecutionsResource), we use a separate texera_db specifically for such test cases. 
@@ -188,29 +283,7 @@ object TestUtils { var result: Map[OperatorIdentity, List[Tuple]] = null client.registerCallback[ExecutionStateUpdate](evt => { if (evt.state == COMPLETED) { - result = workflow.logicalPlan.getTerminalOperatorIds - .filter(terminalOpId => { - val uri = getResultUriByLogicalPortId( - workflow.context.executionId, - terminalOpId, - PortIdentity() - ) - uri.nonEmpty - }) - .map(terminalOpId => { - val uri = getResultUriByLogicalPortId( - workflow.context.executionId, - terminalOpId, - PortIdentity() - ).get - terminalOpId -> DocumentFactory - .openDocument(uri) - ._1 - .asInstanceOf[VirtualDocument[Tuple]] - .get() - .toList - }) - .toMap + result = readMaterializedResults(workflow) completion.setDone() } })
