avantgardnerio commented on code in PR #4582:
URL: https://github.com/apache/datafusion-comet/pull/4582#discussion_r3351290034


##########
native/core/src/execution/memory_pools/oom_guard.rs:
##########
@@ -0,0 +1,352 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::common::DataFusionError;
+use std::alloc::{GlobalAlloc, Layout};
+use std::cell::Cell;
+use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
+
/// How far a thread's local drift may wander (in either direction) before it
/// is flushed into the shared balance: 64 KiB.
const SETTLE_THRESHOLD: isize = 1 << 16;

/// Outstanding bytes across the whole process. Signed so that a transient
/// under-settle (e.g. frees flushed ahead of the allocations they pair with)
/// is harmless.
static BALANCE: AtomicIsize = AtomicIsize::new(0);
/// Byte limit used for enforcement; `0` means no limit has been set.
static LIMIT: AtomicUsize = AtomicUsize::new(0);
/// Global on/off switch for enforcement, read with a single relaxed load on
/// the allocation hot path.
static ARMED: AtomicBool = AtomicBool::new(false);

thread_local! {
    /// Bytes tracked on this thread that have not yet been flushed into
    /// `BALANCE`.
    static LOCAL_DRIFT: Cell<isize> = const { Cell::new(0) };
    /// Whether this thread is a query-worker thread subject to enforcement.
    static STAMPED: Cell<bool> = const { Cell::new(false) };
    /// Raised while a guard panic unwinds this thread, so nested allocations
    /// cannot trip the guard a second time (avoiding a double fault).
    static UNWINDING: Cell<bool> = const { Cell::new(false) };
}
+
/// Panic payload raised when an armed, stamped thread pushes the flushed
/// process-wide balance over the configured limit.
///
/// Catch sites recover it with `downcast_ref::<OomGuardPanic>()` on the caught
/// panic payload. It is `Clone`/`PartialEq` so the value can be copied out of
/// that borrow and compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OomGuardPanic {
    /// Process-wide balance in bytes observed when the guard tripped,
    /// clamped at zero.
    pub balance: usize,
    /// Limit in bytes that was in force when the guard tripped.
    pub limit: usize,
}
+
+/// Arm the guard with a byte limit. Idempotent.
+pub fn arm(limit_bytes: usize) {
+    LIMIT.store(limit_bytes, Ordering::Relaxed);
+    ARMED.store(true, Ordering::Relaxed);
+}
+
+/// Disarm the guard (enforcement off; tracking continues cheaply).
+#[allow(dead_code)] // used only by tests
+pub fn disarm() {
+    ARMED.store(false, Ordering::Relaxed);
+}
+
+/// Mark the current thread as a query-worker thread eligible for enforcement.
+pub fn stamp_current_thread() {
+    STAMPED.with(|s| s.set(true));
+}
+
+/// Reset the per-thread unwinding guard after a guard panic has been caught on
+/// this thread. Safe to call when not unwinding. The JNI caller thread is
+/// reused across tasks, so this must run after catching an OomGuardPanic.
+pub fn clear_unwinding() {
+    UNWINDING.with(|u| u.set(false));
+}
+
+/// If `panic` is an `OomGuardPanic`, clear this thread's unwinding guard and
+/// return the mapped retriable error. Returns `None` for any other panic.
+/// Centralizes the downcast + unwinding-reset + error mapping for all catch 
sites.
+pub fn map_panic_to_error(
+    panic: &(dyn std::any::Any + Send),
+) -> Option<DataFusionError> {
+    let g = panic.downcast_ref::<OomGuardPanic>()?;
+    clear_unwinding();
+    Some(DataFusionError::ResourcesExhausted(format!(
+        "Comet OomGuard: native allocation pushed usage to {} bytes, over the 
limit of {} \
+         bytes; failing this task",
+        g.balance, g.limit
+    )))
+}
+
+/// Current process-wide balance in bytes (never reported negative).
+#[allow(dead_code)] // used only by tests
+pub fn current_balance() -> usize {
+    BALANCE.load(Ordering::Relaxed).max(0) as usize
+}
+
+/// Record an allocation of `size` bytes; may trip the breaker.
+#[inline]
+fn record_alloc(size: usize) {
+    track(size as isize);
+}
+
+/// Record a deallocation of `size` bytes; never trips (credit only).
+#[inline]
+fn record_dealloc(size: usize) {
+    track(-(size as isize));
+}
+
+/// Core tracking + enforcement. Flushes drift; on a debit flush that crosses 
the
+/// limit on an armed, stamped, non-unwinding thread, panics with 
`OomGuardPanic`.
+#[inline]
+fn track(delta: isize) {
+    let new_balance = LOCAL_DRIFT.with(|d| {
+        let mut drift = d.get();
+        let flushed = settle(&mut drift, delta, &BALANCE);
+        d.set(drift);
+        flushed
+    });
+
+    if delta <= 0 {
+        return; // credits never enforce
+    }
+    let Some(balance) = new_balance else { return };
+    if !ARMED.load(Ordering::Relaxed) {
+        return;
+    }
+    if !STAMPED.with(|s| s.get()) {
+        return;
+    }
+    if UNWINDING.with(|u| u.get()) {
+        return;
+    }
+    let limit = LIMIT.load(Ordering::Relaxed);
+    if should_trip(balance, limit) {
+        // panic_any boxes the payload, which re-enters this allocator and 
calls
+        // track() again. Set UNWINDING first so that re-entrant call 
short-circuits
+        // above, preventing infinite recursion / a double panic from inside 
alloc.
+        UNWINDING.with(|u| u.set(true));
+        std::panic::panic_any(OomGuardPanic {
+            balance: balance.max(0) as usize,
+            limit,
+        });
+    }
+}
+
/// Decide whether an armed, stamped thread should trip the breaker.
///
/// This is a pure helper with no global state. A `limit` of `0` means
/// "unset" and never trips. A limit too large for `isize` is clamped to
/// `isize::MAX`. The breaker trips only when `balance` is strictly greater
/// than the limit.
fn should_trip(balance: isize, limit: usize) -> bool {
    if limit == 0 {
        return false;
    }
    let ceiling = isize::try_from(limit).unwrap_or(isize::MAX);
    balance > ceiling
}
+
+/// Pure helper: add `delta` to `local_drift`; if it reaches or exceeds 
`SETTLE_THRESHOLD`
+/// in magnitude, flush it into `shared` and return the new shared balance.
+/// Otherwise return `None` (nothing flushed).
+fn settle(local_drift: &mut isize, delta: isize, shared: &AtomicIsize) -> 
Option<isize> {
+    *local_drift = local_drift.wrapping_add(delta);
+    if local_drift.unsigned_abs() >= SETTLE_THRESHOLD as usize {
+        let flushed = *local_drift;
+        *local_drift = 0;
+        let prev = shared.fetch_add(flushed, Ordering::Relaxed);
+        Some(prev.wrapping_add(flushed))
+    } else {
+        None
+    }
+}
+
/// `GlobalAlloc` adapter that forwards every request to `inner` and feeds the
/// layout sizes involved into the OomGuard accounting.
pub struct AccountingAllocator<A>
where
    A: GlobalAlloc,
{
    inner: A,
}
+
+impl<A: GlobalAlloc> AccountingAllocator<A> {
+    pub const fn new(inner: A) -> Self {
+        Self { inner }
+    }
+}
+
+unsafe impl<A: GlobalAlloc> GlobalAlloc for AccountingAllocator<A> {
+    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
+        let ptr = self.inner.alloc(layout);
+        if !ptr.is_null() {
+            record_alloc(layout.size());
+        }
+        ptr
+    }
+
+    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
+        self.inner.dealloc(ptr, layout);
+        record_dealloc(layout.size());
+    }
+
+    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
+        let ptr = self.inner.alloc_zeroed(layout);
+        if !ptr.is_null() {
+            record_alloc(layout.size());
+        }
+        ptr
+    }
+
+    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> 
*mut u8 {
+        let new_ptr = self.inner.realloc(ptr, layout, new_size);
+        if !new_ptr.is_null() {
+            // Casts and subtraction are safe in practice: a single allocation 
cannot
+            // exceed isize::MAX on any real platform, so no wrapping or 
overflow occurs.
+            let old = layout.size() as isize;
+            let new = new_size as isize;
+            track(new - old);

Review Comment:
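   I just fixed this same bug in DataFusion. You need to run the limit check
(and panic, if it trips) before calling the inner realloc. Otherwise the panic
unwinds after realloc has already taken ownership of (and possibly freed) the
old block. The caller still holds the old pointer, tries to free it again
during unwind, and segfaults.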



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to