This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new d7ce6ef55e5f [SPARK-51866][CONNECT][TESTS] Ensure `serializerAllocator/deserializerAllocator` are closed if `ArrowEncoderSuite#roundTripWithDifferentIOEncoders` fails to create `CloseableIterator` d7ce6ef55e5f is described below commit d7ce6ef55e5fc42a1a852710bff59a842bb767f6 Author: yangjie01 <yangji...@baidu.com> AuthorDate: Wed Apr 23 10:48:20 2025 +0800 [SPARK-51866][CONNECT][TESTS] Ensure `serializerAllocator/deserializerAllocator` are closed if `ArrowEncoderSuite#roundTripWithDifferentIOEncoders` fails to create `CloseableIterator` ### What changes were proposed in this pull request? This pull request ensures that `serializerAllocator` and `deserializerAllocator` are closed when the creation of `CloseableIterator` by `ArrowEncoderSuite#roundTripWithDifferentIOEncoders` fails. ### Why are the changes needed? When adding the test options `(Test / javaOptions) += "-Darrow.memory.debug.allocator=true",` for the `connect-client-jvm` module, `ArrowEncoderSuite` will throw the following error: ``` [info] org.apache.spark.sql.connect.client.arrow.ArrowEncoderSuite *** ABORTED *** (3 seconds, 446 milliseconds) [info] java.lang.IllegalStateException: Allocator[ROOT] closed with outstanding child allocators. 
[info] Allocator(ROOT) 0/0/574720/9223372036854775807 (res/actual/peak/limit) [info] child allocators: 2 [info] Allocator(serialization) 0/0/0/9223372036854775807 (res/actual/peak/limit) [info] child allocators: 0 [info] ledgers: 0 [info] reservations: 0 [info] Allocator(deserialization) 0/0/0/9223372036854775807 (res/actual/peak/limit) [info] child allocators: 0 [info] ledgers: 0 [info] reservations: 0 [info] ledgers: 0 [info] reservations: 0 [info] at org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:462) [info] at org.apache.arrow.memory.RootAllocator.close(RootAllocator.java:27) [info] at org.apache.spark.sql.connect.client.arrow.ArrowEncoderSuite.afterAll(ArrowEncoderSuite.scala:62) [info] at org.scalatest.BeforeAndAfterAll.$anonfun$run$1(BeforeAndAfterAll.scala:225) [info] at org.scalatest.Status.$anonfun$withAfterEffect$1(Status.scala:377) [info] at org.scalatest.Status.$anonfun$withAfterEffect$1$adapted(Status.scala:373) [info] at org.scalatest.CompositeStatus.whenCompleted(Status.scala:962) [info] at org.scalatest.Status.withAfterEffect(Status.scala:373) [info] at org.scalatest.Status.withAfterEffect$(Status.scala:371) [info] at org.scalatest.CompositeStatus.withAfterEffect(Status.scala:863) [info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:224) [info] at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208) [info] at org.apache.spark.sql.connect.client.arrow.ArrowEncoderSuite.run(ArrowEncoderSuite.scala:53) [info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321) [info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517) [info] at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414) [info] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) [info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [info] at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [info] at java.base/java.lang.Thread.run(Thread.java:840) [info] Run completed in 5 seconds, 568 milliseconds. [info] Total number of tests run: 108 [info] Suites: completed 0, aborted 1 [info] Tests: succeeded 108, failed 0, canceled 0, ignored 0, pending 0 [info] *** 1 SUITE ABORTED *** [error] Error during tests: [error] org.apache.spark.sql.connect.client.arrow.ArrowEncoderSuite ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - Pass GitHub Actions - Locally confirmed that, after adding the test option `(Test / javaOptions) += "-Darrow.memory.debug.allocator=true",` to the `connect-client-jvm` module, the aforementioned error no longer occurs. ### Was this patch authored or co-authored using generative AI tooling? No Closes #50664 from LuciferYang/Fix-ArrowEncoderSuite. Authored-by: yangjie01 <yangji...@baidu.com> Signed-off-by: yangjie01 <yangji...@baidu.com> --- .../connect/client/arrow/ArrowEncoderSuite.scala | 67 ++++++++++++---------- 1 file changed, 38 insertions(+), 29 deletions(-) diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderSuite.scala index 58e19389cae2..75816a835aaa 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderSuite.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderSuite.scala @@ -99,40 +99,49 @@ class ArrowEncoderSuite extends ConnectFunSuite with BeforeAndAfterAll { val serializerAllocator = newAllocator("serialization") val deserializerAllocator = newAllocator("deserialization") - val arrowIterator = ArrowSerializer.serialize( - input = iterator, - enc = inputEncoder, - allocator = serializerAllocator, - maxRecordsPerBatch = 
maxRecordsPerBatch, - maxBatchSize = maxBatchSize, - batchSizeCheckInterval = batchSizeCheckInterval, - timeZoneId = "UTC", - largeVarTypes = false) + try { + val arrowIterator = ArrowSerializer.serialize( + input = iterator, + enc = inputEncoder, + allocator = serializerAllocator, + maxRecordsPerBatch = maxRecordsPerBatch, + maxBatchSize = maxBatchSize, + batchSizeCheckInterval = batchSizeCheckInterval, + timeZoneId = "UTC", + largeVarTypes = false) - val inspectedIterator = if (inspectBatch != null) { - arrowIterator.map { batch => - inspectBatch(batch) - batch + val inspectedIterator = if (inspectBatch != null) { + arrowIterator.map { batch => + inspectBatch(batch) + batch + } + } else { + arrowIterator } - } else { - arrowIterator - } - val resultIterator = - ArrowDeserializers.deserializeFromArrow( - inspectedIterator, - outputEncoder, - deserializerAllocator, - timeZoneId = "UTC") - new CloseableIterator[O] { - override def close(): Unit = { - arrowIterator.close() - resultIterator.close() + val resultIterator = + ArrowDeserializers.deserializeFromArrow( + inspectedIterator, + outputEncoder, + deserializerAllocator, + timeZoneId = "UTC") + new CloseableIterator[O] { + override def close(): Unit = { + arrowIterator.close() + resultIterator.close() + serializerAllocator.close() + deserializerAllocator.close() + } + + override def hasNext: Boolean = resultIterator.hasNext + + override def next(): O = resultIterator.next() + } + } catch { + case e: Throwable => serializerAllocator.close() deserializerAllocator.close() - } - override def hasNext: Boolean = resultIterator.hasNext - override def next(): O = resultIterator.next() + throw e } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org