This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new dfa0434e70 refactor(test): extract shared run-and-read e2e harness
into TestUtils (#5712)
dfa0434e70 is described below
commit dfa0434e70c256feb37a8d38c1d0bc134e789fdd
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sun Jun 14 17:58:45 2026 -0700
refactor(test): extract shared run-and-read e2e harness into TestUtils
(#5712)
### What changes were proposed in this PR?
Extracts the repeated "run a workflow and read its materialized results"
boilerplate from the amber e2e specs into two reusable helpers on
`TestUtils`:
- `readMaterializedResults(executionId, operatorIds, extract)` — resolves
and opens each operator's external RESULT document and applies `extract`
to the opened `VirtualDocument[Tuple]` (operators with no materialized
output are skipped).
- `runWorkflowAndReadResults(system, workflow, operatorIds, extract,
completionTimeout)` — runs a workflow to `COMPLETED` (a `FatalError`
aborts the run and surfaces as the awaited exception), then reads the
results via `readMaterializedResults`.
`DataProcessingSpec.executeWorkflow` now delegates to the shared harness
(via the `runWorkflowAndReadTerminalResults` convenience wrapper) instead
of its own inline copy. The helpers are loop/state-agnostic —
they only use existing core APIs (`DocumentFactory`,
`VirtualDocument[Tuple]`, `AmberClient`, `ExecutionStateUpdate`,
`FatalError`), so other e2e specs can adopt them too.
### Any related issues, documentation, discussions?
Resolves #5711 (sub-issue of #4442 "Introduce for loop"). Split out of
#5700 to keep that PR reviewable, per @Xiao-zhen-Liu's
[review](https://github.com/apache/texera/pull/4206#pullrequestreview-4482667715).
### How was this PR tested?
Behavior-preserving refactor of existing e2e test infrastructure.
`WorkflowExecutionService/Test/compile` and
`WorkflowExecutionService/scalafmtCheckAll` pass locally. The
`@IntegrationTest` specs that exercise the harness (e.g.
`DataProcessingSpec`) run in CI — they spawn Python workers and can't
run on Windows.
### Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF.
---
.../amber/engine/e2e/DataProcessingSpec.scala | 58 +---------
.../apache/texera/amber/engine/e2e/TestUtils.scala | 123 ++++++++++++++++-----
2 files changed, 101 insertions(+), 80 deletions(-)
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
index d070fefb27..2606d9d656 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala
@@ -22,10 +22,7 @@ package org.apache.texera.amber.engine.e2e
import org.apache.pekko.actor.{ActorSystem, Props}
import org.apache.pekko.testkit.{ImplicitSender, TestKit}
import org.apache.pekko.util.Timeout
-import com.twitter.util.{Await, Duration, Promise}
import org.apache.texera.amber.clustering.SingleNodeListener
-import org.apache.texera.amber.core.storage.DocumentFactory
-import org.apache.texera.amber.core.storage.model.VirtualDocument
import org.apache.texera.amber.core.tuple.{AttributeType, Tuple}
import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
import org.apache.texera.amber.core.workflow.{
@@ -35,19 +32,16 @@ import org.apache.texera.amber.core.workflow.{
WorkflowSettings
}
import org.apache.texera.amber.engine.architecture.controller._
-import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmptyRequest
-import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
import org.apache.texera.amber.engine.common.AmberRuntime
-import org.apache.texera.amber.engine.common.client.AmberClient
import org.apache.texera.amber.engine.e2e.TestUtils.{
buildWorkflow,
cleanupWorkflowExecutionData,
initiateTexeraDBForTestCases,
+ runWorkflowAndReadTerminalResults,
setUpWorkflowExecutionData
}
import org.apache.texera.amber.operator.TestOperators
import org.apache.texera.amber.operator.aggregate.AggregationFunction
-import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId
import org.apache.texera.workflow.LogicalLink
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries}
@@ -101,54 +95,8 @@ class DataProcessingSpec
TestKit.shutdownActorSystem(system)
}
- def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] = {
- var results: Map[OperatorIdentity, List[Tuple]] = null
- val client = new AmberClient(
- system,
- workflow.context,
- workflow.physicalPlan,
- ControllerConfig.default,
- error => {}
- )
- val completion = Promise[Unit]()
- client.registerCallback[FatalError](evt => {
- completion.setException(evt.e)
- client.shutdown()
- })
-
- client
- .registerCallback[ExecutionStateUpdate](evt => {
- if (evt.state == COMPLETED) {
- results = workflow.logicalPlan.getTerminalOperatorIds
- .filter(terminalOpId => {
- val uri = getResultUriByLogicalPortId(
- workflowContext.executionId,
- terminalOpId,
- PortIdentity()
- )
- uri.nonEmpty
- })
- .map(terminalOpId => {
- val uri = getResultUriByLogicalPortId(
- workflowContext.executionId,
- terminalOpId,
- PortIdentity()
- ).get
- terminalOpId -> DocumentFactory
- .openDocument(uri)
- ._1
- .asInstanceOf[VirtualDocument[Tuple]]
- .get()
- .toList
- })
- .toMap
- completion.setDone()
- }
- })
- Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
- Await.result(completion, Duration.fromMinutes(1))
- results
- }
+ def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] =
+ runWorkflowAndReadTerminalResults(system, workflow)
"Engine" should "execute headerlessCsv workflow normally" in {
val headerlessCsvOpDesc = TestOperators.headerlessSmallCsvScanOpDesc()
diff --git
a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
index 9021765fc8..2a87fe3490 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala
@@ -19,18 +19,19 @@
package org.apache.texera.amber.engine.e2e
-import com.twitter.util.{Await, Duration, Promise, Return}
+import com.twitter.util.{Await, Duration, Promise, Return, Throw, Try}
import org.apache.pekko.actor.ActorSystem
import org.apache.texera.common.config.StorageConfig
import org.apache.texera.amber.core.executor.OpExecInitInfo
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.storage.model.VirtualDocument
import org.apache.texera.amber.core.tuple.Tuple
-import org.apache.texera.amber.core.virtualidentity.OperatorIdentity
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, OperatorIdentity}
import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext}
import org.apache.texera.amber.engine.architecture.controller.{
ControllerConfig,
ExecutionStateUpdate,
+ FatalError,
Workflow
}
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
@@ -78,6 +79,100 @@ object TestUtils {
)
}
+ /**
+ * Resolve and read each operator's external RESULT document at `executionId`,
+ * applying `extract` to the opened document. Operators with no external
+ * RESULT uri (e.g. one whose output wasn't materialized) are omitted. Shared
+ * by the e2e specs so the lookup-open-extract block doesn't drift between
+ * copies.
+ */
+ def readMaterializedResults[T](
+ executionId: ExecutionIdentity,
+ operatorIds: Iterable[OperatorIdentity],
+ extract: VirtualDocument[Tuple] => T
+ ): Map[OperatorIdentity, T] =
+ operatorIds.flatMap { opId =>
+ getResultUriByLogicalPortId(executionId, opId, PortIdentity()).map { uri =>
+ opId -> extract(
+ DocumentFactory.openDocument(uri)._1.asInstanceOf[VirtualDocument[Tuple]]
+ )
+ }
+ }.toMap
+
+ /**
+ * Convenience over `readMaterializedResults` for the common case: read each
+ * terminal operator's result of `workflow` as a `List[Tuple]`.
+ */
+ def readMaterializedResults(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] =
+ readMaterializedResults(
+ workflow.context.executionId,
+ workflow.logicalPlan.getTerminalOperatorIds,
+ _.get().toList
+ )
+
+ /**
+ * Run `workflow` to COMPLETED, then read the requested operators' materialized
+ * results via `readMaterializedResults`. A FatalError aborts the run and is
+ * surfaced as the exception from the completion await. Specs that drive the
+ * run differently (e.g. a pause/resume flow) read results directly inside
+ * their own completion callback instead.
+ */
+ def runWorkflowAndReadResults[T](
+ system: ActorSystem,
+ workflow: Workflow,
+ operatorIds: Iterable[OperatorIdentity],
+ extract: VirtualDocument[Tuple] => T,
+ completionTimeout: Duration = Duration.fromMinutes(1)
+ ): Map[OperatorIdentity, T] = {
+ // The Promise carries the result so completing the run and handing back the
+ // value are atomic. Every terminal path uses `updateIfEmpty`, so a second
+ // event (a late FatalError after COMPLETED, or a repeated state update)
+ // can't throw inside a callback and get swallowed -- which would otherwise
+ // mask the real failure as a timeout. A read failure inside the COMPLETED
+ // callback fails the Promise (via `Try`) instead of hanging, and
+ // `shutdown()` runs in a `finally` so a timeout or error can't leak the
+ // client's actors.
+ val completion = Promise[Map[OperatorIdentity, T]]()
+ val client = new AmberClient(
+ system,
+ workflow.context,
+ workflow.physicalPlan,
+ ControllerConfig.default,
+ e => completion.updateIfEmpty(Throw(e))
+ )
+ try {
+ client.registerCallback[FatalError](evt => completion.updateIfEmpty(Throw(evt.e)))
+ client.registerCallback[ExecutionStateUpdate](evt => {
+ if (evt.state == COMPLETED) {
+ completion.updateIfEmpty(
+ Try(readMaterializedResults(workflow.context.executionId, operatorIds, extract))
+ )
+ }
+ })
+ Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
+ Await.result(completion, completionTimeout)
+ } finally {
+ client.shutdown()
+ }
+ }
+
+ /**
+ * Convenience over `runWorkflowAndReadResults` for the common case: run
+ * `workflow` and read each terminal operator's result as a `List[Tuple]`.
+ */
+ def runWorkflowAndReadTerminalResults(
+ system: ActorSystem,
+ workflow: Workflow,
+ completionTimeout: Duration = Duration.fromMinutes(1)
+ ): Map[OperatorIdentity, List[Tuple]] =
+ runWorkflowAndReadResults(
+ system,
+ workflow,
+ workflow.logicalPlan.getTerminalOperatorIds,
+ _.get().toList,
+ completionTimeout
+ )
+
/**
* If a test case accesses the user system through singleton resources that cache the DSLContext (e.g., executes a
* workflow, which accesses WorkflowExecutionsResource), we use a separate texera_db specifically for such test cases.
@@ -188,29 +283,7 @@ object TestUtils {
var result: Map[OperatorIdentity, List[Tuple]] = null
client.registerCallback[ExecutionStateUpdate](evt => {
if (evt.state == COMPLETED) {
- result = workflow.logicalPlan.getTerminalOperatorIds
- .filter(terminalOpId => {
- val uri = getResultUriByLogicalPortId(
- workflow.context.executionId,
- terminalOpId,
- PortIdentity()
- )
- uri.nonEmpty
- })
- .map(terminalOpId => {
- val uri = getResultUriByLogicalPortId(
- workflow.context.executionId,
- terminalOpId,
- PortIdentity()
- ).get
- terminalOpId -> DocumentFactory
- .openDocument(uri)
- ._1
- .asInstanceOf[VirtualDocument[Tuple]]
- .get()
- .toList
- })
- .toMap
+ result = readMaterializedResults(workflow)
completion.setDone()
}
})