This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new f27c4c308 chore: Refactor planner random and partition expressions (#3704)
f27c4c308 is described below

commit f27c4c308c522be4f4749d1e60f11547d61929f3
Author: Bhargava Vadlamani <[email protected]>
AuthorDate: Tue Mar 17 11:56:35 2026 -0700

    chore: Refactor planner random and partition expressions (#3704)
---
 native/core/src/execution/expressions/mod.rs       |  2 +
 native/core/src/execution/expressions/partition.rs | 57 ++++++++++++++++++++++
 native/core/src/execution/expressions/random.rs    | 56 +++++++++++++++++++++
 native/core/src/execution/planner.rs               | 24 +++------
 .../src/execution/planner/expression_registry.rs   | 30 ++++++++++++
 5 files changed, 152 insertions(+), 17 deletions(-)

diff --git a/native/core/src/execution/expressions/mod.rs b/native/core/src/execution/expressions/mod.rs
index 563d62e91..c2b144b7d 100644
--- a/native/core/src/execution/expressions/mod.rs
+++ b/native/core/src/execution/expressions/mod.rs
@@ -22,6 +22,8 @@ pub mod bitwise;
 pub mod comparison;
 pub mod logical;
 pub mod nullcheck;
+pub mod partition;
+pub mod random;
 pub mod strings;
 pub mod subquery;
 pub mod temporal;
diff --git a/native/core/src/execution/expressions/partition.rs b/native/core/src/execution/expressions/partition.rs
new file mode 100644
index 000000000..4b0287f8c
--- /dev/null
+++ b/native/core/src/execution/expressions/partition.rs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::execution::operators::ExecutionError;
+use crate::execution::planner::expression_registry::ExpressionBuilder;
+use crate::execution::planner::PhysicalPlanner;
+use arrow::datatypes::SchemaRef;
+use datafusion::common::ScalarValue;
+use datafusion::physical_expr::expressions::Literal;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion_comet_proto::spark_expression::Expr;
+use datafusion_comet_spark_expr::monotonically_increasing_id::MonotonicallyIncreasingId;
+use std::sync::Arc;
+
+pub struct SparkPartitionIdBuilder;
+
+impl ExpressionBuilder for SparkPartitionIdBuilder {
+    fn build(
+        &self,
+        _spark_expr: &Expr,
+        _input_schema: SchemaRef,
+        planner: &PhysicalPlanner,
+    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+        Ok(Arc::new(Literal::new(ScalarValue::Int32(Some(
+            planner.partition(),
+        )))))
+    }
+}
+
+pub struct MonotonicallyIncreasingIdBuilder;
+
+impl ExpressionBuilder for MonotonicallyIncreasingIdBuilder {
+    fn build(
+        &self,
+        _spark_expr: &Expr,
+        _input_schema: SchemaRef,
+        planner: &PhysicalPlanner,
+    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+        Ok(Arc::new(MonotonicallyIncreasingId::from_partition_id(
+            planner.partition(),
+        )))
+    }
+}
diff --git a/native/core/src/execution/expressions/random.rs b/native/core/src/execution/expressions/random.rs
new file mode 100644
index 000000000..5ea6092cb
--- /dev/null
+++ b/native/core/src/execution/expressions/random.rs
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::execution::operators::ExecutionError;
+use crate::execution::planner::expression_registry::ExpressionBuilder;
+use crate::execution::planner::PhysicalPlanner;
+use crate::extract_expr;
+use arrow::datatypes::SchemaRef;
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion_comet_proto::spark_expression::Expr;
+use datafusion_comet_spark_expr::{RandExpr, RandnExpr};
+use std::sync::Arc;
+
+pub struct RandBuilder;
+
+impl ExpressionBuilder for RandBuilder {
+    fn build(
+        &self,
+        spark_expr: &Expr,
+        _input_schema: SchemaRef,
+        planner: &PhysicalPlanner,
+    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+        let expr = extract_expr!(spark_expr, Rand);
+        let seed = expr.seed.wrapping_add(planner.partition().into());
+        Ok(Arc::new(RandExpr::new(seed)))
+    }
+}
+
+pub struct RandnBuilder;
+
+impl ExpressionBuilder for RandnBuilder {
+    fn build(
+        &self,
+        spark_expr: &Expr,
+        _input_schema: SchemaRef,
+        planner: &PhysicalPlanner,
+    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+        let expr = extract_expr!(spark_expr, Randn);
+        let seed = expr.seed.wrapping_add(planner.partition().into());
+        Ok(Arc::new(RandnExpr::new(seed)))
+    }
+}
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 15bbabe88..bd3775592 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -123,12 +123,11 @@ use datafusion_comet_proto::{
     },
     spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
 };
-use datafusion_comet_spark_expr::monotonically_increasing_id::MonotonicallyIncreasingId;
 use datafusion_comet_spark_expr::{
     ArrayInsert, Avg, AvgDecimal, Cast, CheckOverflow, Correlation, Covariance, CreateNamedStruct,
     DecimalRescaleCheckOverflow, GetArrayStructFields, GetStructField, IfExpr, ListExtract,
-    NormalizeNaNAndZero, RandExpr, RandnExpr, SparkCastOptions, Stddev, SumDecimal, ToJson,
-    UnboundColumn, Variance, WideDecimalBinaryExpr, WideDecimalOp,
+    NormalizeNaNAndZero, SparkCastOptions, Stddev, SumDecimal, ToJson, UnboundColumn, Variance,
+    WideDecimalBinaryExpr, WideDecimalOp,
 };
 use itertools::Itertools;
 use jni::objects::GlobalRef;
@@ -197,6 +196,11 @@ impl PhysicalPlanner {
         &self.session_ctx
     }
 
+    /// Return partition id of this planner.
+    pub fn partition(&self) -> i32 {
+        self.partition
+    }
+
     /// get DataFusion PartitionedFiles from a Spark FilePartition
     fn get_partitioned_files(
         &self,
@@ -655,20 +659,6 @@ impl PhysicalPlanner {
                     expr.legacy_negative_index,
                 )))
             }
