andygrove opened a new pull request, #4320:
URL: https://github.com/apache/datafusion-comet/pull/4320

   ## Which issue does this PR close?
   
   Closes #4174.
   
   ## Rationale for this change
   
   `CometUdfBridge.evaluate` dispatches JVM `CometUDF` implementations from 
native execution. Today it imports inputs, exports outputs, and lets the user 
UDF allocate its own result vector from the process-wide `CometArrowAllocator`, 
which is a `RootAllocator(Long.MaxValue)` that is not registered with Spark's 
`TaskMemoryManager`. Off-heap memory consumed by the UDF dispatch path is 
therefore invisible to Spark's task memory accounting and back-pressure 
machinery, and many concurrent JVM-UDF tasks per executor can drive native 
off-heap usage past the operator-level limits Spark would otherwise enforce.
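   
   The gap can be illustrated with a minimal sketch. Everything here is a 
hypothetical stand-in (the counter, the two allocation helpers); none of it is 
the Spark or Arrow API — it only shows why memory taken through a path that 
never calls into the task's accounting is invisible to it:

```java
import java.util.concurrent.atomic.AtomicLong;

public class AccountingGapSketch {
    // Hypothetical stand-in for a task's accounted-bytes gauge.
    static final AtomicLong taskAccountedBytes = new AtomicLong();

    // An "accounted" allocation charges the task before handing out memory.
    static byte[] accountedAlloc(int size) {
        taskAccountedBytes.addAndGet(size);
        return new byte[size];
    }

    // An "unaccounted" allocation (like a child of a bare, unregistered
    // root allocator) never touches the gauge.
    static byte[] unaccountedAlloc(int size) {
        return new byte[size];
    }

    public static void main(String[] args) {
        accountedAlloc(1024);
        unaccountedAlloc(4096); // invisible to taskAccountedBytes
        System.out.println(taskAccountedBytes.get()); // prints 1024, not 5120
    }
}
```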
   
   ## What changes are included in this PR?
   
   - New `org.apache.comet.udf.CometUdfAllocator` provides a per-Spark-task 
Arrow `BufferAllocator` that is a child of `CometArrowAllocator` with a custom 
`AllocationListener` attached. The listener forwards pre-allocation callbacks 
to `TaskMemoryManager.acquireExecutionMemory` (releasing any partial 
acquisition and throwing `OutOfMemoryError` on shortfall) and release 
callbacks to `releaseExecutionMemory`, backed by a non-spillable 
`MemoryConsumer` in `OFF_HEAP` mode. The child allocator is lazily created on 
the first `acquire(TaskContext)` call and cached in a `ConcurrentHashMap` 
keyed by `taskAttemptId`. A `TaskCompletionListener` closes the child and 
removes the cache entry when the task ends.
   - `CometUdfBridge` resolves the allocator once per call. With a non-null 
`TaskContext` it returns the per-task allocator and uses the same allocator for 
input import, the user UDF, and output export. With no `TaskContext` it falls 
back to the root allocator and logs a one-time warning that UDF off-heap memory 
will not be charged to Spark.
   - The `CometUDF` SPI now takes the `BufferAllocator` as its first parameter 
so user UDFs allocate their result vectors from the accounted allocator. There 
are no in-repo `CometUDF` implementations on `main`, so the only caller 
updated in this PR is the bridge.
   - `CometTaskContextShim` gains a `taskMemoryManager(TaskContext)` forwarder 
so `CometUdfAllocator` can reach the `private[spark]` accessor without itself 
living under `org.apache.spark.*`.
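   
   The lifecycle described above can be sketched as follows. All types are 
hypothetical stand-ins, not the Comet or Spark classes named in this PR: 
`MockTaskMemoryManager` mimics `acquireExecutionMemory`/`releaseExecutionMemory` 
(including partial grants), and `TaskAllocator` plays the role of the child 
allocator's `AllocationListener`:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class PerTaskAllocatorSketch {
    // Stand-in for TaskMemoryManager: grants up to the remaining limit.
    static final class MockTaskMemoryManager {
        private final long limit;
        private final AtomicLong used = new AtomicLong();
        MockTaskMemoryManager(long limit) { this.limit = limit; }
        long acquire(long bytes) {
            long granted = Math.max(0, Math.min(bytes, limit - used.get()));
            used.addAndGet(granted);
            return granted; // may be a partial grant
        }
        void release(long bytes) { used.addAndGet(-bytes); }
        long used() { return used.get(); }
    }

    // Stand-in for the child allocator plus its AllocationListener.
    static final class TaskAllocator implements AutoCloseable {
        private final MockTaskMemoryManager tmm;
        private long held;
        TaskAllocator(MockTaskMemoryManager tmm) { this.tmm = tmm; }

        // Pre-allocation hook: charge the task; on shortfall, release the
        // partial grant and fail, mirroring the listener described above.
        void onPreAllocation(long bytes) {
            long granted = tmm.acquire(bytes);
            if (granted < bytes) {
                tmm.release(granted);
                throw new OutOfMemoryError("could not acquire " + bytes + " bytes");
            }
            held += bytes;
        }

        void onRelease(long bytes) { tmm.release(bytes); held -= bytes; }

        @Override public void close() { tmm.release(held); held = 0; }
    }

    // Lazily created, cached per task attempt id.
    static final Map<Long, TaskAllocator> cache = new ConcurrentHashMap<>();

    static TaskAllocator acquire(long taskAttemptId, MockTaskMemoryManager tmm) {
        return cache.computeIfAbsent(taskAttemptId, id -> new TaskAllocator(tmm));
    }

    // Stand-in for the TaskCompletionListener: close and uncache.
    static void onTaskCompletion(long taskAttemptId) {
        TaskAllocator a = cache.remove(taskAttemptId);
        if (a != null) a.close();
    }

    public static void main(String[] args) {
        MockTaskMemoryManager tmm = new MockTaskMemoryManager(1 << 20);
        TaskAllocator a = acquire(42L, tmm);
        a.onPreAllocation(4096);
        System.out.println(tmm.used());   // 4096 while buffers are held
        a.onRelease(4096);
        onTaskCompletion(42L);
        System.out.println(tmm.used());   // back to 0 after release
        System.out.println(cache.size()); // 0 after the cleanup hook runs
    }
}
```

   `computeIfAbsent` gives the lazy-create-then-cache behavior atomically per 
key, which is why a plain `ConcurrentHashMap` suffices without extra locking.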
   
   ## How are these changes tested?
   
   New `CometUdfAllocatorSuite` runs inside a real local-mode `SparkSession` 
(with `spark.memory.offHeap.enabled=true`) and covers two cases:
   - `acquire registers a MemoryConsumer that charges the task`: captures 
`TaskMemoryManager.getMemoryConsumptionForThisTask` before, during, and after 
an `IntVector` allocation on the per-task allocator and asserts it grows while 
the buffers are held and returns to baseline after they close.
   - `child allocator is closed and uncached on task completion`: confirms 
`CometUdfAllocator.cacheSize()` is non-zero inside the task and zero on the 
driver after `foreachPartition` returns, exercising the 
`TaskCompletionListener` cleanup path.
   
   Task-side state is propagated to the driver via `LongAccumulator` so the 
assertions execute on the driver after the task completes.
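   
   The before/during/after shape of the first test can be sketched without 
Spark. `consumed` is a hypothetical stand-in for 
`TaskMemoryManager.getMemoryConsumptionForThisTask`, and `charge` stands in 
for an accounted vector allocation whose `close()` returns the memory:

```java
import java.util.concurrent.atomic.AtomicLong;

public class BeforeDuringAfterSketch {
    // Hypothetical consumption gauge.
    static final AtomicLong consumed = new AtomicLong();

    // Charge on allocation; the returned handle releases on close().
    static AutoCloseable charge(long bytes) {
        consumed.addAndGet(bytes);
        return () -> consumed.addAndGet(-bytes);
    }

    public static void main(String[] args) throws Exception {
        long before = consumed.get();
        try (AutoCloseable buf = charge(1024)) {
            System.out.println(consumed.get() - before); // 1024 while held
        }
        System.out.println(consumed.get() - before);     // 0 after close
    }
}
```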


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

