martin-g commented on code in PR #21707:
URL: https://github.com/apache/datafusion/pull/21707#discussion_r3129308336


##########
benchmarks/benches/sql.rs:
##########
@@ -0,0 +1,336 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Criterion benchmark harness for SQL benchmark files under `sql_benchmarks`.
+//!
+//! SQL benchmarks describe setup, queries, result validation, and cleanup in
+//! `.benchmark` files. Run them with `benchmarks/bench.sh` or directly with
+//! Cargo, for example: `BENCH_NAME=tpch cargo bench --bench sql`.
+
+use clap::Parser;
+use criterion::{Criterion, SamplingMode, criterion_group, criterion_main};
+use datafusion::error::Result;
+use datafusion::prelude::SessionContext;
+use datafusion_benchmarks::sql_benchmark::SqlBenchmark;
+use datafusion_benchmarks::util::{CommonOpt, print_memory_stats};
+use datafusion_common::instant::Instant;
+use log::{debug, info};
+use std::collections::BTreeMap;
+use std::fs;
+use tokio::runtime::Runtime;
+
+static SQL_BENCHMARK_DIRECTORY: &str = "sql_benchmarks";
+
+#[cfg(all(feature = "snmalloc", feature = "mimalloc"))]
+compile_error!(
+    "feature \"snmalloc\" and feature \"mimalloc\" cannot be enabled at the 
same time"
+);
+
+#[cfg(feature = "snmalloc")]
+#[global_allocator]
+static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
+
+#[derive(Debug, Parser)]
+#[command(ignore_errors = true)]
+struct EnvParser {
+    #[command(flatten)]
+    options: CommonOpt,
+
+    #[arg(
+        env = "BENCH_PERSIST_RESULTS",
+        long = "persist_results",
+        default_value = "false",
+        action = clap::ArgAction::SetTrue
+    )]
+    persist_results: bool,
+
+    #[arg(
+        env = "BENCH_VALIDATE",
+        long = "validate_results",
+        default_value = "false",
+        action = clap::ArgAction::SetTrue
+    )]
+    validate: bool,
+
+    #[arg(env = "BENCH_NAME")]
+    name: Option<String>,
+
+    #[arg(env = "BENCH_SUBGROUP")]
+    subgroup: Option<String>,
+
+    #[arg(env = "BENCH_QUERY")]
+    query: Option<i32>,
+}
+
+#[cfg(feature = "mimalloc")]
+#[global_allocator]
+static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
+
+pub fn sql(c: &mut Criterion) {
+    env_logger::init();
+
+    let start = Instant::now();
+    let args = EnvParser::parse();
+    let rt = make_tokio_runtime();
+
+    println!("Loading benchmarks...");
+
+    let benchmarks = rt.block_on(async {
+        let ctx = make_ctx(&args).expect("SessionContext creation failed");
+
+        load_benchmarks(&args, &ctx, SQL_BENCHMARK_DIRECTORY)
+            .await
+            .unwrap_or_else(|err| panic!("failed load benchmarks: {err:?}"))
+    });
+
+    println!(
+        "Loaded benchmarks in {} ms ...",
+        start.elapsed().as_millis()
+    );
+
+    for (group, benchmarks) in benchmarks {
+        let mut group = c.benchmark_group(group);
+        group.sample_size(10);
+        group.sampling_mode(SamplingMode::Flat);
+
+        for mut benchmark in benchmarks {
+            // create a context
+            let ctx = make_ctx(&args).expect("SessionContext creation failed");
+
+            // initialize the benchmark. This parses the benchmark file and 
does any pre-execution
+            // work such as loading data into tables
+            rt.block_on(async {
+                benchmark
+                    .initialize(&ctx)
+                    .await
+                    .expect("initialization failed");
+
+                // run assertions
+                benchmark.assert(&ctx).await.expect("assertion failed");
+            });
+
+            let mut name = benchmark.name().to_string();
+            if !benchmark.subgroup().is_empty() {
+                name.push('_');
+                name.push_str(benchmark.subgroup());
+            }
+
+            if args.persist_results {
+                handle_persist(&rt, &ctx, &name, &mut benchmark);
+            } else if args.validate {
+                handle_verify(&rt, &ctx, &name, &mut benchmark);
+            } else {
+                info!("Running benchmark {name} ...");
+
+                let name = name.clone();
+                group.bench_function(name.clone(), |b| {
+                    b.iter(|| handle_run(&rt, &ctx, &args, &mut benchmark, 
&name))
+                });
+
+                print_memory_stats();
+
+                info!("Benchmark {name} completed");
+            }
+
+            // run cleanup
+            rt.block_on(async {
+                benchmark.cleanup(&ctx).await.expect("Cleanup failed");
+            });
+        }
+
+        group.finish();
+    }
+}
+
+fn handle_run(
+    rt: &Runtime,
+    ctx: &SessionContext,
+    args: &EnvParser,
+    benchmark: &mut SqlBenchmark,
+    name: &str,
+) {
+    rt.block_on(async {
+        benchmark
+            .run(ctx, args.validate)
+            .await
+            .unwrap_or_else(|err| panic!("Failed to run benchmark {name}: 
{err:?}"))
+    });
+}
+
+fn handle_persist(
+    rt: &Runtime,
+    ctx: &SessionContext,
+    name: &str,
+    benchmark: &mut SqlBenchmark,
+) {
+    info!("Running benchmark {name} prior to persisting results ...");
+
+    rt.block_on(async {
+        info!("Persisting benchmark {name} ...");
+
+        benchmark
+            .persist(ctx)
+            .await
+            .expect("Failed to persist results");
+    });
+
+    info!("Persisted benchmark {name} successfully");
+}
+
+fn handle_verify(
+    rt: &Runtime,
+    ctx: &SessionContext,
+    name: &str,
+    benchmark: &mut SqlBenchmark,
+) {
+    info!("Verifying benchmark {name} results ...");
+
+    rt.block_on(async {
+        benchmark
+            .run(ctx, true)
+            .await
+            .unwrap_or_else(|err| panic!("Failed to run benchmark {name}: 
{err:?}"));
+        benchmark
+            .verify(ctx)
+            .await
+            .unwrap_or_else(|err| panic!("Verification failed: {err:?}"));
+    });
+
+    info!("Verified benchmark {name} results successfully");
+}
+
+criterion_group!(benches, sql);
+criterion_main!(benches);
+
+fn make_tokio_runtime() -> Runtime {
+    tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .build()
+        .unwrap()
+}
+
+fn make_ctx(args: &EnvParser) -> Result<SessionContext> {
+    let config = args.options.config()?;
+    let rt = args.options.build_runtime()?;
+
+    Ok(SessionContext::new_with_config_rt(config, rt))
+}
+
+/// Recursively walks the directory tree starting at `path` and
+/// calls the call back function for every file encountered.
+pub fn list_files<F>(path: &str, callback: &mut F)
+where
+    F: FnMut(&str),
+{
+    let mut entries: Vec<fs::DirEntry> =
+        fs::read_dir(path).unwrap().filter_map(Result::ok).collect();
+    entries.sort_by_key(|entry| entry.path());
+
+    for dir_entry in entries {
+        let path = dir_entry.path();
+        if path.is_dir() {
+            // Recurse into the sub‑directory
+            list_files(&path.to_string_lossy(), callback);
+        } else {
+            // For files, invoke the callback with the full path as a string
+            let full_str = path.to_string_lossy();
+            callback(&full_str);
+        }
+    }
+}
+
+/// Loads all benchmark files in the `sql_benchmarks` directory.
+/// For each file ending with `.benchmark` it creates a new
+/// `SqlBenchmark` instance.
+async fn load_benchmarks(
+    args: &EnvParser,
+    ctx: &SessionContext,
+    path: &str,
+) -> Result<BTreeMap<String, Vec<SqlBenchmark>>> {
+    let mut benches = BTreeMap::new();
+    let mut paths = Vec::new();
+
+    list_files(path, &mut |path: &str| {
+        if path.ends_with(".benchmark") {
+            paths.push(path.to_string());
+        }
+    });
+
+    for path in paths {
+        debug!("Loading benchmark from {path}");
+
+        let benchmark = SqlBenchmark::new(ctx, &path, 
SQL_BENCHMARK_DIRECTORY).await?;
+        let entries = benches
+            .entry(benchmark.group().to_string())
+            .or_insert(vec![]);
+
+        entries.push(benchmark);
+    }
+
+    benches = filter_benchmarks(args, benches);
+    benches.iter_mut().for_each(|(_, benchmarks)| {
+        benchmarks.sort_by(|b1, b2| b1.name().cmp(b2.name()))
+    });
+
+    Ok(benches)
+}
+
+fn filter_benchmarks(
+    args: &EnvParser,
+    benchmarks: BTreeMap<String, Vec<SqlBenchmark>>,
+) -> BTreeMap<String, Vec<SqlBenchmark>> {
+    let benchmarks_to_run: BTreeMap<String, Vec<SqlBenchmark>> = match 
&args.name {
+        Some(bench_name) => benchmarks
+            .into_iter()

Review Comment:
   An alternative implementation that uses `Vec::retain()` to update the map's
values in place. It avoids cloning the `SqlBenchmark` objects.
   ```rust
   fn filter_benchmarks(
       args: &EnvParser,
       benchmarks: BTreeMap<String, Vec<SqlBenchmark>>,
   ) -> BTreeMap<String, Vec<SqlBenchmark>> {
       match &args.name {
           Some(bench_name) => benchmarks
               .into_iter()
               .filter(|(key, _val)| key.eq_ignore_ascii_case(bench_name))
               .map(|(key, mut val)| {
                   if let Some(subgroup) = &args.subgroup {
                       val.retain(|bench| 
bench.subgroup().eq_ignore_ascii_case(subgroup));
                   }
                   if let Some(query_number) = &args.query {
                       let padded = format!("Q{query_number:0>2}");
                       val.retain(|bench| 
bench.name().eq_ignore_ascii_case(&padded));
                   }
                   (key, val)
               })
               .collect(),
           None => benchmarks,
       }
   }
   ```



##########
benchmarks/benches/sql.rs:
##########
@@ -0,0 +1,336 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Criterion benchmark harness for SQL benchmark files under `sql_benchmarks`.
+//!
+//! SQL benchmarks describe setup, queries, result validation, and cleanup in
+//! `.benchmark` files. Run them with `benchmarks/bench.sh` or directly with
+//! Cargo, for example: `BENCH_NAME=tpch cargo bench --bench sql`.
+
+use clap::Parser;
+use criterion::{Criterion, SamplingMode, criterion_group, criterion_main};
+use datafusion::error::Result;
+use datafusion::prelude::SessionContext;
+use datafusion_benchmarks::sql_benchmark::SqlBenchmark;
+use datafusion_benchmarks::util::{CommonOpt, print_memory_stats};
+use datafusion_common::instant::Instant;
+use log::{debug, info};
+use std::collections::BTreeMap;
+use std::fs;
+use tokio::runtime::Runtime;
+
+static SQL_BENCHMARK_DIRECTORY: &str = "sql_benchmarks";
+
+#[cfg(all(feature = "snmalloc", feature = "mimalloc"))]
+compile_error!(
+    "feature \"snmalloc\" and feature \"mimalloc\" cannot be enabled at the 
same time"
+);
+
+#[cfg(feature = "snmalloc")]
+#[global_allocator]
+static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
+
+#[derive(Debug, Parser)]
+#[command(ignore_errors = true)]
+struct EnvParser {
+    #[command(flatten)]
+    options: CommonOpt,
+
+    #[arg(
+        env = "BENCH_PERSIST_RESULTS",
+        long = "persist_results",
+        default_value = "false",
+        action = clap::ArgAction::SetTrue
+    )]
+    persist_results: bool,
+
+    #[arg(
+        env = "BENCH_VALIDATE",
+        long = "validate_results",
+        default_value = "false",
+        action = clap::ArgAction::SetTrue
+    )]
+    validate: bool,
+
+    #[arg(env = "BENCH_NAME")]
+    name: Option<String>,
+
+    #[arg(env = "BENCH_SUBGROUP")]
+    subgroup: Option<String>,
+
+    #[arg(env = "BENCH_QUERY")]
+    query: Option<i32>,
+}
+
+#[cfg(feature = "mimalloc")]
+#[global_allocator]
+static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;

Review Comment:
   Let's move this `mimalloc` global-allocator static next to the `snmalloc`
one, i.e. just before the `EnvParser` struct



##########
benchmarks/benches/sql.rs:
##########
@@ -0,0 +1,336 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Criterion benchmark harness for SQL benchmark files under `sql_benchmarks`.
+//!
+//! SQL benchmarks describe setup, queries, result validation, and cleanup in
+//! `.benchmark` files. Run them with `benchmarks/bench.sh` or directly with
+//! Cargo, for example: `BENCH_NAME=tpch cargo bench --bench sql`.
+
+use clap::Parser;
+use criterion::{Criterion, SamplingMode, criterion_group, criterion_main};
+use datafusion::error::Result;
+use datafusion::prelude::SessionContext;
+use datafusion_benchmarks::sql_benchmark::SqlBenchmark;
+use datafusion_benchmarks::util::{CommonOpt, print_memory_stats};
+use datafusion_common::instant::Instant;
+use log::{debug, info};
+use std::collections::BTreeMap;
+use std::fs;
+use tokio::runtime::Runtime;
+
+static SQL_BENCHMARK_DIRECTORY: &str = "sql_benchmarks";

Review Comment:
   This assumes that the benchmark is executed from the `benchmarks`
folder.
   It will fail if the current working directory is the workspace root, e.g.
when running `cargo bench -p datafusion-benchmarks --bench sql`
   
   
   ```suggestion
   static SQL_BENCHMARK_DIRECTORY: LazyLock<String> = LazyLock::new(||
     format!("{}{}{}", env!("CARGO_MANIFEST_DIR"), std::path::MAIN_SEPARATOR, 
"sql_benchmarks")
   );
   ```
   
   and later use it with `&*SQL_BENCHMARK_DIRECTORY`



##########
benchmarks/sql_benchmarks/tpch/init/load_csv.sql:
##########
@@ -0,0 +1,99 @@
+CREATE EXTERNAL TABLE nation
+(
+    n_nationkey INT,
+    n_name      CHAR(25),
+    n_regionkey INT,
+    n_comment   VARCHAR(152),
+    PRIMARY KEY (n_nationkey)
+) STORED AS CSV LOCATION 
'data/tpch_sf${BENCH_SIZE:-1}/csv/nation/nation.1.csv';
+
+CREATE EXTERNAL TABLE region
+(
+    r_regionkey INT,
+    r_name      CHAR(25),
+    r_comment   VARCHAR(152),
+    PRIMARY KEY (r_regionkey)
+) STORED AS CSV LOCATION 
'data/tpch_sf${BENCH_SIZE:-1}/csv/region/region.1.csv';
+
+CREATE EXTERNAL TABLE supplier
+(
+    s_suppkey   INT,
+    s_name      CHAR(25),
+    s_address   VARCHAR(40),
+    s_nationkey INT,
+    s_phone     CHAR(15),
+    s_acctbal   DECIMAL(15, 2),
+    s_comment   VARCHAR(101),
+    PRIMARY KEY (s_suppkey)
+) STORED AS CSV LOCATION 
'data/tpch_sf${BENCH_SIZE:-1}/csv/supplier/supplier.1.csv';
+
+CREATE EXTERNAL TABLE customer
+(
+    c_custkey    INT,
+    c_name       VARCHAR(25),
+    c_address    VARCHAR(40),
+    c_nationkey  INT,
+    c_phone      CHAR(15),
+    c_acctbal    DECIMAL(15, 2),
+    c_mktsegment CHAR(10),
+    c_comment    VARCHAR(117),
+    PRIMARY KEY (c_custkey)
+) STORED AS CSV LOCATION 
'data/tpch_sf${BENCH_SIZE:-1}/csv/customer/customer.1.csv';
+
+CREATE EXTERNAL TABLE part
+(
+    p_partkey     INT,
+    p_name        VARCHAR(55),
+    p_mfgr        CHAR(25),
+    p_brand       CHAR(10),
+    p_type        VARCHAR(25),
+    p_size        INT,
+    p_container   CHAR(10),
+    p_retailprice DECIMAL(15, 2),
+    p_comment     VARCHAR(23),
+    PRIMARY KEY (p_partkey)
+) STORED AS CSV LOCATION 'data/tpch_sf${BENCH_SIZE:-1}/csv/part/part.1.csv';
+
+CREATE EXTERNAL TABLE partsupp
+(
+    ps_partkey    INT,
+    ps_suppkey    INT,
+    ps_availqty   INT,
+    ps_supplycost DECIMAL(15, 2),
+    ps_comment    VARCHAR(199),
+    PRIMARY KEY (ps_partkey)
+) STORED AS CSV LOCATION 
'data/tpch_sf${BENCH_SIZE:-1}/csv/partsupp/partsupp.1.csv';
+
+CREATE EXTERNAL TABLE orders
+(
+    o_orderkey      INT,
+    o_custkey       INT,
+    o_orderstatus   CHAR(1),
+    o_totalprice    DECIMAL(15, 2),
+    o_orderdate     DATE,
+    o_orderpriority CHAR(15),
+    o_clerk         CHAR(15),
+    o_shippriority  INT,
+    o_comment       VARCHAR(79),
+    PRIMARY KEY (o_orderkey)
+) STORED AS CSV LOCATION 'data/tpch_sf${BENCH_SIZE:-1}/csv/order/orders.1.csv';

Review Comment:
   
https://github.com/apache/datafusion/pull/21707/changes#diff-9bfff15e2b30e81d998b0cf3939bf503f045acd6044a572fb9775fdca8f24794R13
 uses `orders` (plural) as the parent folder name, while the LOCATION above uses 
`order` — the two should be made consistent



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to