-            ExprStruct::Rand(expr) => {
-                let seed = expr.seed.wrapping_add(self.partition.into());
-                Ok(Arc::new(RandExpr::new(seed)))
-            }
-            ExprStruct::Randn(expr) => {
-                let seed = expr.seed.wrapping_add(self.partition.into());
-                Ok(Arc::new(RandnExpr::new(seed)))
-            }
-            ExprStruct::SparkPartitionId(_) => Ok(Arc::new(DataFusionLiteral::new(
-                ScalarValue::Int32(Some(self.partition)),
-            ))),
-            ExprStruct::MonotonicallyIncreasingId(_) => Ok(Arc::new(
-                MonotonicallyIncreasingId::from_partition_id(self.partition),
-            )),
             ExprStruct::ToCsv(expr) => {
                 let csv_struct_expr =
                     self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
diff --git a/native/core/src/execution/planner/expression_registry.rs b/native/core/src/execution/planner/expression_registry.rs
index 34aa3de17..bf3904d9c 100644
--- a/native/core/src/execution/planner/expression_registry.rs
+++ b/native/core/src/execution/planner/expression_registry.rs
@@ -184,6 +184,12 @@ impl ExpressionRegistry {
 
         // Register temporal expressions
         self.register_temporal_expressions();
+
+        // Register random expressions
+        self.register_random_expressions();
+
+        // Register partition expressions
+        self.register_partition_expressions();
     }
 
     /// Register arithmetic expression builders
@@ -386,4 +392,28 @@ impl ExpressionRegistry {
             )),
         }
     }
+
+    /// Register random expression builders
+    fn register_random_expressions(&mut self) {
+        use crate::execution::expressions::random::*;
+
+        self.builders
+            .insert(ExpressionType::Rand, Box::new(RandBuilder));
+        self.builders
+            .insert(ExpressionType::Randn, Box::new(RandnBuilder));
+    }
+
+    /// Register partition expression builders
+    fn register_partition_expressions(&mut self) {
+        use crate::execution::expressions::partition::*;
+
+        self.builders.insert(
+            ExpressionType::SparkPartitionId,
+            Box::new(SparkPartitionIdBuilder),
+        );
+        self.builders.insert(
+            ExpressionType::MonotonicallyIncreasingId,
+            Box::new(MonotonicallyIncreasingIdBuilder),
+        );
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to