This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new c9fca0b124 Add Parquet roundtrip benchmarks (#8956)
c9fca0b124 is described below

commit c9fca0b1248911daeab5e92146d33a58f11e15d2
Author: Ed Seidl <[email protected]>
AuthorDate: Mon Dec 8 13:41:16 2025 -0800

    Add Parquet roundtrip benchmarks (#8956)
    
    # Which issue does this PR close?
    
    - Closes #8955.
    
    # Rationale for this change
    
    Provides a performance baseline for future improvements.
    
    # What changes are included in this PR?
    
    Adds new benchmarks for reading and writing. Currently uses a fixed
    number of row groups, pages, and rows. Cycles through data types and
    encodings.
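    
    The new benchmark can be run with, for example:
    
        cargo bench -p parquet --bench parquet_round_trip --features arrow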
    
    # Are these changes tested?
    N/A
    
    # Are there any user-facing changes?
    
    No
---
 parquet/Cargo.toml                    |   5 +
 parquet/benches/parquet_round_trip.rs | 473 ++++++++++++++++++++++++++++++++++
 2 files changed, 478 insertions(+)

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 504ba312c1..50f69fea54 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -255,6 +255,11 @@ harness = false
 name = "metadata"
 harness = false
 
+[[bench]]
+name = "parquet_round_trip"
+required-features = ["arrow"]
+harness = false
+
 [[bench]]
 name = "row_selector"
 harness = false
diff --git a/parquet/benches/parquet_round_trip.rs b/parquet/benches/parquet_round_trip.rs
new file mode 100644
index 0000000000..b239c3ccc7
--- /dev/null
+++ b/parquet/benches/parquet_round_trip.rs
@@ -0,0 +1,473 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{ArrayRef, RecordBatch};
+use arrow::datatypes::{DataType, Field, Float32Type, Float64Type, Int32Type, Int64Type, Schema};
+use arrow::util::bench_util::{
+    create_binary_array_with_len_range_and_prefix_and_seed, create_primitive_array_with_seed,
+    create_string_array_with_len_range_and_prefix_and_seed,
+};
+use arrow_array::{FixedSizeBinaryArray, StringViewArray};
+use bytes::Bytes;
+use criterion::{Criterion, criterion_group, criterion_main};
+use parquet::arrow::ArrowWriter;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+use parquet::basic::Encoding;
+use parquet::file::properties::WriterProperties;
+use rand::{
+    Rng, SeedableRng,
+    distr::{Alphanumeric, StandardUniform},
+    prelude::StdRng,
+};
+use std::sync::Arc;
+
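+/// Column types exercised by the benchmarks; the variant payload is the maximum
+/// value length (or, for `FixedLen`, the fixed byte width)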
+#[derive(Copy, Clone)]
+pub enum ColumnType {
+    String(usize),
+    StringView(usize),
+    Binary(usize),
+    FixedLen(i32),
+    Int32,
+    Int64,
+    Float,
+    Double,
+}
+
+// arrow::util::bench_util::create_fsb_array with a seed
+
+/// Creates a random (but fixed-seeded) `FixedSizeBinaryArray` with a given length,
+/// null density, and byte width
+fn create_fsb_array_with_seed(
+    size: usize,
+    null_density: f32,
+    fixed_len: i32,
+    seed: u64,
+) -> FixedSizeBinaryArray {
+    let mut rng = StdRng::seed_from_u64(seed);
+
+    let rng = &mut rng;
+    FixedSizeBinaryArray::try_from_sparse_iter_with_size(
+        (0..size).map(|_| {
+            if rng.random::<f32>() < null_density {
+                None
+            } else {
+                let value = rng
+                    .sample_iter::<u8, _>(StandardUniform)
+                    .take(fixed_len as usize)
+                    .collect::<Vec<u8>>();
+                Some(value)
+            }
+        }),
+        fixed_len,
+    )
+    .unwrap()
+}
+
+// arrow::util::bench_util::create_string_view_array_with_max_len with a seed
+
+/// Creates a random (but fixed-seeded) `StringViewArray` with a given length,
+/// null density, and maximum string length
+pub fn create_string_view_array_with_seed(
+    size: usize,
+    null_density: f32,
+    max_str_len: usize,
+    seed: u64,
+) -> StringViewArray {
+    let mut rng = StdRng::seed_from_u64(seed);
+
+    let rng = &mut rng;
+    (0..size)
+        .map(|_| {
+            if rng.random::<f32>() < null_density {
+                None
+            } else {
+                let str_len = rng.random_range(max_str_len / 2..max_str_len);
+                let value = rng.sample_iter(&Alphanumeric).take(str_len).collect();
+                let value = String::from_utf8(value).unwrap();
+                Some(value)
+            }
+        })
+        .collect()
+}
+
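+/// Builds a schema of `num_columns` nullable fields of the Arrow type
+/// corresponding to `column_type`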
+fn schema(column_type: ColumnType, num_columns: usize) -> Arc<Schema> {
+    let field_type = match column_type {
+        ColumnType::Binary(_) => DataType::Binary,
+        ColumnType::String(_) => DataType::Utf8,
+        ColumnType::StringView(_) => DataType::Utf8View,
+        ColumnType::FixedLen(size) => DataType::FixedSizeBinary(size),
+        ColumnType::Int32 => DataType::Int32,
+        ColumnType::Int64 => DataType::Int64,
+        ColumnType::Float => DataType::Float32,
+        ColumnType::Double => DataType::Float64,
+    };
+
+    let fields: Vec<Field> = (0..num_columns)
+        .map(|i| Field::new(format!("col_{i}"), field_type.clone(), true))
+        .collect();
+    Arc::new(Schema::new(fields))
+}
+
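+/// Generates a `RecordBatch` of random data matching `schema`, seeded per column
+/// for reproducibility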
+fn create_batch(
+    schema: &Arc<Schema>,
+    column_type: ColumnType,
+    seed: usize,
+    num_columns: usize,
+    num_rows: usize,
+) -> RecordBatch {
+    let null_density = 0.0001;
+    let mut arrays: Vec<ArrayRef> = vec![];
+    match column_type {
+        ColumnType::Binary(max_len) => {
+            for i in 0..num_columns {
+                let array_seed = seed * num_columns + i;
+                let array = create_binary_array_with_len_range_and_prefix_and_seed::<i32>(
+                    num_rows,
+                    null_density,
+                    max_len / 2,
+                    max_len,
+                    &[],
+                    array_seed as u64,
+                );
+                arrays.push(Arc::new(array));
+            }
+        }
+        ColumnType::String(max_str_len) => {
+            for i in 0..num_columns {
+                let array_seed = seed * num_columns + i;
+                let array = create_string_array_with_len_range_and_prefix_and_seed::<i32>(
+                    num_rows,
+                    null_density,
+                    max_str_len / 2,
+                    max_str_len,
+                    "",
+                    array_seed as u64,
+                );
+                arrays.push(Arc::new(array));
+            }
+        }
+        ColumnType::StringView(max_str_len) => {
+            for i in 0..num_columns {
+                let array_seed = seed * num_columns + i;
+                let array = create_string_view_array_with_seed(
+                    num_rows,
+                    null_density,
+                    max_str_len,
+                    array_seed as u64,
+                );
+                arrays.push(Arc::new(array));
+            }
+        }
+        ColumnType::FixedLen(size) => {
+            for i in 0..num_columns {
+                let array_seed = seed * num_columns + i;
+                let array =
+                    create_fsb_array_with_seed(num_rows, null_density, size, array_seed as u64);
+                arrays.push(Arc::new(array));
+            }
+        }
+        ColumnType::Int32 => {
+            for i in 0..num_columns {
+                let array_seed = seed * num_columns + i;
+                let array = create_primitive_array_with_seed::<Int32Type>(
+                    num_rows,
+                    null_density,
+                    array_seed as u64,
+                );
+                arrays.push(Arc::new(array));
+            }
+        }
+        ColumnType::Int64 => {
+            for i in 0..num_columns {
+                let array_seed = seed * num_columns + i;
+                let array = create_primitive_array_with_seed::<Int64Type>(
+                    num_rows,
+                    null_density,
+                    array_seed as u64,
+                );
+                arrays.push(Arc::new(array));
+            }
+        }
+        ColumnType::Float => {
+            for i in 0..num_columns {
+                let array_seed = seed * num_columns + i;
+                let array = create_primitive_array_with_seed::<Float32Type>(
+                    num_rows,
+                    null_density,
+                    array_seed as u64,
+                );
+                arrays.push(Arc::new(array));
+            }
+        }
+        ColumnType::Double => {
+            for i in 0..num_columns {
+                let array_seed = seed * num_columns + i;
+                let array = create_primitive_array_with_seed::<Float64Type>(
+                    num_rows,
+                    null_density,
+                    array_seed as u64,
+                );
+                arrays.push(Arc::new(array));
+            }
+        }
+    }
+    RecordBatch::try_new(schema.clone(), arrays).unwrap()
+}
+
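+/// Describes the shape and encoding of the Parquet file to benchmark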
+#[derive(Copy, Clone)]
+pub struct ParquetFileSpec {
+    column_type: ColumnType,
+    num_columns: usize,
+    num_row_groups: usize,
+    rows_per_row_group: usize,
+    rows_per_page: usize,
+    encoding: Encoding,
+    use_dict: bool,
+}
+
+const DEFAULT_NUM_COLUMNS: usize = 10;
+const DEFAULT_NUM_ROWGROUPS: usize = 10;
+const DEFAULT_ROWS_PER_PAGE: usize = 2_000;
+const DEFAULT_ROWS_PER_ROWGROUP: usize = 10_000;
+
+impl ParquetFileSpec {
+    pub fn new(column_type: ColumnType) -> Self {
+        Self {
+            column_type,
+            num_columns: DEFAULT_NUM_COLUMNS,
+            num_row_groups: DEFAULT_NUM_ROWGROUPS,
+            rows_per_row_group: DEFAULT_ROWS_PER_ROWGROUP,
+            rows_per_page: DEFAULT_ROWS_PER_PAGE,
+            encoding: Encoding::PLAIN,
+            use_dict: true,
+        }
+    }
+
+    pub fn with_num_columns(self, num_columns: usize) -> Self {
+        Self {
+            num_columns,
+            ..self
+        }
+    }
+
+    pub fn with_num_row_groups(self, num_row_groups: usize) -> Self {
+        Self {
+            num_row_groups,
+            ..self
+        }
+    }
+
+    pub fn with_rows_per_row_group(self, rows_per_row_group: usize) -> Self {
+        Self {
+            rows_per_row_group,
+            ..self
+        }
+    }
+
+    pub fn with_rows_per_page(self, rows_per_page: usize) -> Self {
+        Self {
+            rows_per_page,
+            ..self
+        }
+    }
+
+    pub fn with_encoding(self, encoding: Encoding) -> Self {
+        Self { encoding, ..self }
+    }
+
+    pub fn with_use_dict(self, use_dict: bool) -> Self {
+        Self { use_dict, ..self }
+    }
+}
+
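+/// Writes a complete Parquet file described by `spec` into `buffer`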
+fn file_from_spec(spec: ParquetFileSpec, buffer: &mut Vec<u8>) {
+    const SEED: usize = 31;
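+    // generate a small batch (at most 100 rows) that is written repeatedly below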
+    let num_rows = spec.rows_per_row_group.min(100);
+    let rows_to_write = spec.num_row_groups * spec.rows_per_row_group;
+
+    let schema = schema(spec.column_type, spec.num_columns);
+    let props = WriterProperties::builder()
+        .set_max_row_group_size(spec.rows_per_row_group)
+        .set_data_page_row_count_limit(spec.rows_per_page)
+        .set_encoding(spec.encoding)
+        .set_dictionary_enabled(spec.use_dict)
+        .set_compression(parquet::basic::Compression::UNCOMPRESSED)
+        .build();
+
+    let mut writer = ArrowWriter::try_new(buffer, schema.clone(), Some(props)).unwrap();
+
+    // use the same batch repeatedly; otherwise data generation would dominate the time
+    let batch = create_batch(&schema, spec.column_type, SEED, spec.num_columns, num_rows);
+
+    let mut rows_written = 0;
+    while rows_written < rows_to_write {
+        writer.write(&batch).unwrap();
+        rows_written += num_rows;
+    }
+
+    let parquet_metadata = writer.close().unwrap();
+    assert_eq!(parquet_metadata.num_row_groups(), spec.num_row_groups);
+    assert_eq!(
+        parquet_metadata.file_metadata().num_rows() as usize,
+        rows_to_write
+    );
+}
+
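+/// Benchmarks writing a file matching `spec`, then reading it back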
+fn read_write(c: &mut Criterion, spec: ParquetFileSpec, msg: &str) {
+    let mut buffer = Vec::with_capacity(1_000_000);
+
+    // write once to size the buffer
+    file_from_spec(spec, &mut buffer);
+
+    c.bench_function(&format!("write {msg}"), |b| {
+        buffer.clear();
+        b.iter(|| file_from_spec(spec, &mut buffer))
+    });
+
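+    // `Bytes` clones are cheap (reference counted), so each read iteration
+    // reuses the same underlying buffer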
+    let file_bytes = Bytes::from(buffer);
+    c.bench_function(&format!("read {msg}"), |b| {
+        b.iter(|| {
+            let record_reader = ParquetRecordBatchReaderBuilder::try_new(file_bytes.clone())
+                .unwrap()
+                .build()
+                .unwrap();
+            let mut num_rows = 0;
+            for maybe_batch in record_reader {
+                let batch = maybe_batch.unwrap();
+                num_rows += batch.num_rows();
+            }
+            assert_eq!(num_rows, spec.num_row_groups * spec.rows_per_row_group);
+        })
+    });
+}
+
+fn int_benches(c: &mut Criterion, column_type: ColumnType) {
+    let ctype = match column_type {
+        ColumnType::Int32 => "int32",
+        ColumnType::Int64 => "int64",
+        _ => unreachable!(),
+    };
+
+    let spec = ParquetFileSpec::new(column_type).with_use_dict(true);
+    read_write(c, spec, &format!("{ctype} dict"));
+
+    let spec = spec.with_use_dict(false).with_encoding(Encoding::PLAIN);
+    read_write(c, spec, &format!("{ctype} plain"));
+
+    let spec = spec.with_encoding(Encoding::DELTA_BINARY_PACKED);
+    read_write(c, spec, &format!("{ctype} delta_binary"));
+
+    let spec = spec.with_encoding(Encoding::BYTE_STREAM_SPLIT);
+    read_write(c, spec, &format!("{ctype} byte_stream_split"));
+}
+
+fn float_benches(c: &mut Criterion, column_type: ColumnType) {
+    let ctype = match column_type {
+        ColumnType::Float => "f32",
+        ColumnType::Double => "f64",
+        _ => unreachable!(),
+    };
+
+    let spec = ParquetFileSpec::new(column_type).with_use_dict(true);
+    read_write(c, spec, &format!("{ctype} dict"));
+
+    let spec = spec.with_use_dict(false).with_encoding(Encoding::PLAIN);
+    read_write(c, spec, &format!("{ctype} plain"));
+
+    let spec = spec.with_encoding(Encoding::BYTE_STREAM_SPLIT);
+    read_write(c, spec, &format!("{ctype} byte_stream_split"));
+}
+
+fn string_benches(c: &mut Criterion, max_str_len: usize) {
+    let spec = ParquetFileSpec::new(ColumnType::String(max_str_len))
+        .with_num_columns(5)
+        .with_use_dict(true);
+    read_write(c, spec, &format!("String({max_str_len}) dict"));
+
+    let spec = spec.with_use_dict(false).with_encoding(Encoding::PLAIN);
+    read_write(c, spec, &format!("String({max_str_len}) plain"));
+
+    let spec = spec.with_encoding(Encoding::DELTA_LENGTH_BYTE_ARRAY);
+    read_write(c, spec, &format!("String({max_str_len}) delta_length"));
+
+    let spec = spec.with_encoding(Encoding::DELTA_BYTE_ARRAY);
+    read_write(c, spec, &format!("String({max_str_len}) delta_byte_array"));
+
+    let spec = ParquetFileSpec::new(ColumnType::StringView(max_str_len))
+        .with_num_columns(5)
+        .with_use_dict(true);
+    read_write(c, spec, &format!("StringView({max_str_len}) dict"));
+
+    let spec = spec.with_use_dict(false).with_encoding(Encoding::PLAIN);
+    read_write(c, spec, &format!("StringView({max_str_len}) plain"));
+
+    let spec = spec.with_encoding(Encoding::DELTA_LENGTH_BYTE_ARRAY);
+    read_write(c, spec, &format!("StringView({max_str_len}) delta_length"));
+
+    let spec = spec.with_encoding(Encoding::DELTA_BYTE_ARRAY);
+    read_write(
+        c,
+        spec,
+        &format!("StringView({max_str_len}) delta_byte_array"),
+    );
+}
+
+fn binary_benches(c: &mut Criterion, max_len: usize) {
+    let spec = ParquetFileSpec::new(ColumnType::Binary(max_len))
+        .with_num_columns(5)
+        .with_use_dict(true);
+    read_write(c, spec, &format!("Binary({max_len}) dict"));
+
+    let spec = spec.with_use_dict(false).with_encoding(Encoding::PLAIN);
+    read_write(c, spec, &format!("Binary({max_len}) plain"));
+
+    let spec = spec.with_encoding(Encoding::DELTA_LENGTH_BYTE_ARRAY);
+    read_write(c, spec, &format!("Binary({max_len}) delta_length"));
+
+    let spec = spec.with_encoding(Encoding::DELTA_BYTE_ARRAY);
+    read_write(c, spec, &format!("Binary({max_len}) delta_byte_array"));
+}
+
+fn flba_benches(c: &mut Criterion, len: i32) {
+    let spec = ParquetFileSpec::new(ColumnType::FixedLen(len))
+        .with_num_columns(5)
+        .with_use_dict(true);
+    read_write(c, spec, &format!("Fixed({len}) dict"));
+
+    let spec = spec.with_use_dict(false).with_encoding(Encoding::PLAIN);
+    read_write(c, spec, &format!("Fixed({len}) plain"));
+
+    let spec = spec.with_encoding(Encoding::BYTE_STREAM_SPLIT);
+    read_write(c, spec, &format!("Fixed({len}) byte_stream_split"));
+
+    let spec = spec.with_encoding(Encoding::DELTA_BYTE_ARRAY);
+    read_write(c, spec, &format!("Fixed({len}) delta_byte_array"));
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    int_benches(c, ColumnType::Int32);
+    int_benches(c, ColumnType::Int64);
+    float_benches(c, ColumnType::Float);
+    float_benches(c, ColumnType::Double);
+    string_benches(c, 20);
+    string_benches(c, 100);
+    binary_benches(c, 20);
+    binary_benches(c, 100);
+    flba_benches(c, 2);
+    flba_benches(c, 16);
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
