This is an automated email from the ASF dual-hosted git repository.

jonkeane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 376afb8883 GH-49340: [R] Preserve row order in `write_dataset()` 
(#49343)
376afb8883 is described below

commit 376afb888368c252767aa249af06e7ae23c60f7a
Author: Steve Martin <[email protected]>
AuthorDate: Tue Feb 24 10:19:29 2026 -0500

    GH-49340: [R] Preserve row order in `write_dataset()` (#49343)
    
    ### Rationale for this change
    
    `write_dataset(df)` need not preserve the row-ordering of `df` across 
partitions. The arrow C++ library was recently updated (since 21.0.0) so that 
row ordering can be preserved when writing across partitions. This is useful 
for cases where it is assumed that row-ordering is unchanged within each 
partition.
    
    ``` r
    df <- tibble::tibble(x = 1:1.5e6, g = rep(1:15, each = 1e5))
    
    df |>
      dplyr::group_by(g) |>
      arrow::write_dataset("test1", preserve_order = FALSE)
    
    df |>
      dplyr::group_by(g) |>
      arrow::write_dataset("test2", preserve_order = TRUE)
    
    test1 <- arrow::open_dataset("test1") |>
      dplyr::collect()
    
    test2 <- arrow::open_dataset("test2") |>
      dplyr::collect()
    
    # Current behavior.
    all.equal(test1 |> sort_by(~ g), df)
    #> [1] "Component \"x\": Mean relative difference: 0.0475804"
    
    # Preserve order.
    all.equal(test2 |> sort_by(~ g), df)
    #> [1] TRUE
    ```
    
    <sup>Created on 2026-02-20 with [reprex 
v2.1.1](https://reprex.tidyverse.org)</sup>
    
    ### What changes are included in this PR?
    
    Added an argument `preserve_order` to `write_dataset()` that sets 
`FileSystemDatasetWriteOptions.preserve_order` to true in the call to 
`ExecPlan_Write()`.
    
    ### Are these changes tested?
    
    Partially. The change is small, so I haven't written unit tests. I can 
revisit this if necessary.
    
    ### Are there any user-facing changes?
    
    Yes, there is a new argument in `write_dataset()`. The default keeps the 
current behavior and the argument appears after all existing arguments, so the 
change is backwards compatible.
    
    * GitHub Issue: #49340
    
    Authored-by: Steve Martin <[email protected]>
    Signed-off-by: Jonathan Keane <[email protected]>
---
 r/R/arrowExports.R                    |  4 ++--
 r/R/dataset-write.R                   | 25 +++++++++++++++++--------
 r/man/write_dataset.Rd                |  5 ++++-
 r/man/write_delim_dataset.Rd          | 11 ++++++++---
 r/src/arrowExports.cpp                | 11 ++++++-----
 r/src/compute-exec.cpp                | 23 +++++++++++------------
 r/tests/testthat/test-dataset-write.R | 23 +++++++++++++++++++++++
 7 files changed, 71 insertions(+), 31 deletions(-)

diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index a8387526b2..455e6bc8a7 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -468,8 +468,8 @@ ExecNode_Scan <- function(plan, dataset, filter, 
projection) {
   .Call(`_arrow_ExecNode_Scan`, plan, dataset, filter, projection)
 }
 
-ExecPlan_Write <- function(plan, final_node, schema, file_write_options, 
filesystem, base_dir, partitioning, basename_template, existing_data_behavior, 
max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, 
max_rows_per_group, create_directory) {
-  invisible(.Call(`_arrow_ExecPlan_Write`, plan, final_node, schema, 
file_write_options, filesystem, base_dir, partitioning, basename_template, 
existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, 
min_rows_per_group, max_rows_per_group, create_directory))
+ExecPlan_Write <- function(plan, final_node, schema, file_write_options, 
filesystem, base_dir, partitioning, basename_template, existing_data_behavior, 
max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, 
max_rows_per_group, create_directory, preserve_order) {
+  invisible(.Call(`_arrow_ExecPlan_Write`, plan, final_node, schema, 
file_write_options, filesystem, base_dir, partitioning, basename_template, 
existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, 
min_rows_per_group, max_rows_per_group, create_directory, preserve_order))
 }
 
 ExecNode_Filter <- function(input, filter) {
diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R
index 9b6727211b..663e1b8f08 100644
--- a/r/R/dataset-write.R
+++ b/r/R/dataset-write.R
@@ -70,7 +70,8 @@
 #' @param create_directory whether to create the directories written into.
 #' Requires appropriate permissions on the storage backend. If set to FALSE,
 #' directories are assumed to be already present if writing on a classic
-#' hierarchical filesystem. Default is TRUE
+#' hierarchical filesystem. Default is TRUE.
+#' @param preserve_order Preserve the order of the rows.
 #' @param ... additional format-specific arguments. For available Parquet
 #' options, see [write_parquet()]. The available Feather options are:
 #' - `use_legacy_format` logical: write data formatted so that Arrow libraries
@@ -138,6 +139,7 @@ write_dataset <- function(
   min_rows_per_group = 0L,
   max_rows_per_group = bitwShiftL(1, 20),
   create_directory = TRUE,
+  preserve_order = FALSE,
   ...
 ) {
   format <- match.arg(format)
@@ -238,7 +240,8 @@ write_dataset <- function(
     max_rows_per_file,
     min_rows_per_group,
     max_rows_per_group,
-    create_directory
+    create_directory,
+    preserve_order
   )
 }
 
@@ -284,7 +287,8 @@ write_delim_dataset <- function(
   delim = ",",
   na = "",
   eol = "\n",
-  quote = c("needed", "all", "none")
+  quote = c("needed", "all", "none"),
+  preserve_order = FALSE
 ) {
   if (!missing(max_rows_per_file) && missing(max_rows_per_group) && 
max_rows_per_group > max_rows_per_file) {
     max_rows_per_group <- max_rows_per_file
@@ -312,7 +316,8 @@ write_delim_dataset <- function(
     delimiter = delim,
     null_string = na,
     eol = eol,
-    quoting_style = quote
+    quoting_style = quote,
+    preserve_order = preserve_order
   )
 }
 
@@ -335,7 +340,8 @@ write_csv_dataset <- function(
   delim = ",",
   na = "",
   eol = "\n",
-  quote = c("needed", "all", "none")
+  quote = c("needed", "all", "none"),
+  preserve_order = FALSE
 ) {
   if (!missing(max_rows_per_file) && missing(max_rows_per_group) && 
max_rows_per_group > max_rows_per_file) {
     max_rows_per_group <- max_rows_per_file
@@ -363,7 +369,8 @@ write_csv_dataset <- function(
     delimiter = delim,
     null_string = na,
     eol = eol,
-    quoting_style = quote
+    quoting_style = quote,
+    preserve_order = preserve_order
   )
 }
 
@@ -385,7 +392,8 @@ write_tsv_dataset <- function(
   batch_size = 1024L,
   na = "",
   eol = "\n",
-  quote = c("needed", "all", "none")
+  quote = c("needed", "all", "none"),
+  preserve_order = FALSE
 ) {
   if (!missing(max_rows_per_file) && missing(max_rows_per_group) && 
max_rows_per_group > max_rows_per_file) {
     max_rows_per_group <- max_rows_per_file
@@ -412,7 +420,8 @@ write_tsv_dataset <- function(
     batch_size = batch_size,
     null_string = na,
     eol = eol,
-    quoting_style = quote
+    quoting_style = quote,
+    preserve_order = preserve_order
   )
 }
 
diff --git a/r/man/write_dataset.Rd b/r/man/write_dataset.Rd
index 85981761df..7df7843b22 100644
--- a/r/man/write_dataset.Rd
+++ b/r/man/write_dataset.Rd
@@ -18,6 +18,7 @@ write_dataset(
   min_rows_per_group = 0L,
   max_rows_per_group = bitwShiftL(1, 20),
   create_directory = TRUE,
+  preserve_order = FALSE,
   ...
 )
 }
@@ -85,7 +86,9 @@ greater than \code{min_rows_per_group}. Default is 1024 * 
1024.}
 \item{create_directory}{whether to create the directories written into.
 Requires appropriate permissions on the storage backend. If set to FALSE,
 directories are assumed to be already present if writing on a classic
-hierarchical filesystem. Default is TRUE}
+hierarchical filesystem. Default is TRUE.}
+
+\item{preserve_order}{Preserve the order of the rows.}
 
 \item{...}{additional format-specific arguments. For available Parquet
 options, see \code{\link[=write_parquet]{write_parquet()}}. The available 
Feather options are:
diff --git a/r/man/write_delim_dataset.Rd b/r/man/write_delim_dataset.Rd
index 2dcd9707dc..5847f8f15e 100644
--- a/r/man/write_delim_dataset.Rd
+++ b/r/man/write_delim_dataset.Rd
@@ -23,7 +23,8 @@ write_delim_dataset(
   delim = ",",
   na = "",
   eol = "\\n",
-  quote = c("needed", "all", "none")
+  quote = c("needed", "all", "none"),
+  preserve_order = FALSE
 )
 
 write_csv_dataset(
@@ -43,7 +44,8 @@ write_csv_dataset(
   delim = ",",
   na = "",
   eol = "\\n",
-  quote = c("needed", "all", "none")
+  quote = c("needed", "all", "none"),
+  preserve_order = FALSE
 )
 
 write_tsv_dataset(
@@ -62,7 +64,8 @@ write_tsv_dataset(
   batch_size = 1024L,
   na = "",
   eol = "\\n",
-  quote = c("needed", "all", "none")
+  quote = c("needed", "all", "none"),
+  preserve_order = FALSE
 )
 }
 \arguments{
@@ -145,6 +148,8 @@ interpret all values as strings if schema is inferred.
 cell delimiters (,) or line endings (\\r, \\n), (following RFC4180). If values
 contain these characters, an error is caused when attempting to write.
 }}
+
+\item{preserve_order}{Preserve the order of the rows.}
 }
 \value{
 The input \code{dataset}, invisibly.
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 14c6330074..adfd90c8a5 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1084,8 +1084,8 @@ extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP 
dataset_sexp, SEXP fil
 
 // compute-exec.cpp
 #if defined(ARROW_R_WITH_DATASET)
-void ExecPlan_Write(const std::shared_ptr<acero::ExecPlan>& plan, const 
std::shared_ptr<acero::ExecNode>& final_node, const 
std::shared_ptr<arrow::Schema>& schema, const 
std::shared_ptr<ds::FileWriteOptions>& file_write_options, const 
std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const 
std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, 
arrow::dataset::ExistingDataBehavior existing_data_behavior, int 
max_partitions, uint32_t max_open_files [...]
-extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, 
SEXP schema_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP 
base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP 
existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP 
max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, 
SEXP max_rows_per_group_sexp, SEXP create_directory_sexp){
+void ExecPlan_Write(const std::shared_ptr<acero::ExecPlan>& plan, const 
std::shared_ptr<acero::ExecNode>& final_node, const 
std::shared_ptr<arrow::Schema>& schema, const 
std::shared_ptr<ds::FileWriteOptions>& file_write_options, const 
std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const 
std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, 
arrow::dataset::ExistingDataBehavior existing_data_behavior, int 
max_partitions, uint32_t max_open_files [...]
+extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, 
SEXP schema_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP 
base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP 
existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP 
max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, 
SEXP max_rows_per_group_sexp, SEXP create_directory_sexp, SEXP 
preserve_order_sexp){
 BEGIN_CPP11
        arrow::r::Input<const std::shared_ptr<acero::ExecPlan>&>::type 
plan(plan_sexp);
        arrow::r::Input<const std::shared_ptr<acero::ExecNode>&>::type 
final_node(final_node_sexp);
@@ -1102,12 +1102,13 @@ BEGIN_CPP11
        arrow::r::Input<uint64_t>::type 
min_rows_per_group(min_rows_per_group_sexp);
        arrow::r::Input<uint64_t>::type 
max_rows_per_group(max_rows_per_group_sexp);
        arrow::r::Input<bool>::type create_directory(create_directory_sexp);
-       ExecPlan_Write(plan, final_node, schema, file_write_options, 
filesystem, base_dir, partitioning, basename_template, existing_data_behavior, 
max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, 
max_rows_per_group, create_directory);
+       arrow::r::Input<bool>::type preserve_order(preserve_order_sexp);
+       ExecPlan_Write(plan, final_node, schema, file_write_options, 
filesystem, base_dir, partitioning, basename_template, existing_data_behavior, 
max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, 
max_rows_per_group, create_directory, preserve_order);
        return R_NilValue;
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, 
SEXP schema_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP 
base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP 
existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP 
max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, 
SEXP max_rows_per_group_sexp, SEXP create_directory_sexp){
+extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, 
SEXP schema_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP 
base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP 
existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP 
max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, 
SEXP max_rows_per_group_sexp, SEXP create_directory_sexp, SEXP 
preserve_order_sexp){
        Rf_error("Cannot call ExecPlan_Write(). See 
https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow 
C++ libraries. ");
 }
 #endif
@@ -5824,7 +5825,7 @@ static const R_CallMethodDef CallEntries[] = {
                { "_arrow_ExecNode_output_schema", (DL_FUNC) 
&_arrow_ExecNode_output_schema, 1}, 
                { "_arrow_ExecNode_has_ordered_batches", (DL_FUNC) 
&_arrow_ExecNode_has_ordered_batches, 1}, 
                { "_arrow_ExecNode_Scan", (DL_FUNC) &_arrow_ExecNode_Scan, 4}, 
-               { "_arrow_ExecPlan_Write", (DL_FUNC) &_arrow_ExecPlan_Write, 
15}, 
+               { "_arrow_ExecPlan_Write", (DL_FUNC) &_arrow_ExecPlan_Write, 
16}, 
                { "_arrow_ExecNode_Filter", (DL_FUNC) &_arrow_ExecNode_Filter, 
2}, 
                { "_arrow_ExecNode_Project", (DL_FUNC) 
&_arrow_ExecNode_Project, 3}, 
                { "_arrow_ExecNode_Aggregate", (DL_FUNC) 
&_arrow_ExecNode_Aggregate, 3}, 
diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp
index 4191e44862..9912210a14 100644
--- a/r/src/compute-exec.cpp
+++ b/r/src/compute-exec.cpp
@@ -307,18 +307,16 @@ std::shared_ptr<acero::ExecNode> ExecNode_Scan(
 }
 
 // [[dataset::export]]
-void ExecPlan_Write(const std::shared_ptr<acero::ExecPlan>& plan,
-                    const std::shared_ptr<acero::ExecNode>& final_node,
-                    const std::shared_ptr<arrow::Schema>& schema,
-                    const std::shared_ptr<ds::FileWriteOptions>& 
file_write_options,
-                    const std::shared_ptr<fs::FileSystem>& filesystem,
-                    std::string base_dir,
-                    const std::shared_ptr<ds::Partitioning>& partitioning,
-                    std::string basename_template,
-                    arrow::dataset::ExistingDataBehavior 
existing_data_behavior,
-                    int max_partitions, uint32_t max_open_files,
-                    uint64_t max_rows_per_file, uint64_t min_rows_per_group,
-                    uint64_t max_rows_per_group, bool create_directory) {
+void ExecPlan_Write(
+    const std::shared_ptr<acero::ExecPlan>& plan,
+    const std::shared_ptr<acero::ExecNode>& final_node,
+    const std::shared_ptr<arrow::Schema>& schema,
+    const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
+    const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir,
+    const std::shared_ptr<ds::Partitioning>& partitioning, std::string 
basename_template,
+    arrow::dataset::ExistingDataBehavior existing_data_behavior, int 
max_partitions,
+    uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t 
min_rows_per_group,
+    uint64_t max_rows_per_group, bool create_directory, bool preserve_order) {
   arrow::dataset::internal::Initialize();
 
   // TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R
@@ -336,6 +334,7 @@ void ExecPlan_Write(const std::shared_ptr<acero::ExecPlan>& 
plan,
   opts.min_rows_per_group = min_rows_per_group;
   opts.max_rows_per_group = max_rows_per_group;
   opts.create_dir = create_directory;
+  opts.preserve_order = preserve_order;
 
   ds::WriteNodeOptions options(std::move(opts));
   options.custom_schema = std::move(schema);
diff --git a/r/tests/testthat/test-dataset-write.R 
b/r/tests/testthat/test-dataset-write.R
index d675e4950d..d62b888163 100644
--- a/r/tests/testthat/test-dataset-write.R
+++ b/r/tests/testthat/test-dataset-write.R
@@ -1015,3 +1015,26 @@ test_that("Dataset write wrappers can write flat files 
using readr::write_csv()
     c("true", "false", "NOVALUE", "true", "false", "true", "false", "NOVALUE", 
"true", "false")
   )
 })
+
+test_that("Row order is preserved when writing large parquet dataset", {
+  skip_if_not_available("parquet")
+  # Make a data frame with a sufficiently large number of rows.
+  df <- data.frame(x = 1:1.1e6)
+
+  unordered_dir <- make_temp_dir()
+  write_dataset(df, unordered_dir)
+
+  ordered_dir <- make_temp_dir()
+  write_dataset(df, ordered_dir, preserve_order = TRUE)
+
+  unordered_ds <- open_dataset(unordered_dir) |> collect()
+  ordered_ds <- open_dataset(ordered_dir) |> collect()
+
+  # Unordered is set equal, but not necessarily equal.
+  expect_setequal(unordered_ds$x, df$x)
+  # expect_false(all(unordered_ds$x == df$x)) can fail on certain
+  # platforms, so is not tested.
+
+  # But ordered is exactly equal.
+  expect_equal(ordered_ds$x, df$x)
+})

Reply via email to