This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d7ce6ef55e5f [SPARK-51866][CONNECT][TESTS] Ensure `serializerAllocator/deserializerAllocator` are closed if `ArrowEncoderSuite#roundTripWithDifferentIOEncoders` fails to create `CloseableIterator`
d7ce6ef55e5f is described below

commit d7ce6ef55e5fc42a1a852710bff59a842bb767f6
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Wed Apr 23 10:48:20 2025 +0800

    [SPARK-51866][CONNECT][TESTS] Ensure `serializerAllocator/deserializerAllocator` are closed if `ArrowEncoderSuite#roundTripWithDifferentIOEncoders` fails to create `CloseableIterator`
    
    ### What changes were proposed in this pull request?
    This pull request ensures that `serializerAllocator` and 
`deserializerAllocator` are closed when the creation of `CloseableIterator` by 
`ArrowEncoderSuite#roundTripWithDifferentIOEncoders` fails.
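    
    The fix follows the usual "guard the constructor" cleanup pattern: the two allocators are created first, and if anything between their creation and the construction of the returned `CloseableIterator` throws, both are closed before the exception is rethrown (on success, the iterator's own `close()` takes ownership of them). A minimal self-contained sketch of the pattern, using a hypothetical `guardedBuild` helper with plain `AutoCloseable` stand-ins for the Arrow allocators:
    
    ```scala
    // Sketch of the cleanup pattern applied by this change.
    def guardedBuild[T](
        serializerAllocator: AutoCloseable,
        deserializerAllocator: AutoCloseable)(build: => T): T = {
      try {
        // On success, ownership of both allocators passes to the result,
        // whose own close() is expected to release them.
        build
      } catch {
        case e: Throwable =>
          // Creation failed before the result took ownership:
          // release both allocators here, then rethrow.
          serializerAllocator.close()
          deserializerAllocator.close()
          throw e
      }
    }
    ```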
    
    ### Why are the changes needed?
    When the test option `(Test / javaOptions) += "-Darrow.memory.debug.allocator=true",` is added for the `connect-client-jvm` module, `ArrowEncoderSuite` aborts with the following error:
    
    ```
    [info] org.apache.spark.sql.connect.client.arrow.ArrowEncoderSuite *** ABORTED *** (3 seconds, 446 milliseconds)
    [info]   java.lang.IllegalStateException: Allocator[ROOT] closed with outstanding child allocators.
    [info] Allocator(ROOT) 0/0/574720/9223372036854775807 (res/actual/peak/limit)
    [info]   child allocators: 2
    [info]     Allocator(serialization) 0/0/0/9223372036854775807 (res/actual/peak/limit)
    [info]       child allocators: 0
    [info]       ledgers: 0
    [info]       reservations: 0
    [info]     Allocator(deserialization) 0/0/0/9223372036854775807 (res/actual/peak/limit)
    [info]       child allocators: 0
    [info]       ledgers: 0
    [info]       reservations: 0
    [info]   ledgers: 0
    [info]   reservations: 0
    [info]   at org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:462)
    [info]   at org.apache.arrow.memory.RootAllocator.close(RootAllocator.java:27)
    [info]   at org.apache.spark.sql.connect.client.arrow.ArrowEncoderSuite.afterAll(ArrowEncoderSuite.scala:62)
    [info]   at org.scalatest.BeforeAndAfterAll.$anonfun$run$1(BeforeAndAfterAll.scala:225)
    [info]   at org.scalatest.Status.$anonfun$withAfterEffect$1(Status.scala:377)
    [info]   at org.scalatest.Status.$anonfun$withAfterEffect$1$adapted(Status.scala:373)
    [info]   at org.scalatest.CompositeStatus.whenCompleted(Status.scala:962)
    [info]   at org.scalatest.Status.withAfterEffect(Status.scala:373)
    [info]   at org.scalatest.Status.withAfterEffect$(Status.scala:371)
    [info]   at org.scalatest.CompositeStatus.withAfterEffect(Status.scala:863)
    [info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:224)
    [info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
    [info]   at org.apache.spark.sql.connect.client.arrow.ArrowEncoderSuite.run(ArrowEncoderSuite.scala:53)
    [info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
    [info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
    [info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
    [info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    [info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    [info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    [info]   at java.base/java.lang.Thread.run(Thread.java:840)
    [info] Run completed in 5 seconds, 568 milliseconds.
    [info] Total number of tests run: 108
    [info] Suites: completed 0, aborted 1
    [info] Tests: succeeded 108, failed 0, canceled 0, ignored 0, pending 0
    [info] *** 1 SUITE ABORTED ***
    [error] Error during tests:
    [error]         org.apache.spark.sql.connect.client.arrow.ArrowEncoderSuite
    ```
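    
    For reference, a minimal sketch of how the debug allocator can be enabled in a standalone `build.sbt` (in Spark itself the option was added to the `connect-client-jvm` project's test settings):
    
    ```scala
    // Sketch: enable Arrow's debug allocator for forked test JVMs.
    // With this flag set, closing a RootAllocator that still has live child
    // allocators fails with the IllegalStateException shown above.
    Test / fork := true
    Test / javaOptions += "-Darrow.memory.debug.allocator=true"
    ```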
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    - Passed GitHub Actions.
    - Locally confirmed that after adding the test option `(Test / javaOptions) += "-Darrow.memory.debug.allocator=true",` for the `connect-client-jvm` module, the error above is no longer reported.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #50664 from LuciferYang/Fix-ArrowEncoderSuite.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: yangjie01 <yangji...@baidu.com>
---
 .../connect/client/arrow/ArrowEncoderSuite.scala   | 67 ++++++++++++----------
 1 file changed, 38 insertions(+), 29 deletions(-)

diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderSuite.scala
index 58e19389cae2..75816a835aaa 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderSuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/arrow/ArrowEncoderSuite.scala
@@ -99,40 +99,49 @@ class ArrowEncoderSuite extends ConnectFunSuite with BeforeAndAfterAll {
     val serializerAllocator = newAllocator("serialization")
     val deserializerAllocator = newAllocator("deserialization")
 
-    val arrowIterator = ArrowSerializer.serialize(
-      input = iterator,
-      enc = inputEncoder,
-      allocator = serializerAllocator,
-      maxRecordsPerBatch = maxRecordsPerBatch,
-      maxBatchSize = maxBatchSize,
-      batchSizeCheckInterval = batchSizeCheckInterval,
-      timeZoneId = "UTC",
-      largeVarTypes = false)
+    try {
+      val arrowIterator = ArrowSerializer.serialize(
+        input = iterator,
+        enc = inputEncoder,
+        allocator = serializerAllocator,
+        maxRecordsPerBatch = maxRecordsPerBatch,
+        maxBatchSize = maxBatchSize,
+        batchSizeCheckInterval = batchSizeCheckInterval,
+        timeZoneId = "UTC",
+        largeVarTypes = false)
 
-    val inspectedIterator = if (inspectBatch != null) {
-      arrowIterator.map { batch =>
-        inspectBatch(batch)
-        batch
+      val inspectedIterator = if (inspectBatch != null) {
+        arrowIterator.map { batch =>
+          inspectBatch(batch)
+          batch
+        }
+      } else {
+        arrowIterator
       }
-    } else {
-      arrowIterator
-    }
 
-    val resultIterator =
-      ArrowDeserializers.deserializeFromArrow(
-        inspectedIterator,
-        outputEncoder,
-        deserializerAllocator,
-        timeZoneId = "UTC")
-    new CloseableIterator[O] {
-      override def close(): Unit = {
-        arrowIterator.close()
-        resultIterator.close()
+      val resultIterator =
+        ArrowDeserializers.deserializeFromArrow(
+          inspectedIterator,
+          outputEncoder,
+          deserializerAllocator,
+          timeZoneId = "UTC")
+      new CloseableIterator[O] {
+        override def close(): Unit = {
+          arrowIterator.close()
+          resultIterator.close()
+          serializerAllocator.close()
+          deserializerAllocator.close()
+        }
+
+        override def hasNext: Boolean = resultIterator.hasNext
+
+        override def next(): O = resultIterator.next()
+      }
+    } catch {
+      case e: Throwable =>
         serializerAllocator.close()
         deserializerAllocator.close()
-      }
-      override def hasNext: Boolean = resultIterator.hasNext
-      override def next(): O = resultIterator.next()
+        throw e
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
