This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c3d4867e4 GH-48105: [C++][Parquet][IPC] Cap allocated memory when fuzzing (#48108)
7c3d4867e4 is described below

commit 7c3d4867e40dd0100542247a61cb83520369b2d4
Author: Antoine Pitrou <[email protected]>
AuthorDate: Sat Nov 15 11:41:07 2025 +0100

    GH-48105: [C++][Parquet][IPC] Cap allocated memory when fuzzing (#48108)
    
    ### Rationale for this change
    
    OSS-Fuzz will trigger an out-of-memory crash if the allocated memory goes beyond a predefined limit (usually 2560 MB, though it can be configured). For Parquet and IPC, it is legitimate to allocate a lot of memory when decompressing data, so this can happen on both valid and invalid input files.
    
    Unfortunately, OSS-Fuzz does not enforce this memory limit by instrumenting malloc and having it return NULL when the limit is reached; instead, it checks allocated memory periodically from a separate thread. This can be solved by implementing a custom allocator with an upper limit, exactly as the mupdf project did in https://github.com/google/oss-fuzz/issues/1830
    
    ### What changes are included in this PR?
    
    1. Implement a `CappedMemoryPool`
    2. Use the `CappedMemoryPool` with a hardcoded limit in the Parquet and IPC fuzz targets
    
    ### Are these changes tested?
    
    Yes, by additional unit tests.
    
    ### Are there any user-facing changes?
    
    No.
    
    * GitHub Issue: #48105
    
    Authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/CMakeLists.txt                       |  1 +
 cpp/src/arrow/ipc/file_fuzz.cc                     |  3 +-
 cpp/src/arrow/ipc/reader.cc                        | 13 +++-
 cpp/src/arrow/ipc/stream_fuzz.cc                   |  3 +-
 cpp/src/arrow/memory_pool.h                        | 68 ++++++++++++++++
 cpp/src/arrow/memory_pool_test.cc                  | 90 ++++++++++++++++++++++
 .../{ipc/file_fuzz.cc => util/fuzz_internal.cc}    | 26 +++++--
 .../{ipc/file_fuzz.cc => util/fuzz_internal.h}     | 25 ++++--
 cpp/src/parquet/arrow/fuzz.cc                      |  3 +-
 cpp/src/parquet/arrow/reader.cc                    |  8 +-
 10 files changed, 217 insertions(+), 23 deletions(-)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 2e5c67e07b..ec8b6c1b32 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -511,6 +511,7 @@ set(ARROW_UTIL_SRCS
     util/float16.cc
     util/formatting.cc
     util/future.cc
+    util/fuzz_internal.cc
     util/hashing.cc
     util/int_util.cc
     util/io_util.cc
diff --git a/cpp/src/arrow/ipc/file_fuzz.cc b/cpp/src/arrow/ipc/file_fuzz.cc
index 840d19a4ef..c3ce9c2dea 100644
--- a/cpp/src/arrow/ipc/file_fuzz.cc
+++ b/cpp/src/arrow/ipc/file_fuzz.cc
@@ -19,10 +19,11 @@
 
 #include "arrow/ipc/reader.h"
 #include "arrow/status.h"
+#include "arrow/util/fuzz_internal.h"
 #include "arrow/util/macros.h"
 
 extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
   auto status = arrow::ipc::internal::FuzzIpcFile(data, 
static_cast<int64_t>(size));
-  ARROW_UNUSED(status);
+  arrow::internal::LogFuzzStatus(status, data, static_cast<int64_t>(size));
   return 0;
 }
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 1ec2836626..4910b1596c 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -53,6 +53,7 @@
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/compression.h"
 #include "arrow/util/endian.h"
+#include "arrow/util/fuzz_internal.h"
 #include "arrow/util/key_value_metadata.h"
 #include "arrow/util/logging_internal.h"
 #include "arrow/util/parallel.h"
@@ -2618,6 +2619,12 @@ Status ValidateFuzzBatch(const RecordBatch& batch) {
   return st;
 }
 
+IpcReadOptions FuzzingOptions() {
+  IpcReadOptions options;
+  options.memory_pool = ::arrow::internal::fuzzing_memory_pool();
+  return options;
+}
+
 }  // namespace
 
 Status FuzzIpcStream(const uint8_t* data, int64_t size) {
@@ -2625,7 +2632,8 @@ Status FuzzIpcStream(const uint8_t* data, int64_t size) {
   io::BufferReader buffer_reader(buffer);
 
   std::shared_ptr<RecordBatchReader> batch_reader;
-  ARROW_ASSIGN_OR_RAISE(batch_reader, 
RecordBatchStreamReader::Open(&buffer_reader));
+  ARROW_ASSIGN_OR_RAISE(batch_reader,
+                        RecordBatchStreamReader::Open(&buffer_reader, 
FuzzingOptions()));
   Status st;
 
   while (true) {
@@ -2645,7 +2653,8 @@ Status FuzzIpcFile(const uint8_t* data, int64_t size) {
   io::BufferReader buffer_reader(buffer);
 
   std::shared_ptr<RecordBatchFileReader> batch_reader;
-  ARROW_ASSIGN_OR_RAISE(batch_reader, 
RecordBatchFileReader::Open(&buffer_reader));
+  ARROW_ASSIGN_OR_RAISE(batch_reader,
+                        RecordBatchFileReader::Open(&buffer_reader, 
FuzzingOptions()));
   Status st;
 
   const int n_batches = batch_reader->num_record_batches();
diff --git a/cpp/src/arrow/ipc/stream_fuzz.cc b/cpp/src/arrow/ipc/stream_fuzz.cc
index e26f3d1f4e..04b12863be 100644
--- a/cpp/src/arrow/ipc/stream_fuzz.cc
+++ b/cpp/src/arrow/ipc/stream_fuzz.cc
@@ -19,10 +19,11 @@
 
 #include "arrow/ipc/reader.h"
 #include "arrow/status.h"
+#include "arrow/util/fuzz_internal.h"
 #include "arrow/util/macros.h"
 
 extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
   auto status = arrow::ipc::internal::FuzzIpcStream(data, 
static_cast<int64_t>(size));
-  ARROW_UNUSED(status);
+  arrow::internal::LogFuzzStatus(status, data, static_cast<int64_t>(size));
   return 0;
 }
diff --git a/cpp/src/arrow/memory_pool.h b/cpp/src/arrow/memory_pool.h
index 19a938c336..a8f964200e 100644
--- a/cpp/src/arrow/memory_pool.h
+++ b/cpp/src/arrow/memory_pool.h
@@ -26,6 +26,7 @@
 #include "arrow/result.h"
 #include "arrow/status.h"
 #include "arrow/type_fwd.h"
+#include "arrow/util/macros.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
@@ -245,6 +246,73 @@ class ARROW_EXPORT ProxyMemoryPool : public MemoryPool {
   std::unique_ptr<ProxyMemoryPoolImpl> impl_;
 };
 
+/// EXPERIMENTAL MemoryPool wrapper with an upper limit
+///
+/// Checking for limits is not done in a fully thread-safe way, therefore
+/// multi-threaded allocations might be able to go successfully above the
+/// configured limit.
+class ARROW_EXPORT CappedMemoryPool : public MemoryPool {
+ public:
+  CappedMemoryPool(MemoryPool* wrapped_pool, int64_t bytes_allocated_limit)
+      : wrapped_(wrapped_pool), bytes_allocated_limit_(bytes_allocated_limit) {}
+
+  using MemoryPool::Allocate;
+  using MemoryPool::Reallocate;
+
+  Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override {
+    // XXX Another thread may allocate memory between the limit check and
+    // the `Allocate` call, so two concurrent allocations may both succeed
+    // while going above the limit. Fixing this would require delegating the
+    // limit check to `MemoryPoolStats`.
+    ARROW_RETURN_NOT_OK(CheckLimit(size));
+    return wrapped_->Allocate(size, alignment, out);
+  }
+
+  Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment,
+                    uint8_t** ptr) override {
+    ARROW_RETURN_NOT_OK(CheckLimit(new_size - old_size));
+    return wrapped_->Reallocate(old_size, new_size, alignment, ptr);
+  }
+
+  void Free(uint8_t* buffer, int64_t size, int64_t alignment) override {
+    return wrapped_->Free(buffer, size, alignment);
+  }
+
+  void ReleaseUnused() override { wrapped_->ReleaseUnused(); }
+
+  void PrintStats() override { wrapped_->PrintStats(); }
+
+  int64_t bytes_allocated() const override { return wrapped_->bytes_allocated(); }
+
+  int64_t max_memory() const override { return wrapped_->max_memory(); }
+
+  int64_t total_bytes_allocated() const override {
+    return wrapped_->total_bytes_allocated();
+  }
+
+  int64_t num_allocations() const override { return wrapped_->num_allocations(); }
+
+  std::string backend_name() const override { return wrapped_->backend_name(); }
+
+ private:
+  // Check that `extra` more bytes can be allocated without exceeding the cap.
+  // Written as `extra > limit - current` rather than `current + extra > limit`
+  // so that a huge request cannot overflow int64_t (signed overflow is UB and
+  // could wrap negative, letting the request slip past the cap).
+  Status CheckLimit(int64_t extra) const {
+    const int64_t current = wrapped_->bytes_allocated();
+    if (ARROW_PREDICT_FALSE(extra > bytes_allocated_limit_ - current)) {
+      return Status::OutOfMemory("MemoryPool bytes_allocated cap exceeded: limit=",
+                                 bytes_allocated_limit_, ", current=", current,
+                                 ", requested=", extra);
+    }
+    return Status::OK();
+  }
+
+  MemoryPool* wrapped_;
+  const int64_t bytes_allocated_limit_;
+};
+
 /// \brief Return a process-wide memory pool based on the system allocator.
 ARROW_EXPORT MemoryPool* system_memory_pool();
 
diff --git a/cpp/src/arrow/memory_pool_test.cc 
b/cpp/src/arrow/memory_pool_test.cc
index 3f0a852876..20006ebeb4 100644
--- a/cpp/src/arrow/memory_pool_test.cc
+++ b/cpp/src/arrow/memory_pool_test.cc
@@ -17,6 +17,7 @@
 
 #include <algorithm>
 #include <cstdint>
+#include <memory>
 
 #include <gtest/gtest.h>
 
@@ -290,4 +291,93 @@ TEST(Jemalloc, GetAllocationStats) {
 #endif
 }
 
+class TestCappedMemoryPool : public ::arrow::TestMemoryPoolBase {
+ public:
+  MemoryPool* memory_pool() override { return 
InitPool(/*limit=*/1'000'000'000LL); }
+
+  MemoryPool* InitPool(int64_t limit) {
+    proxy_memory_pool_ = 
std::make_shared<ProxyMemoryPool>(default_memory_pool());
+    capped_memory_pool_ =
+        std::make_shared<CappedMemoryPool>(proxy_memory_pool_.get(), limit);
+    return capped_memory_pool_.get();
+  }
+
+ protected:
+  std::shared_ptr<MemoryPool> proxy_memory_pool_;
+  std::shared_ptr<CappedMemoryPool> capped_memory_pool_;
+};
+
+TEST_F(TestCappedMemoryPool, MemoryTracking) { this->TestMemoryTracking(); }
+
+TEST_F(TestCappedMemoryPool, OOM) {
+  // CappedMemoryPool rejects the huge allocation without hitting the 
underlying
+  // allocator, so this should work even under Address Sanitizer.
+  this->TestOOM();
+}
+
+TEST_F(TestCappedMemoryPool, Reallocate) { this->TestReallocate(); }
+
+TEST_F(TestCappedMemoryPool, Alignment) { this->TestAlignment(); }
+
+TEST_F(TestCappedMemoryPool, AllocateLimit) {
+  auto pool = InitPool(/*limit=*/1000);
+
+  uint8_t* data1;
+  uint8_t* data2;
+  ASSERT_OK(pool->Allocate(600, &data1));
+  ASSERT_EQ(600, pool->bytes_allocated());
+  ASSERT_EQ(600, pool->total_bytes_allocated());
+  ASSERT_EQ(600, pool->max_memory());
+
+  ASSERT_OK(pool->Allocate(400, &data2));
+  ASSERT_EQ(1000, pool->bytes_allocated());
+  ASSERT_EQ(1000, pool->total_bytes_allocated());
+  ASSERT_EQ(1000, pool->max_memory());
+  pool->Free(data2, 400);
+  ASSERT_EQ(600, pool->bytes_allocated());
+  ASSERT_EQ(1000, pool->total_bytes_allocated());
+  ASSERT_EQ(1000, pool->max_memory());
+
+  ASSERT_OK(pool->Allocate(300, &data2));
+  ASSERT_EQ(900, pool->bytes_allocated());
+  ASSERT_EQ(1300, pool->total_bytes_allocated());
+  ASSERT_EQ(1000, pool->max_memory());
+  pool->Free(data2, 300);
+  ASSERT_EQ(600, pool->bytes_allocated());
+  ASSERT_EQ(1300, pool->total_bytes_allocated());
+  ASSERT_EQ(1000, pool->max_memory());
+
+  ASSERT_RAISES(OutOfMemory, pool->Allocate(401, &data2));
+  ASSERT_EQ(600, pool->bytes_allocated());
+  ASSERT_EQ(1300, pool->total_bytes_allocated());
+  ASSERT_EQ(1000, pool->max_memory());
+
+  pool->Free(data1, 600);
+}
+
+TEST_F(TestCappedMemoryPool, ReallocateLimit) {
+  auto pool = InitPool(/*limit=*/1000);
+
+  uint8_t* data1;
+  uint8_t* data2;
+  ASSERT_OK(pool->Allocate(600, &data1));
+  ASSERT_OK(pool->Allocate(400, &data2));
+  ASSERT_EQ(1000, pool->bytes_allocated());
+  ASSERT_EQ(1000, pool->total_bytes_allocated());
+  ASSERT_EQ(1000, pool->max_memory());
+
+  ASSERT_OK(pool->Reallocate(400, 300, &data2));
+  ASSERT_EQ(900, pool->bytes_allocated());
+  ASSERT_EQ(1000, pool->total_bytes_allocated());
+  ASSERT_EQ(1000, pool->max_memory());
+
+  ASSERT_RAISES(OutOfMemory, pool->Reallocate(300, 401, &data2));
+  ASSERT_EQ(900, pool->bytes_allocated());
+  ASSERT_EQ(1000, pool->total_bytes_allocated());
+  ASSERT_EQ(1000, pool->max_memory());
+
+  pool->Free(data1, 600);
+  pool->Free(data2, 300);
+}
+
 }  // namespace arrow
diff --git a/cpp/src/arrow/ipc/file_fuzz.cc 
b/cpp/src/arrow/util/fuzz_internal.cc
similarity index 54%
copy from cpp/src/arrow/ipc/file_fuzz.cc
copy to cpp/src/arrow/util/fuzz_internal.cc
index 840d19a4ef..c4eddc5126 100644
--- a/cpp/src/arrow/ipc/file_fuzz.cc
+++ b/cpp/src/arrow/util/fuzz_internal.cc
@@ -15,14 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <memory>
+#include "arrow/util/fuzz_internal.h"
 
-#include "arrow/ipc/reader.h"
+#include "arrow/memory_pool.h"
 #include "arrow/status.h"
-#include "arrow/util/macros.h"
+#include "arrow/util/logging_internal.h"
 
-extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
-  auto status = arrow::ipc::internal::FuzzIpcFile(data, 
static_cast<int64_t>(size));
-  ARROW_UNUSED(status);
-  return 0;
+namespace arrow::internal {
+
+MemoryPool* fuzzing_memory_pool() {
+  static auto pool = std::make_shared<::arrow::CappedMemoryPool>(
+      ::arrow::default_memory_pool(), 
/*bytes_allocated_limit=*/kFuzzingMemoryLimit);
+  return pool.get();
+}
+
+void LogFuzzStatus(const Status& st, const uint8_t* data, int64_t size) {
+  // Most fuzz inputs will be invalid and generate errors, only log potential 
OOMs
+  if (st.IsOutOfMemory()) {
+    ARROW_LOG(WARNING) << "Fuzzing input with size=" << size
+                       << " hit allocation failure: " << st.ToString();
+  }
 }
+
+}  // namespace arrow::internal
diff --git a/cpp/src/arrow/ipc/file_fuzz.cc b/cpp/src/arrow/util/fuzz_internal.h
similarity index 57%
copy from cpp/src/arrow/ipc/file_fuzz.cc
copy to cpp/src/arrow/util/fuzz_internal.h
index 840d19a4ef..f3f104eec6 100644
--- a/cpp/src/arrow/ipc/file_fuzz.cc
+++ b/cpp/src/arrow/util/fuzz_internal.h
@@ -15,14 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <memory>
+#pragma once
 
-#include "arrow/ipc/reader.h"
-#include "arrow/status.h"
+#include <cstdint>
+
+#include "arrow/type_fwd.h"
 #include "arrow/util/macros.h"
 
-extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
-  auto status = arrow::ipc::internal::FuzzIpcFile(data, 
static_cast<int64_t>(size));
-  ARROW_UNUSED(status);
-  return 0;
-}
+namespace arrow::internal {
+
+// The default rss_limit_mb on OSS-Fuzz is 2560 MB and we want to fail 
allocations
+// before that limit is reached, otherwise the fuzz target gets killed 
(GH-48105).
+constexpr int64_t kFuzzingMemoryLimit = 2200LL * 1000 * 1000;
+
+/// Return a memory pool that will not allocate more than kFuzzingMemoryLimit 
bytes.
+ARROW_EXPORT MemoryPool* fuzzing_memory_pool();
+
+// Optionally log the outcome of fuzzing an input
+ARROW_EXPORT void LogFuzzStatus(const Status&, const uint8_t* data, int64_t 
size);
+
+}  // namespace arrow::internal
diff --git a/cpp/src/parquet/arrow/fuzz.cc b/cpp/src/parquet/arrow/fuzz.cc
index f1c724508c..8a4cfbceff 100644
--- a/cpp/src/parquet/arrow/fuzz.cc
+++ b/cpp/src/parquet/arrow/fuzz.cc
@@ -16,10 +16,11 @@
 // under the License.
 
 #include "arrow/status.h"
+#include "arrow/util/fuzz_internal.h"
 #include "parquet/arrow/reader.h"
 
 extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
   auto status = parquet::arrow::internal::FuzzReader(data, 
static_cast<int64_t>(size));
-  ARROW_UNUSED(status);
+  arrow::internal::LogFuzzStatus(status, data, static_cast<int64_t>(size));
   return 0;
 }
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 2933664b27..c32e588688 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -29,6 +29,7 @@
 #include "arrow/buffer.h"
 #include "arrow/extension_type.h"
 #include "arrow/io/memory.h"
+#include "arrow/memory_pool.h"
 #include "arrow/record_batch.h"
 #include "arrow/table.h"
 #include "arrow/type.h"
@@ -36,6 +37,7 @@
 #include "arrow/util/async_generator.h"
 #include "arrow/util/bit_util.h"
 #include "arrow/util/future.h"
+#include "arrow/util/fuzz_internal.h"
 #include "arrow/util/iterator.h"
 #include "arrow/util/logging_internal.h"
 #include "arrow/util/parallel.h"
@@ -1403,7 +1405,7 @@ namespace internal {
 
 namespace {
 
-Status FuzzReader(std::unique_ptr<FileReader> reader) {
+Status FuzzReadData(std::unique_ptr<FileReader> reader) {
   auto st = Status::OK();
   for (int i = 0; i < reader->num_row_groups(); ++i) {
     std::shared_ptr<Table> table;
@@ -1490,7 +1492,7 @@ Status FuzzReader(const uint8_t* data, int64_t size) {
 
   auto buffer = std::make_shared<::arrow::Buffer>(data, size);
   auto file = std::make_shared<::arrow::io::BufferReader>(buffer);
-  auto pool = ::arrow::default_memory_pool();
+  auto pool = ::arrow::internal::fuzzing_memory_pool();
   auto reader_properties = default_reader_properties();
   std::default_random_engine rng(/*seed*/ 42);
 
@@ -1562,7 +1564,7 @@ Status FuzzReader(const uint8_t* data, int64_t size) {
 
     std::unique_ptr<FileReader> reader;
     RETURN_NOT_OK(FileReader::Make(pool, std::move(pq_file_reader), 
properties, &reader));
-    st &= FuzzReader(std::move(reader));
+    st &= FuzzReadData(std::move(reader));
   }
   return st;
 }

Reply via email to