This is an automated email from the ASF dual-hosted git repository.
wayne pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 55865d3dd3 memory tracking with memory pool (#7303)
55865d3dd3 is described below
commit 55865d3dd3bb7dc9d744d771ac10a0643d95eebc
Author: Ruihang Xia <[email protected]>
AuthorDate: Thu Jul 10 10:00:48 2025 -0700
memory tracking with memory pool (#7303)
---
arrow-buffer/Cargo.toml | 3 +
arrow-buffer/src/buffer/immutable.rs | 14 +++
arrow-buffer/src/buffer/mutable.rs | 175 +++++++++++++++++++++++++++++++-
arrow-buffer/src/bytes.rs | 122 ++++++++++++++++++++--
arrow-buffer/src/lib.rs | 5 +
arrow-buffer/src/pool.rs | 189 +++++++++++++++++++++++++++++++++++
6 files changed, 499 insertions(+), 9 deletions(-)
diff --git a/arrow-buffer/Cargo.toml b/arrow-buffer/Cargo.toml
index d4fa0614e0..21ed4212da 100644
--- a/arrow-buffer/Cargo.toml
+++ b/arrow-buffer/Cargo.toml
@@ -35,6 +35,9 @@ bench = false
[package.metadata.docs.rs]
all-features = true
+[features]
+pool = []
+
[dependencies]
bytes = { version = "1.4" }
num = { version = "0.4", default-features = false, features = ["std"] }
diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs
index 946299d006..aedfe97468 100644
--- a/arrow-buffer/src/buffer/immutable.rs
+++ b/arrow-buffer/src/buffer/immutable.rs
@@ -25,6 +25,9 @@ use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
use crate::BufferBuilder;
use crate::{bit_util, bytes::Bytes, native::ArrowNativeType};
+#[cfg(feature = "pool")]
+use crate::pool::MemoryPool;
+
use super::ops::bitwise_unary_op_helper;
use super::{MutableBuffer, ScalarBuffer};
@@ -430,6 +433,17 @@ impl Buffer {
pub fn ptr_eq(&self, other: &Self) -> bool {
self.ptr == other.ptr && self.length == other.length
}
+
+ /// Register this [`Buffer`] with the provided [`MemoryPool`].
+ ///
+ /// This claims the memory used by this buffer in the pool, allowing for
+ /// accurate accounting of memory usage. The call is forwarded to
+ /// `self.data`; any prior reservation is released, so this works well when
+ /// the buffer is being shared among multiple arrays.
+ #[cfg(feature = "pool")]
+ pub fn claim(&self, pool: &dyn MemoryPool) {
+ self.data.claim(pool)
+ }
}
/// Note that here we deliberately do not implement
diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs
index 19ca0fef15..63fdbf598b 100644
--- a/arrow-buffer/src/buffer/mutable.rs
+++ b/arrow-buffer/src/buffer/mutable.rs
@@ -26,6 +26,11 @@ use crate::{
util::bit_util,
};
+#[cfg(feature = "pool")]
+use crate::pool::{MemoryPool, MemoryReservation};
+#[cfg(feature = "pool")]
+use std::sync::Mutex;
+
use super::Buffer;
/// A [`MutableBuffer`] is Arrow's interface to build a [`Buffer`] out of items or slices of items.
@@ -57,6 +62,10 @@ pub struct MutableBuffer {
// invariant: len <= capacity
len: usize,
layout: Layout,
+
+ /// Memory reservation for tracking memory usage
+ #[cfg(feature = "pool")]
+ reservation: Mutex<Option<Box<dyn MemoryReservation>>>,
}
impl MutableBuffer {
@@ -91,6 +100,8 @@ impl MutableBuffer {
data,
len: 0,
layout,
+ #[cfg(feature = "pool")]
+ reservation: std::sync::Mutex::new(None),
}
}
@@ -115,7 +126,13 @@ impl MutableBuffer {
NonNull::new(raw_ptr).unwrap_or_else(||
handle_alloc_error(layout))
}
};
- Self { data, len, layout }
+ Self {
+ data,
+ len,
+ layout,
+ #[cfg(feature = "pool")]
+ reservation: std::sync::Mutex::new(None),
+ }
}
/// Allocates a new [MutableBuffer] from given `Bytes`.
@@ -127,9 +144,17 @@ impl MutableBuffer {
let len = bytes.len();
let data = bytes.ptr();
+ #[cfg(feature = "pool")]
+ let reservation = bytes.reservation.lock().unwrap().take();
mem::forget(bytes);
- Ok(Self { data, len, layout })
+ Ok(Self {
+ data,
+ len,
+ layout,
+ #[cfg(feature = "pool")]
+ reservation: Mutex::new(reservation),
+ })
}
/// creates a new [MutableBuffer] with capacity and length capable of holding `len` bits.
@@ -217,6 +242,12 @@ impl MutableBuffer {
};
self.data = NonNull::new(data).unwrap_or_else(||
handle_alloc_error(new_layout));
self.layout = new_layout;
+ #[cfg(feature = "pool")]
+ {
+ if let Some(reservation) =
self.reservation.lock().unwrap().as_mut() {
+ reservation.resize(self.layout.size());
+ }
+ }
}
/// Truncates this buffer to `len` bytes
@@ -228,6 +259,12 @@ impl MutableBuffer {
return;
}
self.len = len;
+ #[cfg(feature = "pool")]
+ {
+ if let Some(reservation) =
self.reservation.lock().unwrap().as_mut() {
+ reservation.resize(self.len);
+ }
+ }
}
/// Resizes the buffer, either truncating its contents (with no change in capacity), or
@@ -251,6 +288,12 @@ impl MutableBuffer {
}
// this truncates the buffer when new_len < self.len
self.len = new_len;
+ #[cfg(feature = "pool")]
+ {
+ if let Some(reservation) =
self.reservation.lock().unwrap().as_mut() {
+ reservation.resize(self.len);
+ }
+ }
}
/// Shrinks the capacity of the buffer as much as possible.
@@ -328,6 +371,11 @@ impl MutableBuffer {
#[inline]
pub(super) fn into_buffer(self) -> Buffer {
let bytes = unsafe { Bytes::new(self.data, self.len,
Deallocation::Standard(self.layout)) };
+ #[cfg(feature = "pool")]
+ {
+ let reservation = self.reservation.lock().unwrap().take();
+ *bytes.reservation.lock().unwrap() = reservation;
+ }
std::mem::forget(self);
Buffer::from(bytes)
}
@@ -466,6 +514,17 @@ impl MutableBuffer {
buffer.truncate(bit_util::ceil(len, 8));
buffer
}
+
+    /// Register this [`MutableBuffer`] with the provided [`MemoryPool`]
+    ///
+    /// Reserves `self.capacity()` bytes from `pool` and stores the
+    /// reservation on this buffer, so the pool can account for the memory it
+    /// uses. A reservation made by an earlier call is replaced and thereby
+    /// released.
+    #[cfg(feature = "pool")]
+    pub fn claim(&self, pool: &dyn MemoryPool) {
+        // Reserve first, then lock and overwrite; the previous reservation
+        // (if any) is dropped by the assignment.
+        let fresh = pool.reserve(self.capacity());
+        *self.reservation.lock().unwrap() = Some(fresh);
+    }
}
/// Creates a non-null pointer with alignment of [`ALIGNMENT`]
@@ -506,7 +565,13 @@ impl<T: ArrowNativeType> From<Vec<T>> for MutableBuffer {
// This is based on `RawVec::current_memory`
let layout = unsafe {
Layout::array::<T>(value.capacity()).unwrap_unchecked() };
mem::forget(value);
- Self { data, len, layout }
+ Self {
+ data,
+ len,
+ layout,
+ #[cfg(feature = "pool")]
+ reservation: std::sync::Mutex::new(None),
+ }
}
}
@@ -1013,4 +1078,108 @@ mod tests {
let max_capacity = isize::MAX as usize - (isize::MAX as usize %
ALIGNMENT);
let _ = MutableBuffer::with_capacity(max_capacity + 1);
}
+
+ #[cfg(feature = "pool")]
+ mod pool_tests {
+ use super::*;
+ use crate::pool::{MemoryPool, TrackingMemoryPool};
+
+ #[test]
+ fn test_reallocate_with_pool() {
+ let pool = TrackingMemoryPool::default();
+ let mut buffer = MutableBuffer::with_capacity(100);
+ buffer.claim(&pool);
+
+ // Initial capacity should be 128 (multiple of 64)
+ assert_eq!(buffer.capacity(), 128);
+ assert_eq!(pool.used(), 128);
+
+ // Reallocate to a larger size
+ buffer.reallocate(200);
+
+ // The capacity is exactly the requested size, not rounded up
+ assert_eq!(buffer.capacity(), 200);
+ assert_eq!(pool.used(), 200);
+
+ // Reallocate to a smaller size
+ buffer.reallocate(50);
+
+ // The capacity is exactly the requested size, not rounded up
+ assert_eq!(buffer.capacity(), 50);
+ assert_eq!(pool.used(), 50);
+ }
+
+ #[test]
+ fn test_truncate_with_pool() {
+ let pool = TrackingMemoryPool::default();
+ let mut buffer = MutableBuffer::with_capacity(100);
+
+ // Fill buffer with some data
+ buffer.resize(80, 1);
+ assert_eq!(buffer.len(), 80);
+
+ buffer.claim(&pool);
+ assert_eq!(pool.used(), 128);
+
+ // Truncate buffer
+ buffer.truncate(40);
+ assert_eq!(buffer.len(), 40);
+ assert_eq!(pool.used(), 40);
+
+ // Truncate to zero
+ buffer.truncate(0);
+ assert_eq!(buffer.len(), 0);
+ assert_eq!(pool.used(), 0);
+ }
+
+ #[test]
+ fn test_resize_with_pool() {
+ let pool = TrackingMemoryPool::default();
+ let mut buffer = MutableBuffer::with_capacity(100);
+ buffer.claim(&pool);
+
+ // Initial state
+ assert_eq!(buffer.len(), 0);
+ assert_eq!(pool.used(), 128);
+
+ // Resize to increase length
+ buffer.resize(50, 1);
+ assert_eq!(buffer.len(), 50);
+ assert_eq!(pool.used(), 50);
+
+ // Resize to increase length beyond capacity
+ buffer.resize(150, 1);
+ assert_eq!(buffer.len(), 150);
+ assert_eq!(buffer.capacity(), 256);
+ assert_eq!(pool.used(), 150);
+
+ // Resize to decrease length
+ buffer.resize(30, 1);
+ assert_eq!(buffer.len(), 30);
+ assert_eq!(pool.used(), 30);
+ }
+
+ #[test]
+ fn test_buffer_lifecycle_with_pool() {
+ let pool = TrackingMemoryPool::default();
+
+ // Create a buffer with memory reservation
+ let mut mutable = MutableBuffer::with_capacity(100);
+ mutable.resize(80, 1);
+ mutable.claim(&pool);
+
+ // Memory reservation is based on capacity when using claim()
+ assert_eq!(pool.used(), 128);
+
+ // Convert to immutable Buffer
+ let buffer = mutable.into_buffer();
+
+ // Memory reservation should be preserved
+ assert_eq!(pool.used(), 128);
+
+ // Drop the buffer and the reservation should be released
+ drop(buffer);
+ assert_eq!(pool.used(), 0);
+ }
+ }
}
diff --git a/arrow-buffer/src/bytes.rs b/arrow-buffer/src/bytes.rs
index b811bd2c6b..8f912b807d 100644
--- a/arrow-buffer/src/bytes.rs
+++ b/arrow-buffer/src/bytes.rs
@@ -26,6 +26,11 @@ use std::{fmt::Debug, fmt::Formatter};
use crate::alloc::Deallocation;
use crate::buffer::dangling_ptr;
+#[cfg(feature = "pool")]
+use crate::pool::{MemoryPool, MemoryReservation};
+#[cfg(feature = "pool")]
+use std::sync::Mutex;
+
/// A continuous, fixed-size, immutable memory region that knows how to de-allocate itself.
///
/// Note that this structure is an internal implementation detail of the
@@ -49,6 +54,10 @@ pub struct Bytes {
/// how to deallocate this region
deallocation: Deallocation,
+
+ /// Memory reservation for tracking memory usage
+ #[cfg(feature = "pool")]
+ pub(super) reservation: Mutex<Option<Box<dyn MemoryReservation>>>,
}
impl Bytes {
@@ -70,6 +79,8 @@ impl Bytes {
ptr,
len,
deallocation,
+ #[cfg(feature = "pool")]
+ reservation: Mutex::new(None),
}
}
@@ -101,6 +112,27 @@ impl Bytes {
}
}
+    /// Register this [`Bytes`] with the provided [`MemoryPool`], replacing any prior reservation.
+    #[cfg(feature = "pool")]
+    pub fn claim(&self, pool: &dyn MemoryPool) {
+        // Reserve first, then lock and overwrite; the previous reservation
+        // (if any) is dropped by the assignment.
+        let fresh = pool.reserve(self.capacity());
+        *self.reservation.lock().unwrap() = Some(fresh);
+    }
+
+    /// Resize the memory reservation of this buffer to `new_size` bytes.
+    ///
+    /// This is a no-op if this buffer doesn't have a reservation.
+    #[cfg(feature = "pool")]
+    fn resize_reservation(&self, new_size: usize) {
+        // Resize in place through the guard, the same way `MutableBuffer`
+        // updates its reservation. The reservation never leaves the mutex, so
+        // the slot is not left empty if `resize` unwinds.
+        if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
+            reservation.resize(new_size);
+        }
+    }
+
/// Try to reallocate the underlying memory region to a new size (smaller or larger).
///
/// Only works for bytes allocated with the standard allocator.
@@ -135,6 +167,13 @@ impl Bytes {
self.ptr = ptr;
self.len = new_len;
self.deallocation = Deallocation::Standard(new_layout);
+
+ #[cfg(feature = "pool")]
+ {
+ // Resize reservation
+ self.resize_reservation(new_len);
+ }
+
return Ok(());
}
}
@@ -199,6 +238,8 @@ impl From<bytes::Bytes> for Bytes {
len,
ptr: NonNull::new(value.as_ptr() as _).unwrap(),
deallocation: Deallocation::Custom(std::sync::Arc::new(value),
len),
+ #[cfg(feature = "pool")]
+ reservation: Mutex::new(None),
}
}
}
@@ -209,14 +250,83 @@ mod tests {
#[test]
fn test_from_bytes() {
- let bytes = bytes::Bytes::from(vec![1, 2, 3, 4]);
- let arrow_bytes: Bytes = bytes.clone().into();
+ let message = b"hello arrow";
- assert_eq!(bytes.as_ptr(), arrow_bytes.as_ptr());
+ // we can create a Bytes from bytes::Bytes (created from slices)
+ let c_bytes: bytes::Bytes = message.as_ref().into();
+ let a_bytes: Bytes = c_bytes.into();
+ assert_eq!(a_bytes.as_slice(), message);
- drop(bytes);
- drop(arrow_bytes);
+ // we can create a Bytes from bytes::Bytes (created from Vec)
+ let c_bytes: bytes::Bytes = bytes::Bytes::from(message.to_vec());
+ let a_bytes: Bytes = c_bytes.into();
+ assert_eq!(a_bytes.as_slice(), message);
+ }
+
+ #[cfg(feature = "pool")]
+ mod pool_tests {
+ use super::*;
+
+ use crate::pool::TrackingMemoryPool;
+
+ #[test]
+ fn test_bytes_with_pool() {
+ // Create a standard allocation
+ let buffer = unsafe {
+ let layout =
+ std::alloc::Layout::from_size_align(1024,
crate::alloc::ALIGNMENT).unwrap();
+ let ptr = std::alloc::alloc(layout);
+ assert!(!ptr.is_null());
+
+ Bytes::new(
+ NonNull::new(ptr).unwrap(),
+ 1024,
+ Deallocation::Standard(layout),
+ )
+ };
+
+ // Create a memory pool
+ let pool = TrackingMemoryPool::default();
+ assert_eq!(pool.used(), 0);
+
+ // Reserve memory and assign to buffer. Claim twice.
+ buffer.claim(&pool);
+ assert_eq!(pool.used(), 1024);
+ buffer.claim(&pool);
+ assert_eq!(pool.used(), 1024);
+
+ // Memory should be released when buffer is dropped
+ drop(buffer);
+ assert_eq!(pool.used(), 0);
+ }
+
+ #[test]
+ fn test_bytes_drop_releases_pool() {
+ let pool = TrackingMemoryPool::default();
+
+ {
+ // Create a buffer with pool
+ let _buffer = unsafe {
+ let layout =
+ std::alloc::Layout::from_size_align(1024,
crate::alloc::ALIGNMENT).unwrap();
+ let ptr = std::alloc::alloc(layout);
+ assert!(!ptr.is_null());
+
+ let bytes = Bytes::new(
+ NonNull::new(ptr).unwrap(),
+ 1024,
+ Deallocation::Standard(layout),
+ );
+
+ bytes.claim(&pool);
+ bytes
+ };
- let _ = Bytes::from(bytes::Bytes::new());
+ assert_eq!(pool.used(), 1024);
+ }
+
+ // Buffer has been dropped, memory should be released
+ assert_eq!(pool.used(), 0);
+ }
}
}
diff --git a/arrow-buffer/src/lib.rs b/arrow-buffer/src/lib.rs
index 174cdc4d9c..1090146f36 100644
--- a/arrow-buffer/src/lib.rs
+++ b/arrow-buffer/src/lib.rs
@@ -48,3 +48,8 @@ mod interval;
pub use interval::*;
mod arith;
+
+#[cfg(feature = "pool")]
+mod pool;
+#[cfg(feature = "pool")]
+pub use pool::*;
diff --git a/arrow-buffer/src/pool.rs b/arrow-buffer/src/pool.rs
new file mode 100644
index 0000000000..bf22d433d6
--- /dev/null
+++ b/arrow-buffer/src/pool.rs
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module defines the traits used for memory pools, together with an
+//! implementation that tracks memory usage.
+//!
+//! The basic traits are [`MemoryPool`] and [`MemoryReservation`], and the default
+//! implementation of [`MemoryPool`] is [`TrackingMemoryPool`]. Their relationship
+//! is as follows:
+//!
+//! ```text
+//! (pool tracker) (resizable)
+//! ┌──────────────────┐ fn reserve() ┌─────────────────────────┐
+//! │ trait MemoryPool │─────────────►│ trait MemoryReservation │
+//! └──────────────────┘ └─────────────────────────┘
+//! ```
+
+use std::fmt::Debug;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+
+/// A reservation of memory within a [`MemoryPool`] that is freed on drop
+pub trait MemoryReservation: Debug + Send + Sync {
+ /// Returns the current size of this reservation in bytes.
+ fn size(&self) -> usize;
+
+ /// Resizes this reservation to `new_size` bytes, growing or shrinking it.
+ fn resize(&mut self, new_size: usize);
+}
+
+/// A pool of memory that can be reserved and released.
+///
+/// This is used to accurately track memory usage when buffers are shared
+/// between multiple arrays or other data structures.
+///
+/// For example, assume we have two arrays that share underlying buffer.
+/// It's hard to tell how much memory is used by them because we can't
+/// tell if the buffer is shared or not.
+///
+/// ```text
+/// Array A Array B
+/// ┌────────────┐ ┌────────────┐
+/// │ slices... │ │ slices... │
+/// │────────────│ │────────────│
+/// │ Arc<Bytes> │ │ Arc<Bytes> │ (shared buffer)
+/// └─────▲──────┘ └───────▲────┘
+/// │ │
+/// │ Bytes │
+/// │ ┌─────────────┐ │
+/// │ │ data... │ │
+/// │ │─────────────│ │
+/// └──│ Memory │──┘ (tracked with a memory pool)
+/// │ Reservation │
+/// └─────────────┘
+/// ```
+///
+/// With a memory pool, we can count the memory usage by the shared buffer
+/// directly.
+pub trait MemoryPool: Debug + Send + Sync {
+ /// Reserves memory from the pool. Infallible.
+ ///
+ /// Returns a reservation of the requested size.
+ fn reserve(&self, size: usize) -> Box<dyn MemoryReservation>;
+
+ /// Returns the current available memory in the pool.
+ ///
+ /// The pool may be overfilled, so this method might return a negative value.
+ fn available(&self) -> isize;
+
+ /// Returns the current used memory from the pool.
+ fn used(&self) -> usize;
+
+ /// Returns the maximum memory that can be reserved from the pool.
+ fn capacity(&self) -> usize;
+}
+
+/// A simple [`MemoryPool`] that only counts the bytes reserved from it
+#[derive(Debug, Default)]
+pub struct TrackingMemoryPool(Arc<AtomicUsize>);
+
+impl TrackingMemoryPool {
+ /// Returns the total bytes currently reserved (the counter `used` also reads)
+ pub fn allocated(&self) -> usize {
+ self.0.load(Ordering::Relaxed)
+ }
+}
+
+impl MemoryPool for TrackingMemoryPool {
+    /// Adds `size` bytes to the shared counter and returns a reservation that
+    /// subtracts whatever it currently holds when it is dropped.
+    fn reserve(&self, size: usize) -> Box<dyn MemoryReservation> {
+        self.0.fetch_add(size, Ordering::Relaxed);
+        Box::new(Tracker {
+            size,
+            shared: Arc::clone(&self.0),
+        })
+    }
+
+    /// Returns `isize::MAX` minus the bytes currently in use.
+    ///
+    /// The result is negative once usage exceeds `isize::MAX`.
+    fn available(&self) -> isize {
+        // `used() as isize` wraps to a negative number above `isize::MAX`, and
+        // subtracting that from `isize::MAX` overflows (a panic in debug
+        // builds). Subtracting the unsigned value directly is exact for every
+        // `usize`, since `isize::MAX - usize::MAX == isize::MIN`.
+        isize::MAX.saturating_sub_unsigned(self.used())
+    }
+
+    /// Returns the bytes currently reserved from this pool.
+    fn used(&self) -> usize {
+        self.0.load(Ordering::Relaxed)
+    }
+
+    /// This pool only tracks usage and imposes no limit, so it reports
+    /// `usize::MAX`.
+    fn capacity(&self) -> usize {
+        usize::MAX
+    }
+}
+
+/// Reservation handed out by [`TrackingMemoryPool`]: it shares the pool's
+/// counter and remembers how many bytes it currently accounts for.
+#[derive(Debug)]
+struct Tracker {
+    size: usize,
+    shared: Arc<AtomicUsize>,
+}
+
+impl Drop for Tracker {
+    /// Gives the bytes held by this reservation back to the shared counter.
+    fn drop(&mut self) {
+        self.shared.fetch_sub(self.size, Ordering::Relaxed);
+    }
+}
+
+impl MemoryReservation for Tracker {
+    fn size(&self) -> usize {
+        self.size
+    }
+
+    fn resize(&mut self, new: usize) {
+        // Record the new size, then apply only the difference to the counter.
+        let old = std::mem::replace(&mut self.size, new);
+        if old < new {
+            self.shared.fetch_add(new - old, Ordering::Relaxed);
+        } else {
+            self.shared.fetch_sub(old - new, Ordering::Relaxed);
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_tracking_memory_pool() {
+ let pool = TrackingMemoryPool::default();
+
+ // Reserve 512 bytes; the pool total follows the reservation
+ let reservation = pool.reserve(512);
+ assert_eq!(reservation.size(), 512);
+ assert_eq!(pool.used(), 512);
+ assert_eq!(pool.available(), isize::MAX - 512);
+
+ // Reserve another 256 bytes; both reservations count towards the total
+ let reservation2 = pool.reserve(256);
+ assert_eq!(reservation2.size(), 256);
+ assert_eq!(pool.used(), 768);
+ assert_eq!(pool.available(), isize::MAX - 768);
+
+ // Growing the first reservation adds only the difference (88 bytes)
+ let mut reservation_mut = reservation;
+ reservation_mut.resize(600);
+ assert_eq!(reservation_mut.size(), 600);
+ assert_eq!(pool.used(), 856); // 600 + 256
+
+ // Shrinking it gives the difference (200 bytes) back to the pool
+ reservation_mut.resize(400);
+ assert_eq!(reservation_mut.size(), 400);
+ assert_eq!(pool.used(), 656); // 400 + 256
+
+ // Dropping the first reservation releases its current size (400 bytes)
+ drop(reservation_mut);
+ assert_eq!(pool.used(), 256);
+
+ // Dropping the second reservation leaves the pool empty
+ drop(reservation2);
+ assert_eq!(pool.used(), 0);
+ }
+}