This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-nanoarrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0af89254 fix(r): Collect array streams in C (not R) before conversion 
(#828)
0af89254 is described below

commit 0af89254f26fb6bacc59365edf8bab8fbc901e2a
Author: Dewey Dunnington <[email protected]>
AuthorDate: Wed Dec 3 20:58:01 2025 -0600

    fix(r): Collect array streams in C (not R) before conversion (#828)
    
    This PR moves the proecess of collecting an array stream from R (where
    we had preserve/protect volume issues that made garbage collection very,
    very slow) into C/C++.
    
    Doesn't quite solve #822 but it should help!
    
    Reproducer for generating an IPC file with a lot of strings:
    
    <details>
    
    ```r
    library(nanoarrow)
    
    ascii_bytes <- vapply(letters, charToRaw, raw(1), USE.NAMES = FALSE)
    
    random_string_array <- function(n = 1, n_chars = 16) {
      data_buffer <- sample(ascii_bytes, n_chars * n, replace = TRUE)
      offsets_buffer <- as.integer(seq(0, n * n_chars, length.out = n + 1))
      nanoarrow_array_modify(
        nanoarrow_array_init(na_string()),
        list(
          length = n,
          null_count = 0,
          buffers = list(NULL, offsets_buffer, data_buffer)
        )
      )
    }
    
    random_string_struct <- function(n_rows = 1024, n_cols = 1, n_chars = 16) {
      col_names <- sprintf("col%03d", seq_len(n_cols))
      col_types <- rep(list(na_string()), n_cols)
      names(col_types) <- col_names
      schema <- na_struct(col_types)
    
      columns <- lapply(
        col_names,
        function(...) random_string_array(n_rows, n_chars = n_chars)
      )
    
      nanoarrow_array_modify(
        nanoarrow_array_init(schema),
        list(
          length = n_rows,
          null_count = 0,
          children = columns
        )
      )
    }
    
    random_string_batches <- function(n_batches = 1, n_rows = 1, n_cols = 1, 
n_chars = 16) {
      lapply(
        seq_len(n_batches),
        function(...) random_string_struct(n_rows, n_cols, n_chars)
      )
    }
    
    batches <- random_string_batches(n_batches = 100, n_cols = 160)
    stream <- basic_array_stream(batches)
    write_nanoarrow(stream, "many_strings.arrows")
    ```
    
    </details>
    
    ...in a separate R session, the issues around taking a long time for the
    GC to run seemed to go away (but it would be great to have a check!)
    
    ```r
    library(nanoarrow)
    
    df <- read_nanoarrow("many_strings.arrows") |>
      convert_array_stream()
    f
    nanoarrow:::preserved_count()
    #> [1] 0
    system.time(gc(), gcFirst = FALSE)
    #> user  system elapsed
    #> 0.036   0.001   0.037
    ```
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 r/R/convert-array-stream.R | 22 ++++--------
 r/bootstrap.R              |  4 ---
 r/src/.gitignore           |  2 ++
 r/src/init.c               |  2 ++
 r/src/nanoarrow_cpp.cc     | 88 ++++++++++++++++++++++++++++++++++++++++++++++
 r/tools/make-callentries.R |  2 +-
 6 files changed, 99 insertions(+), 21 deletions(-)

diff --git a/r/R/convert-array-stream.R b/r/R/convert-array-stream.R
index b1f3e85e..c8e73aed 100644
--- a/r/R/convert-array-stream.R
+++ b/r/R/convert-array-stream.R
@@ -72,31 +72,21 @@ convert_array_stream <- function(array_stream, to = NULL, 
size = NULL, n = Inf)
   } else {
     # Otherwise, we need to collect all batches and calculate the total length
     # before calling nanoarrow_c_convert_array_stream().
-    batches <- collect_array_stream(
-      array_stream,
-      n,
-      schema = schema,
-      validate = FALSE
-    )
+    batch_info <- .Call(nanoarrow_c_collect_array_stream, array_stream, n)
 
     # If there is exactly one batch, use convert_array(). Converting a single
     # array currently takes a more efficient code path for types that can be
     # converted as ALTREP (e.g., strings).
-    if (length(batches) == 1L) {
-      return(.Call(nanoarrow_c_convert_array, batches[[1]], to))
+    if (batch_info$n == 1L) {
+      array <- batch_info$stream$get_next(schema)
+      return(.Call(nanoarrow_c_convert_array, array, to))
     }
 
-    # Otherwise, compute the final size, create another array stream,
-    # and call convert_array_stream() with a known size. Using .Call()
-    # directly because we have already type checked the inputs.
-    size <- .Call(nanoarrow_c_array_list_total_length, batches)
-    basic_stream <- .Call(nanoarrow_c_basic_array_stream, batches, schema, 
FALSE)
-
     .Call(
       nanoarrow_c_convert_array_stream,
-      basic_stream,
+      batch_info$stream,
       to,
-      as.double(size),
+      as.double(batch_info$size),
       Inf
     )
   }
diff --git a/r/bootstrap.R b/r/bootstrap.R
index c285f487..041a4b40 100644
--- a/r/bootstrap.R
+++ b/r/bootstrap.R
@@ -60,7 +60,3 @@ stopifnot(file.exists("../CMakeLists.txt") && run_bundler())
 f <- "src/flatcc/portable/pdiagnostic.h"
 lines <- readLines(f)
 writeLines(gsub("^#pragma", "/**/#pragma", lines), f)
-
-# Remove unused files
-unused_files <- list.files("src", "\\.hpp$", full.names = TRUE)
-unlink(unused_files)
diff --git a/r/src/.gitignore b/r/src/.gitignore
index c5bf0ca6..5093a88f 100644
--- a/r/src/.gitignore
+++ b/r/src/.gitignore
@@ -22,5 +22,7 @@ nanoarrow.c
 nanoarrow.h
 nanoarrow_ipc.h
 nanoarrow_ipc.c
+nanoarrow.hpp
+nanoarrow_ipc.hpp
 flatcc*
 Makevars
diff --git a/r/src/init.c b/r/src/init.c
index 4d2109a2..70eec867 100644
--- a/r/src/init.c
+++ b/r/src/init.c
@@ -61,6 +61,7 @@ extern SEXP nanoarrow_c_ipc_array_reader_buffer(SEXP 
buffer_xptr);
 extern SEXP nanoarrow_c_ipc_array_reader_connection(SEXP con);
 extern SEXP nanoarrow_c_ipc_writer_connection(SEXP con);
 extern SEXP nanoarrow_c_ipc_writer_write_stream(SEXP writer_xptr, SEXP 
array_stream_xptr);
+extern SEXP nanoarrow_c_collect_array_stream(SEXP array_stream_xptr, SEXP 
n_sexp);
 extern SEXP nanoarrow_c_allocate_schema(void);
 extern SEXP nanoarrow_c_allocate_array(void);
 extern SEXP nanoarrow_c_allocate_array_stream(void);
@@ -144,6 +145,7 @@ static const R_CallMethodDef CallEntries[] = {
     {"nanoarrow_c_ipc_writer_connection", 
(DL_FUNC)&nanoarrow_c_ipc_writer_connection, 1},
     {"nanoarrow_c_ipc_writer_write_stream", 
(DL_FUNC)&nanoarrow_c_ipc_writer_write_stream,
      2},
+    {"nanoarrow_c_collect_array_stream", 
(DL_FUNC)&nanoarrow_c_collect_array_stream, 2},
     {"nanoarrow_c_allocate_schema", (DL_FUNC)&nanoarrow_c_allocate_schema, 0},
     {"nanoarrow_c_allocate_array", (DL_FUNC)&nanoarrow_c_allocate_array, 0},
     {"nanoarrow_c_allocate_array_stream", 
(DL_FUNC)&nanoarrow_c_allocate_array_stream, 0},
diff --git a/r/src/nanoarrow_cpp.cc b/r/src/nanoarrow_cpp.cc
index 9c0e38d6..83ebe07a 100644
--- a/r/src/nanoarrow_cpp.cc
+++ b/r/src/nanoarrow_cpp.cc
@@ -26,6 +26,9 @@
 #include <thread>
 #include <vector>
 
+#include "nanoarrow.hpp"
+#include "nanoarrow/r.h"
+
 // Without this infrastructure, it's possible to check that all objects
 // are released by running devtools::test(); gc() in a fresh session and
 // making sure that nanoarrow:::preserved_count() is zero afterward.
@@ -201,3 +204,88 @@ extern "C" void 
nanoarrow_preserve_and_release_on_other_thread(SEXP obj) {
   std::thread worker([obj] { nanoarrow_release_sexp(obj); });
   worker.join();
 }
+
+// Collector utility for iterating over and collecting batches
+// Keeping this all in a single object reduces the amount of C++ deletion
+// we need to keep track of.
+struct ArrayVector {
+  nanoarrow::UniqueSchema schema;
+  nanoarrow::UniqueArray batch;
+  std::vector<nanoarrow::UniqueArray> vec;
+};
+
+// Use an external pointer to handle deleting the ArrayVector in
+// the event of a longjmp
+static void release_array_vector_xptr(SEXP array_vector_xptr) {
+  auto ptr = 
reinterpret_cast<ArrayVector*>(R_ExternalPtrAddr(array_vector_xptr));
+  if (ptr != nullptr) {
+    delete ptr;
+  }
+}
+
+// Collects the entire array stream and collects the total number of rows and
+// total number of batches so that the R code on the end of this can decide
+// how best to proceed.
+extern "C" SEXP nanoarrow_c_collect_array_stream(SEXP array_stream_xptr, SEXP 
n_sexp) {
+  struct ArrowArrayStream* array_stream =
+      nanoarrow_array_stream_from_xptr(array_stream_xptr);
+
+  double n_real = REAL(n_sexp)[0];
+  int n;
+  if (R_FINITE(n_real)) {
+    n = (int)n_real;
+  } else {
+    n = INT_MAX;
+  }
+
+  auto array_vector = new ArrayVector();
+  SEXP array_vector_xptr =
+      PROTECT(R_MakeExternalPtr(array_vector, R_NilValue, R_NilValue));
+  R_RegisterCFinalizer(array_vector_xptr, &release_array_vector_xptr);
+
+  struct ArrowError error;
+  ArrowErrorInit(&error);
+  int code = ArrowArrayStreamGetSchema(array_stream, 
array_vector->schema.get(), &error);
+  if (code != NANOARROW_OK) {
+    Rf_error("ArrowArrayStreamGetSchema() failed (%d): %s", code, 
error.message);
+  }
+
+  int64_t n_actual = 0;
+  int64_t size = 0;
+  while (n > 0) {
+    code = ArrowArrayStreamGetNext(array_stream, array_vector->batch.get(), 
&error);
+    if (code != NANOARROW_OK) {
+      Rf_error("ArrowArrayStreamGetNext() failed (%d): %s", code, 
error.message);
+    }
+
+    if (array_vector->batch->release == nullptr) {
+      break;
+    }
+
+    size += array_vector->batch->length;
+    ++n_actual;
+    --n;
+    array_vector->vec.push_back(std::move(array_vector->batch));
+    array_vector->batch.reset();
+
+    R_CheckUserInterrupt();
+  }
+
+  SEXP array_stream_out_xptr = PROTECT(nanoarrow_array_stream_owning_xptr());
+  struct ArrowArrayStream* array_stream_out =
+      nanoarrow_output_array_stream_from_xptr(array_stream_out_xptr);
+
+  nanoarrow::VectorArrayStream(array_vector->schema.get(), 
std::move(array_vector->vec))
+      .ToArrayStream(array_stream_out);
+
+  SEXP size_sexp = PROTECT(Rf_ScalarReal(size));
+  SEXP n_actual_sexp = PROTECT(Rf_ScalarReal(n_actual));
+  const char* names[] = {"stream", "size", "n", ""};
+  SEXP out = PROTECT(Rf_mkNamed(VECSXP, names));
+  SET_VECTOR_ELT(out, 0, array_stream_out_xptr);
+  SET_VECTOR_ELT(out, 1, size_sexp);
+  SET_VECTOR_ELT(out, 2, n_actual_sexp);
+
+  UNPROTECT(5);
+  return out;
+}
diff --git a/r/tools/make-callentries.R b/r/tools/make-callentries.R
index 403169c3..f40fe654 100644
--- a/r/tools/make-callentries.R
+++ b/r/tools/make-callentries.R
@@ -21,7 +21,7 @@
 
 library(tidyverse)
 
-src_files <- list.files("src", "\\.(c|cpp)$", full.names = TRUE) %>%
+src_files <- list.files("src", "\\.(c|cc|cpp)$", full.names = TRUE) %>%
   setdiff("src/init.c")
 src_sources <- src_files %>% set_names() %>% map_chr(readr::read_file)
 

Reply via email to