andygrove opened a new pull request, #4320: URL: https://github.com/apache/datafusion-comet/pull/4320
## Which issue does this PR close?

Closes #4174.

## Rationale for this change

`CometUdfBridge.evaluate` dispatches JVM `CometUDF` implementations from native execution. Today it imports inputs, exports outputs, and lets the user UDF allocate its own result vector from the process-wide `CometArrowAllocator`, which is a `RootAllocator(Long.MaxValue)` that is not registered with Spark's `TaskMemoryManager`. Off-heap memory consumed by the UDF dispatch path is therefore invisible to Spark's task memory accounting and back-pressure machinery, and many concurrent JVM-UDF tasks per executor can drive native off-heap usage past the operator-level limits Spark would otherwise enforce.

## What changes are included in this PR?

- New `org.apache.comet.udf.CometUdfAllocator` provides a per-Spark-task Arrow `BufferAllocator` that is a child of `CometArrowAllocator` with a custom `AllocationListener` attached. The listener forwards pre-allocation to `TaskMemoryManager.acquireExecutionMemory` (releasing partial acquisitions and throwing `OutOfMemoryError` on shortfall) and on-release to `releaseExecutionMemory`, backed by a non-spillable `MemoryConsumer` in `OFF_HEAP` mode. The child allocator is lazily created on the first `acquire(TaskContext)` call and cached in a `ConcurrentHashMap` keyed by `taskAttemptId`. A `TaskCompletionListener` closes the child and removes the cache entry when the task ends.
- `CometUdfBridge` resolves the allocator once per call. With a non-null `TaskContext` it returns the per-task allocator and uses the same allocator for input import, the user UDF, and output export. With no `TaskContext` it falls back to the root allocator and logs a one-time warning that UDF off-heap memory will not be charged to Spark.
- The `CometUDF` SPI now takes the `BufferAllocator` as the first parameter so user UDFs allocate their result vectors from the accounted allocator. There are zero in-repo `CometUDF` implementations on `main`, so the only caller updated in this PR is the bridge.
- `CometTaskContextShim` gains a `taskMemoryManager(TaskContext)` forwarder so `CometUdfAllocator` can reach the `private[spark]` accessor without itself living under `org.apache.spark.*`.

## How are these changes tested?

New `CometUdfAllocatorSuite` runs inside a real local-mode `SparkSession` (with `spark.memory.offHeap.enabled=true`) and covers two cases:

- `acquire registers a MemoryConsumer that charges the task`: captures `TaskMemoryManager.getMemoryConsumptionForThisTask` before, during, and after an `IntVector` allocation on the per-task allocator and asserts it grows while the buffers are held and returns to baseline after they close.
- `child allocator is closed and uncached on task completion`: confirms `CometUdfAllocator.cacheSize()` is non-zero inside the task and zero on the driver after `foreachPartition` returns, exercising the `TaskCompletionListener` cleanup path.

Task-side state is propagated to the driver via `LongAccumulator` so the assertions execute on the driver after the task completes.
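The acquire/release accounting that the `AllocationListener` performs can be sketched without any Spark or Arrow dependency. The sketch below is illustrative only: `StubTaskMemoryManager` and `AccountingListener` are hypothetical stand-ins for Spark's `TaskMemoryManager` and Arrow's `AllocationListener` callbacks, and the `MemoryConsumer` indirection from the actual PR is omitted.

```java
// Illustrative sketch of the pre-allocation / on-release accounting pattern.
// StubTaskMemoryManager is a hypothetical stand-in for Spark's TaskMemoryManager;
// AccountingListener mimics the AllocationListener behavior described in the PR.
import java.util.concurrent.atomic.AtomicLong;

public class AccountingSketch {
    /** Stand-in for TaskMemoryManager: grants up to a fixed budget, possibly partially. */
    static final class StubTaskMemoryManager {
        private final long limit;
        private final AtomicLong used = new AtomicLong();
        StubTaskMemoryManager(long limit) { this.limit = limit; }
        long acquireExecutionMemory(long requested) {
            while (true) {
                long cur = used.get();
                long granted = Math.min(requested, limit - cur);
                if (granted <= 0) return 0;
                if (used.compareAndSet(cur, cur + granted)) return granted;
            }
        }
        void releaseExecutionMemory(long size) { used.addAndGet(-size); }
        long getMemoryConsumptionForThisTask() { return used.get(); }
    }

    /** Mimics the listener the PR attaches to the child allocator. */
    static final class AccountingListener {
        private final StubTaskMemoryManager tmm;
        AccountingListener(StubTaskMemoryManager tmm) { this.tmm = tmm; }
        /** Pre-allocation: charge Spark first; undo partial grants and fail on shortfall. */
        void onPreAllocation(long size) {
            long granted = tmm.acquireExecutionMemory(size);
            if (granted < size) {
                tmm.releaseExecutionMemory(granted); // release the partial acquisition
                throw new OutOfMemoryError("UDF allocator could not acquire " + size + " bytes");
            }
        }
        /** On release: return the bytes to the task's accounting. */
        void onRelease(long size) { tmm.releaseExecutionMemory(size); }
    }

    public static void main(String[] args) {
        StubTaskMemoryManager tmm = new StubTaskMemoryManager(1024);
        AccountingListener listener = new AccountingListener(tmm);
        listener.onPreAllocation(512);
        System.out.println("held=" + tmm.getMemoryConsumptionForThisTask());
        listener.onRelease(512);
        System.out.println("after=" + tmm.getMemoryConsumptionForThisTask());
        boolean oom = false;
        try { listener.onPreAllocation(2048); } catch (OutOfMemoryError e) { oom = true; }
        System.out.println("oom=" + oom + " final=" + tmm.getMemoryConsumptionForThisTask());
    }
}
```

The key design point mirrored here is that a partial grant is released before throwing, so a failed UDF allocation never leaves the task's execution-memory counter inflated.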
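The lazy per-task caching and completion-time cleanup can likewise be sketched with plain JDK types. This is an assumption-laden sketch, not the PR's code: `ChildAllocator` stands in for the child `BufferAllocator`, and a `List<Runnable>` stands in for `TaskContext.addTaskCompletionListener`.

```java
// Illustrative sketch of lazy per-task caching keyed by taskAttemptId, with
// completion-listener cleanup. ChildAllocator and the Runnable list are
// hypothetical stand-ins for the child BufferAllocator and TaskContext's
// completion-listener registration.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class PerTaskCacheSketch {
    static final class ChildAllocator implements AutoCloseable {
        final long taskAttemptId;
        boolean closed = false;
        ChildAllocator(long id) { this.taskAttemptId = id; }
        @Override public void close() { closed = true; }
    }

    static final ConcurrentHashMap<Long, ChildAllocator> CACHE = new ConcurrentHashMap<>();

    /** Lazily create the per-task child on first acquire; later calls reuse it. */
    static ChildAllocator acquire(long taskAttemptId, List<Runnable> onTaskCompletion) {
        return CACHE.computeIfAbsent(taskAttemptId, id -> {
            ChildAllocator child = new ChildAllocator(id);
            // Mirrors the TaskCompletionListener: close the child and drop the cache entry.
            onTaskCompletion.add(() -> { child.close(); CACHE.remove(id); });
            return child;
        });
    }

    public static void main(String[] args) {
        List<Runnable> completionListeners = new ArrayList<>();
        ChildAllocator a = acquire(7L, completionListeners);
        ChildAllocator b = acquire(7L, completionListeners);
        System.out.println("sameInstance=" + (a == b));
        System.out.println("cacheSize=" + CACHE.size());
        completionListeners.forEach(Runnable::run); // simulate task end
        System.out.println("afterCompletion=" + CACHE.size() + " closed=" + a.closed);
    }
}
```

Registering the cleanup inside `computeIfAbsent` ensures exactly one completion listener per task, since the creation lambda runs at most once per `taskAttemptId`.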
