This is an automated email from the ASF dual-hosted git repository.
jonkeane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 376afb8883 GH-49340: [R] Preserve row order in `write_dataset()`
(#49343)
376afb8883 is described below
commit 376afb888368c252767aa249af06e7ae23c60f7a
Author: Steve Martin <[email protected]>
AuthorDate: Tue Feb 24 10:19:29 2026 -0500
GH-49340: [R] Preserve row order in `write_dataset()` (#49343)
### Rationale for this change
`write_dataset(df)` need not preserve the row-ordering of `df` across
partitions. The arrow C++ library was recently updated (since 21.0.0) so that
row ordering can be preserved when writing across partitions. This is useful
for cases where it is assumed that row-ordering is unchanged within each
partition.
``` r
df <- tibble::tibble(x = 1:1.5e6, g = rep(1:15, each = 1e5))
df |>
dplyr::group_by(g) |>
arrow::write_dataset("test1", preserve_order = FALSE)
df |>
dplyr::group_by(g) |>
arrow::write_dataset("test2", preserve_order = TRUE)
test1 <- arrow::open_dataset("test1") |>
dplyr::collect()
test2 <- arrow::open_dataset("test2") |>
dplyr::collect()
# Current behavior.
all.equal(test1 |> sort_by(~ g), df)
#> [1] "Component \"x\": Mean relative difference: 0.0475804"
# Preserve order.
all.equal(test2 |> sort_by(~ g), df)
#> [1] TRUE
```
<sup>Created on 2026-02-20 with [reprex
v2.1.1](https://reprex.tidyverse.org)</sup>
### What changes are included in this PR?
Added an argument `preserve_order` to `write_dataset()` that sets
`FileSystemDatasetWriteOptions.preserve_order` to true in the call to
`ExecPlan_Write()`.
### Are these changes tested?
Partially. The change is small, so I haven't written unit tests. I can
revisit this if necessary.
### Are there any user-facing changes?
Yes, there is a new argument in `write_dataset()`. The default keeps the
current behavior and the argument appears after all existing arguments, so the
change is backwards compatible.
* GitHub Issue: #49340
Authored-by: Steve Martin <[email protected]>
Signed-off-by: Jonathan Keane <[email protected]>
---
r/R/arrowExports.R | 4 ++--
r/R/dataset-write.R | 25 +++++++++++++++++--------
r/man/write_dataset.Rd | 5 ++++-
r/man/write_delim_dataset.Rd | 11 ++++++++---
r/src/arrowExports.cpp | 11 ++++++-----
r/src/compute-exec.cpp | 23 +++++++++++------------
r/tests/testthat/test-dataset-write.R | 23 +++++++++++++++++++++++
7 files changed, 71 insertions(+), 31 deletions(-)
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index a8387526b2..455e6bc8a7 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -468,8 +468,8 @@ ExecNode_Scan <- function(plan, dataset, filter,
projection) {
.Call(`_arrow_ExecNode_Scan`, plan, dataset, filter, projection)
}
-ExecPlan_Write <- function(plan, final_node, schema, file_write_options,
filesystem, base_dir, partitioning, basename_template, existing_data_behavior,
max_partitions, max_open_files, max_rows_per_file, min_rows_per_group,
max_rows_per_group, create_directory) {
- invisible(.Call(`_arrow_ExecPlan_Write`, plan, final_node, schema,
file_write_options, filesystem, base_dir, partitioning, basename_template,
existing_data_behavior, max_partitions, max_open_files, max_rows_per_file,
min_rows_per_group, max_rows_per_group, create_directory))
+ExecPlan_Write <- function(plan, final_node, schema, file_write_options,
filesystem, base_dir, partitioning, basename_template, existing_data_behavior,
max_partitions, max_open_files, max_rows_per_file, min_rows_per_group,
max_rows_per_group, create_directory, preserve_order) {
+ invisible(.Call(`_arrow_ExecPlan_Write`, plan, final_node, schema,
file_write_options, filesystem, base_dir, partitioning, basename_template,
existing_data_behavior, max_partitions, max_open_files, max_rows_per_file,
min_rows_per_group, max_rows_per_group, create_directory, preserve_order))
}
ExecNode_Filter <- function(input, filter) {
diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R
index 9b6727211b..663e1b8f08 100644
--- a/r/R/dataset-write.R
+++ b/r/R/dataset-write.R
@@ -70,7 +70,8 @@
#' @param create_directory whether to create the directories written into.
#' Requires appropriate permissions on the storage backend. If set to FALSE,
#' directories are assumed to be already present if writing on a classic
-#' hierarchical filesystem. Default is TRUE
+#' hierarchical filesystem. Default is TRUE.
+#' @param preserve_order Preserve the order of the rows.
#' @param ... additional format-specific arguments. For available Parquet
#' options, see [write_parquet()]. The available Feather options are:
#' - `use_legacy_format` logical: write data formatted so that Arrow libraries
@@ -138,6 +139,7 @@ write_dataset <- function(
min_rows_per_group = 0L,
max_rows_per_group = bitwShiftL(1, 20),
create_directory = TRUE,
+ preserve_order = FALSE,
...
) {
format <- match.arg(format)
@@ -238,7 +240,8 @@ write_dataset <- function(
max_rows_per_file,
min_rows_per_group,
max_rows_per_group,
- create_directory
+ create_directory,
+ preserve_order
)
}
@@ -284,7 +287,8 @@ write_delim_dataset <- function(
delim = ",",
na = "",
eol = "\n",
- quote = c("needed", "all", "none")
+ quote = c("needed", "all", "none"),
+ preserve_order = FALSE
) {
if (!missing(max_rows_per_file) && missing(max_rows_per_group) &&
max_rows_per_group > max_rows_per_file) {
max_rows_per_group <- max_rows_per_file
@@ -312,7 +316,8 @@ write_delim_dataset <- function(
delimiter = delim,
null_string = na,
eol = eol,
- quoting_style = quote
+ quoting_style = quote,
+ preserve_order = preserve_order
)
}
@@ -335,7 +340,8 @@ write_csv_dataset <- function(
delim = ",",
na = "",
eol = "\n",
- quote = c("needed", "all", "none")
+ quote = c("needed", "all", "none"),
+ preserve_order = FALSE
) {
if (!missing(max_rows_per_file) && missing(max_rows_per_group) &&
max_rows_per_group > max_rows_per_file) {
max_rows_per_group <- max_rows_per_file
@@ -363,7 +369,8 @@ write_csv_dataset <- function(
delimiter = delim,
null_string = na,
eol = eol,
- quoting_style = quote
+ quoting_style = quote,
+ preserve_order = preserve_order
)
}
@@ -385,7 +392,8 @@ write_tsv_dataset <- function(
batch_size = 1024L,
na = "",
eol = "\n",
- quote = c("needed", "all", "none")
+ quote = c("needed", "all", "none"),
+ preserve_order = FALSE
) {
if (!missing(max_rows_per_file) && missing(max_rows_per_group) &&
max_rows_per_group > max_rows_per_file) {
max_rows_per_group <- max_rows_per_file
@@ -412,7 +420,8 @@ write_tsv_dataset <- function(
batch_size = batch_size,
null_string = na,
eol = eol,
- quoting_style = quote
+ quoting_style = quote,
+ preserve_order = preserve_order
)
}
diff --git a/r/man/write_dataset.Rd b/r/man/write_dataset.Rd
index 85981761df..7df7843b22 100644
--- a/r/man/write_dataset.Rd
+++ b/r/man/write_dataset.Rd
@@ -18,6 +18,7 @@ write_dataset(
min_rows_per_group = 0L,
max_rows_per_group = bitwShiftL(1, 20),
create_directory = TRUE,
+ preserve_order = FALSE,
...
)
}
@@ -85,7 +86,9 @@ greater than \code{min_rows_per_group}. Default is 1024 *
1024.}
\item{create_directory}{whether to create the directories written into.
Requires appropriate permissions on the storage backend. If set to FALSE,
directories are assumed to be already present if writing on a classic
-hierarchical filesystem. Default is TRUE}
+hierarchical filesystem. Default is TRUE.}
+
+\item{preserve_order}{Preserve the order of the rows.}
\item{...}{additional format-specific arguments. For available Parquet
options, see \code{\link[=write_parquet]{write_parquet()}}. The available
Feather options are:
diff --git a/r/man/write_delim_dataset.Rd b/r/man/write_delim_dataset.Rd
index 2dcd9707dc..5847f8f15e 100644
--- a/r/man/write_delim_dataset.Rd
+++ b/r/man/write_delim_dataset.Rd
@@ -23,7 +23,8 @@ write_delim_dataset(
delim = ",",
na = "",
eol = "\\n",
- quote = c("needed", "all", "none")
+ quote = c("needed", "all", "none"),
+ preserve_order = FALSE
)
write_csv_dataset(
@@ -43,7 +44,8 @@ write_csv_dataset(
delim = ",",
na = "",
eol = "\\n",
- quote = c("needed", "all", "none")
+ quote = c("needed", "all", "none"),
+ preserve_order = FALSE
)
write_tsv_dataset(
@@ -62,7 +64,8 @@ write_tsv_dataset(
batch_size = 1024L,
na = "",
eol = "\\n",
- quote = c("needed", "all", "none")
+ quote = c("needed", "all", "none"),
+ preserve_order = FALSE
)
}
\arguments{
@@ -145,6 +148,8 @@ interpret all values as strings if schema is inferred.
cell delimiters (,) or line endings (\\r, \\n), (following RFC4180). If values
contain these characters, an error is caused when attempting to write.
}}
+
+\item{preserve_order}{Preserve the order of the rows.}
}
\value{
The input \code{dataset}, invisibly.
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 14c6330074..adfd90c8a5 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1084,8 +1084,8 @@ extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP
dataset_sexp, SEXP fil
// compute-exec.cpp
#if defined(ARROW_R_WITH_DATASET)
-void ExecPlan_Write(const std::shared_ptr<acero::ExecPlan>& plan, const
std::shared_ptr<acero::ExecNode>& final_node, const
std::shared_ptr<arrow::Schema>& schema, const
std::shared_ptr<ds::FileWriteOptions>& file_write_options, const
std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const
std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template,
arrow::dataset::ExistingDataBehavior existing_data_behavior, int
max_partitions, uint32_t max_open_files [...]
-extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp,
SEXP schema_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP
base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP
existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP
max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp,
SEXP max_rows_per_group_sexp, SEXP create_directory_sexp){
+void ExecPlan_Write(const std::shared_ptr<acero::ExecPlan>& plan, const
std::shared_ptr<acero::ExecNode>& final_node, const
std::shared_ptr<arrow::Schema>& schema, const
std::shared_ptr<ds::FileWriteOptions>& file_write_options, const
std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const
std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template,
arrow::dataset::ExistingDataBehavior existing_data_behavior, int
max_partitions, uint32_t max_open_files [...]
+extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp,
SEXP schema_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP
base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP
existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP
max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp,
SEXP max_rows_per_group_sexp, SEXP create_directory_sexp, SEXP
preserve_order_sexp){
BEGIN_CPP11
arrow::r::Input<const std::shared_ptr<acero::ExecPlan>&>::type
plan(plan_sexp);
arrow::r::Input<const std::shared_ptr<acero::ExecNode>&>::type
final_node(final_node_sexp);
@@ -1102,12 +1102,13 @@ BEGIN_CPP11
arrow::r::Input<uint64_t>::type
min_rows_per_group(min_rows_per_group_sexp);
arrow::r::Input<uint64_t>::type
max_rows_per_group(max_rows_per_group_sexp);
arrow::r::Input<bool>::type create_directory(create_directory_sexp);
- ExecPlan_Write(plan, final_node, schema, file_write_options,
filesystem, base_dir, partitioning, basename_template, existing_data_behavior,
max_partitions, max_open_files, max_rows_per_file, min_rows_per_group,
max_rows_per_group, create_directory);
+ arrow::r::Input<bool>::type preserve_order(preserve_order_sexp);
+ ExecPlan_Write(plan, final_node, schema, file_write_options,
filesystem, base_dir, partitioning, basename_template, existing_data_behavior,
max_partitions, max_open_files, max_rows_per_file, min_rows_per_group,
max_rows_per_group, create_directory, preserve_order);
return R_NilValue;
END_CPP11
}
#else
-extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp,
SEXP schema_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP
base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP
existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP
max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp,
SEXP max_rows_per_group_sexp, SEXP create_directory_sexp){
+extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp,
SEXP schema_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP
base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP
existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP
max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp,
SEXP max_rows_per_group_sexp, SEXP create_directory_sexp, SEXP
preserve_order_sexp){
Rf_error("Cannot call ExecPlan_Write(). See
https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow
C++ libraries. ");
}
#endif
@@ -5824,7 +5825,7 @@ static const R_CallMethodDef CallEntries[] = {
{ "_arrow_ExecNode_output_schema", (DL_FUNC)
&_arrow_ExecNode_output_schema, 1},
{ "_arrow_ExecNode_has_ordered_batches", (DL_FUNC)
&_arrow_ExecNode_has_ordered_batches, 1},
{ "_arrow_ExecNode_Scan", (DL_FUNC) &_arrow_ExecNode_Scan, 4},
- { "_arrow_ExecPlan_Write", (DL_FUNC) &_arrow_ExecPlan_Write,
15},
+ { "_arrow_ExecPlan_Write", (DL_FUNC) &_arrow_ExecPlan_Write,
16},
{ "_arrow_ExecNode_Filter", (DL_FUNC) &_arrow_ExecNode_Filter,
2},
{ "_arrow_ExecNode_Project", (DL_FUNC)
&_arrow_ExecNode_Project, 3},
{ "_arrow_ExecNode_Aggregate", (DL_FUNC)
&_arrow_ExecNode_Aggregate, 3},
diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp
index 4191e44862..9912210a14 100644
--- a/r/src/compute-exec.cpp
+++ b/r/src/compute-exec.cpp
@@ -307,18 +307,16 @@ std::shared_ptr<acero::ExecNode> ExecNode_Scan(
}
// [[dataset::export]]
-void ExecPlan_Write(const std::shared_ptr<acero::ExecPlan>& plan,
- const std::shared_ptr<acero::ExecNode>& final_node,
- const std::shared_ptr<arrow::Schema>& schema,
- const std::shared_ptr<ds::FileWriteOptions>&
file_write_options,
- const std::shared_ptr<fs::FileSystem>& filesystem,
- std::string base_dir,
- const std::shared_ptr<ds::Partitioning>& partitioning,
- std::string basename_template,
- arrow::dataset::ExistingDataBehavior
existing_data_behavior,
- int max_partitions, uint32_t max_open_files,
- uint64_t max_rows_per_file, uint64_t min_rows_per_group,
- uint64_t max_rows_per_group, bool create_directory) {
+void ExecPlan_Write(
+ const std::shared_ptr<acero::ExecPlan>& plan,
+ const std::shared_ptr<acero::ExecNode>& final_node,
+ const std::shared_ptr<arrow::Schema>& schema,
+ const std::shared_ptr<ds::FileWriteOptions>& file_write_options,
+ const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir,
+ const std::shared_ptr<ds::Partitioning>& partitioning, std::string
basename_template,
+ arrow::dataset::ExistingDataBehavior existing_data_behavior, int
max_partitions,
+ uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t
min_rows_per_group,
+ uint64_t max_rows_per_group, bool create_directory, bool preserve_order) {
arrow::dataset::internal::Initialize();
// TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R
@@ -336,6 +334,7 @@ void ExecPlan_Write(const std::shared_ptr<acero::ExecPlan>&
plan,
opts.min_rows_per_group = min_rows_per_group;
opts.max_rows_per_group = max_rows_per_group;
opts.create_dir = create_directory;
+ opts.preserve_order = preserve_order;
ds::WriteNodeOptions options(std::move(opts));
options.custom_schema = std::move(schema);
diff --git a/r/tests/testthat/test-dataset-write.R
b/r/tests/testthat/test-dataset-write.R
index d675e4950d..d62b888163 100644
--- a/r/tests/testthat/test-dataset-write.R
+++ b/r/tests/testthat/test-dataset-write.R
@@ -1015,3 +1015,26 @@ test_that("Dataset write wrappers can write flat files
using readr::write_csv()
c("true", "false", "NOVALUE", "true", "false", "true", "false", "NOVALUE",
"true", "false")
)
})
+
+test_that("Row order is preserved when writing large parquet dataset", {
+ skip_if_not_available("parquet")
+ # Make a data frame with a sufficiently large number of rows.
+ df <- data.frame(x = 1:1.1e6)
+
+ unordered_dir <- make_temp_dir()
+ write_dataset(df, unordered_dir)
+
+ ordered_dir <- make_temp_dir()
+ write_dataset(df, ordered_dir, preserve_order = TRUE)
+
+ unordered_ds <- open_dataset(unordered_dir) |> collect()
+ ordered_ds <- open_dataset(ordered_dir) |> collect()
+
+  # Unordered is set-equal, but not necessarily equal.
+ expect_setequal(unordered_ds$x, df$x)
+ # expect_false(all(unordered_ds$x == df$x)) can fail on certain
+ # platforms, so is not tested.
+
+ # But ordered is exactly equal.
+ expect_equal(ordered_ds$x, df$x)
+})