This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5554-2b9add956c9e63c3c4f6e717221a0c5e33e54875
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 2c304759da7dcbf79c030f6b203597ca4c17b6ac
Author: Xinyuan Lin <[email protected]>
AuthorDate: Mon Jun 15 15:43:11 2026 -0700

test(amber): add unit test coverage for EmptyReplayLogger and ReplayLogGenerator (#5554)

### What changes were proposed in this PR?

Pin behavior of two previously-uncovered modules in `engine/architecture/logreplay`. No production-code changes.

| Spec | Source class | Tests |
| --- | --- | --- |
| `EmptyReplayLoggerSpec` | `EmptyReplayLogger` | 9 |
| `ReplayLogGeneratorSpec` | `ReplayLogGenerator` | 10 |

Both spec files follow the `<srcClassName>Spec.scala` one-to-one convention.

**Behavior pinned — `EmptyReplayLogger`**

| Surface | Contract |
| --- | --- |
| `drainCurrentLogRecords(step)` | returns an empty `Array[ReplayLogRecord]` regardless of `step` (positive, zero, `Long.MaxValue`, negative); returns a non-null array; element type is `ReplayLogRecord` (compile-time enforced) |
| `markAsReplayDestination` | no-op for any `EmbeddedControlMessageIdentity`; does not accumulate state |
| `logCurrentStepWithMessage` | no-op for any `(step, channelId, msg)` triple, including `msg = None` and 100 successive calls |
| Trait conformance | usable through the `ReplayLogger` interface |

**Behavior pinned — `ReplayLogGenerator.generate`**

| Surface | Contract |
| --- | --- |
| `getStorage(None)` (EmptyRecordStorage) | returns `(empty queue, empty queue)` |
| empty `VFSRecordStorage` file | returns `(empty queue, empty queue)` |
| only `ProcessingStep` records | enqueues all into the steps queue, preserving insertion order |
| only `MessageContent` records | enqueues all into the messages queue, preserving insertion order |
| interleaved steps + messages | partitions correctly by type, preserving per-type order |
| `ReplayDestination(id)` matching `replayTo` | short-circuits — records after the cut are NOT enqueued |
| `ReplayDestination(id)` NOT matching `replayTo` | is silently skipped, iteration continues |
| multiple matching `ReplayDestination` | stops at the FIRST one |
| unknown record type (e.g. `TerminateSignal`) | throws `RuntimeException` with a diagnostic message |

**Notes**

While writing `ReplayLogGeneratorSpec` I discovered a **production stream-leak** in `ReplayLogGenerator.generate`: when it hits the matching `ReplayDestination` it short-circuits via a non-local `return`, leaving the `SequentialRecordReader`'s underlying `Input` stream open. On Windows the leaked file handle blocks subsequent temp-dir deletion. The spec's `cleanup` helper tolerates the resulting `FileSystemException` so the case still passes; the real fix is at the source and is out of scope for a test-coverage PR (will file a follow-up issue).

`ReplayLogGeneratorSpec` uses a temp-dir-backed `VFSRecordStorage[ReplayLogRecord]` and the production `AmberRuntime.serde` (suite-local `ActorSystem` injected via reflection, torn down in `afterAll`) — same harness pattern as `CheckpointSubsystemSpec` / `ClientEventSpec` / `VFSRecordStorageSpec`.

### Any related issues, documentation, discussions?

Closes #5550.

### How was this PR tested?

Pure unit-test additions; verified locally with:

- `sbt "WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.logreplay.EmptyReplayLoggerSpec org.apache.texera.amber.engine.architecture.logreplay.ReplayLogGeneratorSpec"` — 19 tests, all green
- `sbt scalafmtCheckAll` — clean
- CI to confirm

### Was this PR authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Sonnet 4.5)
---
 .../logreplay/EmptyReplayLoggerSpec.scala          | 130 ++++++++
 .../logreplay/ReplayLogGeneratorSpec.scala         | 361 +++++++++++++++++++++
 2 files changed, 491 insertions(+)

diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/EmptyReplayLoggerSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/EmptyReplayLoggerSpec.scala
new file mode 100644
index 0000000000..ccd7fde8f9
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/EmptyReplayLoggerSpec.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.logreplay
+
+import org.apache.texera.amber.core.virtualidentity.{
+  ActorVirtualIdentity,
+  ChannelIdentity,
+  EmbeddedControlMessageIdentity
+}
+import org.scalatest.flatspec.AnyFlatSpec
+
+class EmptyReplayLoggerSpec extends AnyFlatSpec {
+
+  // ---------------------------------------------------------------------------
+  // Fixtures
+  // ---------------------------------------------------------------------------
+
+  private val channelId: ChannelIdentity =
+    ChannelIdentity(ActorVirtualIdentity("from"), ActorVirtualIdentity("to"), isControl = false)
+  private val ecmId: EmbeddedControlMessageIdentity = EmbeddedControlMessageIdentity("test-ecm")
+
+  // ---------------------------------------------------------------------------
+  // drainCurrentLogRecords — always empty
+  // ---------------------------------------------------------------------------
+
+  "EmptyReplayLogger.drainCurrentLogRecords" should
+    "return an empty Array[ReplayLogRecord] regardless of the step argument" in {
+    val logger = new EmptyReplayLogger
+    val r0 = logger.drainCurrentLogRecords(0L)
+    val r1 = logger.drainCurrentLogRecords(1L)
+    val rMax = logger.drainCurrentLogRecords(Long.MaxValue)
+    val rNeg = logger.drainCurrentLogRecords(-1L)
+    assert(r0.isEmpty)
+    assert(r1.isEmpty)
+    assert(rMax.isEmpty)
+    assert(rNeg.isEmpty)
+  }
+
+  it should "return a non-null array (callers iterate it without null-checking)" in {
+    val logger = new EmptyReplayLogger
+    val r = logger.drainCurrentLogRecords(42L)
+    assert(r != null)
+    assert(r.length == 0)
+  }
+
+  it should "return arrays whose element type is ReplayLogRecord (compile-time enforced)" in {
+    // If a future refactor accidentally widened the return type to
+    // `Array[AnyRef]`, this would fail to typecheck. Pin the contract.
+    val logger = new EmptyReplayLogger
+    val r: Array[ReplayLogRecord] = logger.drainCurrentLogRecords(0L)
+    assert(r.length == 0)
+  }
+
+  // ---------------------------------------------------------------------------
+  // markAsReplayDestination — no-op
+  // ---------------------------------------------------------------------------
+
+  "EmptyReplayLogger.markAsReplayDestination" should
+    "accept any EmbeddedControlMessageIdentity without throwing" in {
+    val logger = new EmptyReplayLogger
+    logger.markAsReplayDestination(ecmId) // must not throw
+    // Calling twice with the same id is still a no-op.
+    logger.markAsReplayDestination(ecmId)
+    succeed
+  }
+
+  it should "leave drainCurrentLogRecords output untouched (no internal buffer accumulates)" in {
+    val logger = new EmptyReplayLogger
+    logger.markAsReplayDestination(ecmId)
+    logger.markAsReplayDestination(EmbeddedControlMessageIdentity("another"))
+    assert(logger.drainCurrentLogRecords(0L).isEmpty)
+  }
+
+  // ---------------------------------------------------------------------------
+  // logCurrentStepWithMessage — no-op
+  // ---------------------------------------------------------------------------
+
+  "EmptyReplayLogger.logCurrentStepWithMessage" should
+    "accept any (step, channelId, msg) triple without throwing" in {
+    val logger = new EmptyReplayLogger
+    logger.logCurrentStepWithMessage(0L, channelId, msg = None)
+    logger.logCurrentStepWithMessage(1L, channelId, msg = None)
+    logger.logCurrentStepWithMessage(Long.MaxValue, channelId, msg = None)
+    succeed
+  }
+
+  it should "tolerate a None msg argument (the null-object's job is to absorb every call)" in {
+    val logger = new EmptyReplayLogger
+    logger.logCurrentStepWithMessage(7L, channelId, msg = None)
+    // Verify nothing was queued in the process.
+    assert(logger.drainCurrentLogRecords(7L).isEmpty)
+  }
+
+  it should "leave drainCurrentLogRecords output empty even after many calls" in {
+    val logger = new EmptyReplayLogger
+    (1L to 100L).foreach(i => logger.logCurrentStepWithMessage(i, channelId, msg = None))
+    assert(logger.drainCurrentLogRecords(100L).isEmpty)
+  }
+
+  // ---------------------------------------------------------------------------
+  // ReplayLogger trait conformance
+  // ---------------------------------------------------------------------------
+  //
+  // The null-object pattern requires EmptyReplayLogger to be a drop-in for
+  // ReplayLogger callers — pin the upcast.
+
+  "EmptyReplayLogger" should "be usable through the ReplayLogger interface" in {
+    val logger: ReplayLogger = new EmptyReplayLogger
+    logger.logCurrentStepWithMessage(0L, channelId, msg = None)
+    logger.markAsReplayDestination(ecmId)
+    assert(logger.drainCurrentLogRecords(0L).isEmpty)
+  }
+}
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/ReplayLogGeneratorSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/ReplayLogGeneratorSpec.scala
new file mode 100644
index 0000000000..e702f6ca5f
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/logreplay/ReplayLogGeneratorSpec.scala
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.logreplay
+
+import org.apache.pekko.actor.ActorSystem
+import org.apache.pekko.serialization.{Serialization, SerializationExtension}
+import org.apache.pekko.testkit.TestKit
+import org.apache.texera.amber.core.virtualidentity.{
+  ActorVirtualIdentity,
+  ChannelIdentity,
+  EmbeddedControlMessageIdentity
+}
+import org.apache.texera.amber.engine.common.AmberRuntime
+import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFIFOMessage}
+import org.apache.texera.amber.engine.common.storage.{SequentialRecordStorage, VFSRecordStorage}
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.nio.file.{Files, Path}
+
+class ReplayLogGeneratorSpec extends AnyFlatSpec with BeforeAndAfterAll {
+
+  // ---------------------------------------------------------------------------
+  // Suite-local Pekko serde injected into AmberRuntime via reflection
+  // ---------------------------------------------------------------------------
+  //
+  // `SequentialRecordWriter.writeRecord` hard-codes `AmberRuntime.serde`,
+  // so any test that round-trips records through VFSRecordStorage needs
+  // AmberRuntime initialized. Pattern matches CheckpointSubsystemSpec /
+  // ClientEventSpec — own a suite-local ActorSystem, inject it into
+  // AmberRuntime's private vars via reflection, tear down in afterAll.
+
+  private val testSystem: ActorSystem =
+    ActorSystem("ReplayLogGeneratorSpec-test", AmberRuntime.pekkoConfig)
+  private val testSerde: Serialization = SerializationExtension(testSystem)
+
+  private def getAmberRuntimeField(name: String): AnyRef = {
+    val field = AmberRuntime.getClass.getDeclaredField(name)
+    field.setAccessible(true)
+    field.get(AmberRuntime)
+  }
+
+  private def setAmberRuntimeField(name: String, value: AnyRef): Unit = {
+    val field = AmberRuntime.getClass.getDeclaredField(name)
+    field.setAccessible(true)
+    field.set(AmberRuntime, value)
+  }
+
+  // Capture whatever AmberRuntime held before we overwrite it so afterAll can
+  // restore it. Unconditionally nulling the fields would clobber an already
+  // initialized AmberRuntime and couple this suite to test execution order.
+  private var prevActorSystem: AnyRef = _
+  private var prevSerde: AnyRef = _
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    prevActorSystem = getAmberRuntimeField("_actorSystem")
+    prevSerde = getAmberRuntimeField("_serde")
+    setAmberRuntimeField("_actorSystem", testSystem)
+    setAmberRuntimeField("_serde", testSerde)
+  }
+
+  override protected def afterAll(): Unit = {
+    setAmberRuntimeField("_serde", prevSerde)
+    setAmberRuntimeField("_actorSystem", prevActorSystem)
+    TestKit.shutdownActorSystem(testSystem)
+    super.afterAll()
+  }
+
+  private val isWindows: Boolean =
+    System.getProperty("os.name", "").toLowerCase.contains("win")
+
+  // Best-effort temp-dir cleanup. `Files.walk` returns a closeable Stream
+  // backed by an open directory handle — wrap in try/finally so the
+  // handle is released even if traversal throws.
+  //
+  // On Windows we tolerate `FileSystemException` on `deleteIfExists` because
+  // `ReplayLogGenerator.generate` short-circuits at `ReplayDestination`
+  // via a non-local `return`, which leaks the underlying
+  // `SequentialRecordReader.Input` stream — and a leaked open file handle
+  // blocks the temp file from being deleted there. That is a production bug
+  // to fix separately; in-test we just let the OS reap the temp files later
+  // instead of failing the case. On other platforms an open handle does not
+  // block deletion, so a `FileSystemException` signals a real problem and is
+  // allowed to propagate.
+  private def cleanup(sub: Path): Unit = {
+    val root = sub.getParent
+    if (root == null || !Files.exists(root)) return
+    val stream = Files.walk(root)
+    try {
+      stream
+        .sorted(java.util.Comparator.reverseOrder())
+        .forEach { child =>
+          try Files.deleteIfExists(child)
+          catch { case _: java.nio.file.FileSystemException if isWindows => () }
+        }
+    } finally {
+      stream.close()
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Fixtures
+  // ---------------------------------------------------------------------------
+
+  private val cid: ChannelIdentity =
+    ChannelIdentity(ActorVirtualIdentity("from"), ActorVirtualIdentity("to"), isControl = false)
+  private val destA: EmbeddedControlMessageIdentity = EmbeddedControlMessageIdentity("dest-A")
+  private val destB: EmbeddedControlMessageIdentity = EmbeddedControlMessageIdentity("dest-B")
+
+  private def newStorage(): (Path, SequentialRecordStorage[ReplayLogRecord]) = {
+    val root = Files.createTempDirectory("replay-log-generator-spec-")
+    val sub = root.resolve("logs")
+    val storage = new VFSRecordStorage[ReplayLogRecord](sub.toUri)
+    (sub, storage)
+  }
+
+  private def writeLog(
+      storage: SequentialRecordStorage[ReplayLogRecord],
+      records: Seq[ReplayLogRecord]
+  ): Unit = {
+    val writer = storage.getWriter("log")
+    // Close in a finally so a serialization failure mid-write does not leak
+    // the underlying output stream (which would otherwise block temp-dir
+    // cleanup, especially on Windows).
+    try {
+      records.foreach(writer.writeRecord)
+      writer.flush()
+    } finally {
+      writer.close()
+    }
+  }
+
+  private def msg(seq: Long): WorkflowFIFOMessage =
+    WorkflowFIFOMessage(cid, seq, DataFrame(Array.empty))
+
+  // ---------------------------------------------------------------------------
+  // Empty storage
+  // ---------------------------------------------------------------------------
+
+  "ReplayLogGenerator.generate" should
+    "return empty queues when the storage is an EmptyRecordStorage" in {
+    val storage = SequentialRecordStorage.getStorage[ReplayLogRecord](None)
+    val (steps, messages) = ReplayLogGenerator.generate(storage, "log", destA)
+    assert(steps.isEmpty)
+    assert(messages.isEmpty)
+  }
+
+  it should "return empty queues when the storage file exists but holds no records" in {
+    val (sub, storage) = newStorage()
+    try {
+      writeLog(storage, Seq.empty)
+      val (steps, messages) = ReplayLogGenerator.generate(storage, "log", destA)
+      assert(steps.isEmpty)
+      assert(messages.isEmpty)
+    } finally {
+      cleanup(sub)
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Partitioning by record type
+  // ---------------------------------------------------------------------------
+
+  it should "enqueue all ProcessingStep records into the steps queue (preserving order)" in {
+    val (sub, storage) = newStorage()
+    try {
+      val recs = Seq(
+        ProcessingStep(cid, 1L),
+        ProcessingStep(cid, 2L),
+        ProcessingStep(cid, 3L)
+      )
+      writeLog(storage, recs)
+      val (steps, messages) = ReplayLogGenerator.generate(storage, "log", destA)
+      assert(steps.toList == recs)
+      assert(messages.isEmpty)
+    } finally {
+      cleanup(sub)
+    }
+  }
+
+  it should "enqueue all MessageContent records into the messages queue (preserving order)" in {
+    val (sub, storage) = newStorage()
+    try {
+      val m1 = msg(1L)
+      val m2 = msg(2L)
+      writeLog(storage, Seq(MessageContent(m1), MessageContent(m2)))
+      val (steps, messages) = ReplayLogGenerator.generate(storage, "log", destA)
+      assert(steps.isEmpty)
+      // The pair is round-tripped through Kryo; the message reference is
+      // not preserved (a fresh deserialized copy comes back) — so compare
+      // by case-class equality, not `eq`.
+      assert(messages.toList == List(m1, m2))
+    } finally {
+      cleanup(sub)
+    }
+  }
+
+  it should
+    "partition steps and messages independently when records are interleaved" in {
+    val (sub, storage) = newStorage()
+    try {
+      val s1 = ProcessingStep(cid, 1L)
+      val s2 = ProcessingStep(cid, 2L)
+      val m1 = msg(1L)
+      val m2 = msg(2L)
+      writeLog(
+        storage,
+        Seq(s1, MessageContent(m1), s2, MessageContent(m2))
+      )
+      val (steps, messages) = ReplayLogGenerator.generate(storage, "log", destA)
+      assert(steps.toList == List(s1, s2))
+      assert(messages.toList == List(m1, m2))
+    } finally {
+      cleanup(sub)
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // ReplayDestination — early termination + skip semantics
+  // ---------------------------------------------------------------------------
+
+  it should
+    "short-circuit at the matching ReplayDestination, ignoring records that follow it" in {
+    val (sub, storage) = newStorage()
+    try {
+      val s1 = ProcessingStep(cid, 1L)
+      val m1 = msg(1L)
+      val m2Past = msg(2L) // must NOT appear in the result
+      val s2Past = ProcessingStep(cid, 99L) // must NOT appear either
+      writeLog(
+        storage,
+        Seq(
+          s1,
+          MessageContent(m1),
+          ReplayDestination(destA), // <-- replayTo target; iteration stops here
+          MessageContent(m2Past),
+          s2Past
+        )
+      )
+      val (steps, messages) = ReplayLogGenerator.generate(storage, "log", destA)
+      assert(steps.toList == List(s1))
+      assert(messages.toList == List(m1))
+    } finally {
+      cleanup(sub)
+    }
+  }
+
+  it should
+    "silently skip a ReplayDestination whose id does not match replayTo (iteration continues)" in {
+    val (sub, storage) = newStorage()
+    try {
+      val s1 = ProcessingStep(cid, 1L)
+      val m1 = msg(1L)
+      writeLog(
+        storage,
+        Seq(
+          s1,
+          ReplayDestination(destB), // <-- different id; skipped
+          MessageContent(m1)
+        )
+      )
+      val (steps, messages) = ReplayLogGenerator.generate(storage, "log", destA)
+      assert(steps.toList == List(s1))
+      assert(messages.toList == List(m1))
+    } finally {
+      cleanup(sub)
+    }
+  }
+
+  it should
+    "stop at the FIRST matching ReplayDestination when multiple matching records exist" in {
+    val (sub, storage) = newStorage()
+    try {
+      val s1 = ProcessingStep(cid, 1L)
+      val s2Past = ProcessingStep(cid, 2L)
+      writeLog(
+        storage,
+        Seq(
+          s1,
+          ReplayDestination(destA), // <-- first match
+          s2Past, // <-- after the cut
+          ReplayDestination(destA)
+        )
+      )
+      val (steps, _) = ReplayLogGenerator.generate(storage, "log", destA)
+      assert(steps.toList == List(s1))
+    } finally {
+      cleanup(sub)
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Unknown record type — TerminateSignal triggers the `case other` branch
+  // ---------------------------------------------------------------------------
+
+  it should "throw RuntimeException on an unhandled record type (e.g. TerminateSignal)" in {
+    val (sub, storage) = newStorage()
+    try {
+      writeLog(storage, Seq(TerminateSignal))
+      val ex = intercept[RuntimeException] {
+        ReplayLogGenerator.generate(storage, "log", destA)
+      }
+      assert(
+        ex.getMessage.toLowerCase.contains("cannot handle"),
+        s"expected diagnostic message about unhandled record, got: ${ex.getMessage}"
+      )
+    } finally {
+      cleanup(sub)
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Mixed-record full-cycle
+  // ---------------------------------------------------------------------------
+
+  it should "handle a realistic mix of steps + messages + non-matching destinations" in {
+    val (sub, storage) = newStorage()
+    try {
+      val s1 = ProcessingStep(cid, 10L)
+      val s2 = ProcessingStep(cid, 20L)
+      val s3 = ProcessingStep(cid, 30L)
+      val m1 = msg(1L)
+      val m2 = msg(2L)
+      writeLog(
+        storage,
+        Seq(
+          s1,
+          MessageContent(m1),
+          ReplayDestination(destB), // skipped (id mismatch)
+          s2,
+          MessageContent(m2),
+          ReplayDestination(destB), // also skipped
+          s3
+        )
+      )
+      val (steps, messages) = ReplayLogGenerator.generate(storage, "log", destA)
+      assert(steps.toList == List(s1, s2, s3))
+      assert(messages.toList == List(m1, m2))
+    } finally {
+      cleanup(sub)
+    }
+  }
+}
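
The stream leak described in the commit notes can be illustrated in isolation. The sketch below is not the Texera code: `TrackedReader`, `StreamLeakSketch`, `readLeaky`, and `readSafe` are hypothetical names, and the real `ReplayLogGenerator.generate` may be shaped differently. Under those assumptions it only shows why an early `return` that bypasses `close()` leaves a handle open, and how a `try`/`finally` releases it on every exit path. A plain `return` inside a `while` loop is used here instead of a non-local one to keep the sketch independent of the Scala version.

```scala
// Hypothetical stand-in for a record reader; NOT Texera's SequentialRecordReader.
final class TrackedReader(records: Seq[String]) extends AutoCloseable {
  var closed: Boolean = false
  def iterator: Iterator[String] = records.iterator
  override def close(): Unit = { closed = true }
}

object StreamLeakSketch {
  // Assumed leak shape: the early return exits before close() is reached.
  def readLeaky(reader: TrackedReader, stopAt: String): List[String] = {
    val out = List.newBuilder[String]
    val it = reader.iterator
    while (it.hasNext) {
      val record = it.next()
      if (record == stopAt) return out.result() // reader is never closed on this path
      out += record
    }
    reader.close()
    out.result()
  }

  // One possible remedy: close in `finally` so every exit path releases the handle.
  def readSafe(reader: TrackedReader, stopAt: String): List[String] =
    try reader.iterator.takeWhile(_ != stopAt).toList
    finally reader.close()

  def main(args: Array[String]): Unit = {
    val leaky = new TrackedReader(Seq("a", "b", "STOP", "c"))
    val safe = new TrackedReader(Seq("a", "b", "STOP", "c"))
    println(readLeaky(leaky, "STOP"))
    println(s"leaky reader closed: ${leaky.closed}")
    println(readSafe(safe, "STOP"))
    println(s"safe reader closed: ${safe.closed}")
  }
}
```

Both variants return the records that precede the stop marker; only the second leaves the reader closed. Whether the follow-up fix should use `try`/`finally`, `scala.util.Using`, or a restructured loop is a design choice for the production change, not something this sketch settles.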
