This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5387-6dd16b5de8901de99cd0b96ab3d964a1348e8488 in repository https://gitbox.apache.org/repos/asf/texera.git
commit 76f116f5e0f7b29c2e6b18cdf1688e448d2ddee3 Author: Matthew B. <[email protected]> AuthorDate: Fri Jun 5 17:38:38 2026 -0700 test(amber): add AmberKryoInitializer spec (#5387) ### What changes were proposed in this PR? - Add `AmberKryoInitializerSpec`, the first test for the Kryo closure-serialization setup that lets Pekko move Scala closures between workers. - Assert `preInit` registers `SerializedLambda` and binds `ClosureSerializer.Closure` to a `ClosureSerializer`, with a negative control proving a bare `ScalaKryo` has neither. - Add behavioral round-trips: a Scala closure serializes and deserializes back to a working function, and two closures capturing different locals stay independent. ### Any related issues, documentation, or discussions? Closes: #5388 ### How was this PR tested? - Run `sbt "WorkflowExecutionService/testOnly *AmberKryoInitializerSpec"` and expect 4 examples to pass. - Local sbt test runs are blocked by JaCoCo 0.8.11 (cannot instrument Java 25 bytecode, major version 69), so the suite was run directly via ScalaTest off the dependency classpath (`java -cp ... org.scalatest.tools.Runner -s ...AmberKryoInitializerSpec` with `--add-opens java.base/java.lang.invoke=ALL-UNNAMED`); all 4 passed. CI runs it on a supported JDK. ### Was this PR authored or co-authored using generative AI tooling? Co-authored with Claude Opus 4.8 in compliance with ASF --- .../engine/common/AmberKryoInitializerSpec.scala | 114 +++++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/common/AmberKryoInitializerSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/common/AmberKryoInitializerSpec.scala new file mode 100644 index 0000000000..701e7dc8da --- /dev/null +++ b/amber/src/test/scala/org/apache/texera/amber/engine/common/AmberKryoInitializerSpec.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.common
+
+import com.esotericsoftware.kryo.kryo5.io.{Input, Output}
+import com.esotericsoftware.kryo.kryo5.serializers.ClosureSerializer
+import com.esotericsoftware.kryo.kryo5.util.{DefaultClassResolver, MapReferenceResolver}
+import io.altoo.serialization.kryo.scala.serializer.ScalaKryo
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.lang.invoke.SerializedLambda
+
+class AmberKryoInitializerSpec extends AnyFlatSpec {
+
+  /** A bare ScalaKryo with none of AmberKryoInitializer's registrations applied. */
+  private def bareKryo(): ScalaKryo =
+    new ScalaKryo(new DefaultClassResolver(), new MapReferenceResolver())
+
+  /** A bare ScalaKryo plus exactly what production's preInit registers. */
+  private def initializedKryo(): ScalaKryo = {
+    val kryo = bareKryo()
+    // preInit needs neither the actor system nor super.preInit, so a plain
+    // instance is enough to reproduce the production registration.
+    new AmberKryoInitializer().preInit(kryo)
+    kryo
+  }
+
+  /**
+    * A kryo for round-trip checks. ClosureSerializer writes the closure's
+    * capturing class; production registers such classes through its full
+    * init/config chain, which is out of scope here, so we drop the
+    * registration requirement to isolate the closure path preInit enables.
+    */
+  private def closureKryo(): ScalaKryo = {
+    val kryo = initializedKryo()
+    kryo.setRegistrationRequired(false)
+    kryo
+  }
+
+  /** Writes `value` (class + payload) through `kryo`, then reads it back from the bytes. */
+  private def roundTrip(kryo: ScalaKryo, value: AnyRef): AnyRef = {
+    val output = new Output(1024, -1) // -1: grow past 1024 bytes instead of overflowing
+    kryo.writeClassAndObject(output, value)
+    output.close()
+    val input = new Input(output.toBytes)
+    try kryo.readClassAndObject(input)
+    finally input.close()
+  }
+
+  "AmberKryoInitializer.preInit" should "register SerializedLambda and the closure serializer" in {
+    val kryo = initializedKryo()
+
+    // getClassResolver.getRegistration is a pure lookup: it returns null for
+    // classes that were never explicitly registered (unlike Kryo.getRegistration,
+    // which implicitly registers on miss).
+    val lambdaReg = kryo.getClassResolver.getRegistration(classOf[SerializedLambda])
+    assert(lambdaReg != null, "SerializedLambda must be registered")
+
+    val closureReg = kryo.getClassResolver.getRegistration(classOf[ClosureSerializer.Closure])
+    assert(closureReg != null, "ClosureSerializer.Closure must be registered")
+    assert(
+      closureReg.getSerializer.isInstanceOf[ClosureSerializer],
+      "the closure class must be bound to a ClosureSerializer"
+    )
+  }
+
+  it should "not register those classes on a kryo it never touched" in {
+    // Guards against the assertions above passing for some unrelated default
+    // registration: a bare ScalaKryo knows nothing about lambdas.
+    val kryo = bareKryo()
+    assert(kryo.getClassResolver.getRegistration(classOf[SerializedLambda]) == null)
+    assert(kryo.getClassResolver.getRegistration(classOf[ClosureSerializer.Closure]) == null)
+  }
+
+  "A kryo configured by AmberKryoInitializer" should "round-trip a Scala closure" in {
+    // Scala 2.13 compiles lambdas as serializable invokedynamic closures, so
+    // Kryo routes them through the ClosureSerializer registered in preInit.
+    val addend = 41
+    val fn: Int => Int = (x: Int) => x + addend
+
+    val restored = roundTrip(closureKryo(), fn).asInstanceOf[Int => Int]
+    assert(restored(1) == 42, "the deserialized closure must behave like the original")
+  }
+
+  it should "preserve distinct captured state across separate closures" in {
+    // Both closures come from one lambda site, so they share an implementation
+    // method and differ only in the value captured in their SerializedLambda.
+    val kryo = closureKryo()
+    val scaleBy: Int => (Int => Int) = f => (x: Int) => x * f
+    val (tenX, hundredX) = (scaleBy(10), scaleBy(100))
+
+    val restoredTen = roundTrip(kryo, tenX).asInstanceOf[Int => Int]
+    val restoredHundred = roundTrip(kryo, hundredX).asInstanceOf[Int => Int]
+    assert(restoredTen(3) == 30)
+    assert(restoredHundred(3) == 300)
+  }
+}
