parthchandra commented on code in PR #3643:
URL: https://github.com/apache/datafusion-comet/pull/3643#discussion_r3083286632


##########
native/spark-expr/src/array_funcs/arrays_zip.rs:
##########
@@ -0,0 +1,341 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::RecordBatch;
+use arrow::array::{
+    new_null_array, Array, ArrayRef, Capacities, ListArray, MutableArrayData, 
StructArray,
+};
+use arrow::buffer::{NullBuffer, OffsetBuffer};
+use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Null};
+use arrow::datatypes::Schema;
+use arrow::datatypes::{DataType, Field, Fields};
+use datafusion::common::cast::{as_fixed_size_list_array, as_large_list_array, 
as_list_array};
+use datafusion::common::{exec_err, Result, ScalarValue};
+use datafusion::logical_expr::ColumnarValue;
+use datafusion::physical_expr::PhysicalExpr;
+use std::any::Any;
+use std::fmt::{Display, Formatter};
+use std::sync::Arc;
+// TODO: Reuse functions from DF
+// use datafusion::functions_nested::utils::make_scalar_function;
+// use datafusion::functions_nested::arrays_zip::arrays_zip_inner;
+
+#[derive(Debug, Eq, Hash, PartialEq)]
+pub struct SparkArraysZipFunc {
+    values: Vec<Arc<dyn PhysicalExpr>>,
+    names: Vec<String>,
+}
+
+impl SparkArraysZipFunc {
+    pub fn new(values: Vec<Arc<dyn PhysicalExpr>>, names: Vec<String>) -> Self 
{
+        Self { values, names }
+    }
+    fn fields(&self, schema: &Schema) -> Result<Vec<Field>> {
+        let mut fields: Vec<Field> = Vec::with_capacity(self.values.len());
+        for (i, v) in self.values.iter().enumerate() {
+            let element_type = match (*v).as_ref().data_type(schema)? {
+                List(field) | LargeList(field) | FixedSizeList(field, _) => {
+                    field.data_type().clone()
+                }
+                Null => Null,
+                dt => {
+                    return exec_err!("arrays_zip expects array arguments, got 
{dt}");
+                }
+            };
+            fields.push(Field::new(self.names[i].to_string(), element_type, 
true));
+        }
+
+        Ok(fields)
+    }
+}
+
+impl Display for SparkArraysZipFunc {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "ArraysZip [values: {:?}, names: {:?}]",
+            self.values, self.names
+        )
+    }
+}
+
+impl PhysicalExpr for SparkArraysZipFunc {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
+        let fields = self.fields(input_schema)?;
+        Ok(List(Arc::new(Field::new_list_field(
+            DataType::Struct(Fields::from(fields)),
+            true,

Review Comment:
   There is a slight mismatch here. Spark has this defined as non-nullable. 



##########
spark/src/main/scala/org/apache/comet/serde/arrays.scala:
##########
@@ -623,6 +624,57 @@ object CometSize extends CometExpressionSerde[Size] {
 
 }
 
+object CometArraysZip extends CometExpressionSerde[ArraysZip] {
+  override def getSupportLevel(expr: ArraysZip): SupportLevel = {
+    expr.dataType match {
+      case _: ArrayType => Compatible()

Review Comment:
   We should probably check the element type here. There have been issues noted 
in the past. See this for instance - 
https://github.com/apache/datafusion-comet/pull/1308



##########
spark/src/main/scala/org/apache/comet/serde/arrays.scala:
##########
@@ -623,6 +624,57 @@ object CometSize extends CometExpressionSerde[Size] {
 
 }
 
+object CometArraysZip extends CometExpressionSerde[ArraysZip] {
+  override def getSupportLevel(expr: ArraysZip): SupportLevel = {
+    expr.dataType match {
+      case _: ArrayType => Compatible()
+      case other =>
+        // this should be unreachable because Spark only supports array inputs
+        Unsupported(Some(s"Unsupported child data type: $other"))
+    }
+  }
+
+  override def convert(
+      expr: ArraysZip,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+
+    val exprChildren: Seq[Option[ExprOuterClass.Expr]] =
+      expr.children.map(exprToProtoInternal(_, inputs, binding))
+    val names: Seq[Any] = expr.names.map(_.eval(EmptyRow))
+
+    // mimic Spark's ArraysZip behavior: returns NULL if any argument is NULL
+    val combinedNullCheck = expr.children.map(child => 
IsNotNull(child)).reduce(And)
+    val isNotNullExpr = exprToProtoInternal(combinedNullCheck, inputs, binding)
+    val nullLiteralProto = exprToProto(Literal(null, BooleanType), Seq.empty)

Review Comment:
   The null literal here uses BooleanType, but elsewhere in this file (e.g., 
CometArrayAppend at line 88) we use the return type of the expression. DF 
expects all arms of casewhen to have compatible types and this may cause an 
error.



##########
spark/src/test/resources/sql-tests/expressions/array/arrays_zip.sql:
##########
@@ -0,0 +1,147 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- ConfigMatrix: parquet.enable.dictionary=false,true
+
+-- Basic usage with arrays of same length
+query
+SELECT arrays_zip(array(1, 2, 3), array(2, 3, 4));
+
+-- Arrays with different lengths
+query
+SELECT arrays_zip(array(1, 2, 3), array('a', 'b'));
+
+-- With null values
+query
+SELECT arrays_zip(array(1, null, 3), array('x', 'y', 'z'));
+
+-- basic: two integer arrays of equal length
+query
+select arrays_zip(array(1, 2, 3), array(10, 20, 30));
+
+-- basic: two arrays with different element types (int + string)
+query
+select arrays_zip(array(1, 2, 3), array('a', 'b', 'c'));
+
+-- three arrays of equal length
+query
+SELECT arrays_zip(array(1, 2), array(2, 3), array(3, 4));
+
+-- three arrays of equal length
+query
+select arrays_zip(array(1, 2, 3), array(10, 20, 30), array(100, 200, 300));
+
+-- four arrays of equal length
+query
+select arrays_zip(array(1), array(2), array(3), array(4));
+
+-- mixed element types: float + boolean
+query
+select arrays_zip(array(1.5, 2.5), array(true, false));
+
+-- different length arrays: shorter array padded with NULLs
+query
+select arrays_zip(array(1, 2), array(3, 4, 5));
+
+-- different length arrays: first longer
+query
+select arrays_zip(array(1, 2, 3), array(10));
+
+-- different length: one single element, other three elements
+query
+select arrays_zip(array(1), array('a', 'b', 'c'));
+
+-- empty arrays
+query
+select arrays_zip(array(), array());
+
+-- one empty, one non-empty
+query
+select arrays_zip(array(), array(1, 2, 3));
+
+-- NULL elements inside arrays
+query
+select arrays_zip(array(1, null, 3), array('a', 'b', 'c'));
+
+-- all NULL elements
+query
+select arrays_zip(array(cast(NULL AS int), NULL, NULL), array(cast(NULL AS 
string), NULL, NULL));
+
+-- both args are NULL (entire list null)
+query
+select arrays_zip(cast(NULL AS array<int>), cast(NULL AS array<int>));
+
+-- single element arrays
+query
+select arrays_zip(array(42), array('hello'));
+
+-- single argument
+query
+SELECT arrays_zip(null)
+
+query
+select arrays_zip(cast(NULL AS array<int>));
+
+query
+select arrays_zip(array());
+
+query
+select arrays_zip(array(1, 2, 3));
+
+-- one arg is NULL list, other is real array
+query
+select arrays_zip(cast(NULL AS array<int>), array(1, 2, 3));
+
+-- real array + NULL list
+query
+select arrays_zip(array(1, 2), cast(NULL AS array<int>));
+
+-- w/ names
+statement
+CREATE TABLE test_arrays_zip(a array<int>, b array<int>) USING parquet
+
+statement
+-- column-level test with multiple rows
+INSERT INTO test_arrays_zip VALUES (array(1, 2), array(10, 20)), (array(3, 4, 
5), array(30)), (array(6), array(60, 70))
+
+-- column-level test with NULL rows
+INSERT INTO test_arrays_zip VALUES (array(1, 2), array(10, 20)), (cast(NULL AS 
array<int>), array(30, 40)), (array(5, 6), cast(NULL AS array<int>))
+
+INSERT INTO test_arrays_zip VALUES (array(1), array(10, 20)), (array(2, 3), 
array(30))
+
+query
+select arrays_zip(a, b) FROM test_arrays_zip
+
+-- single argument
+query
+select arrays_zip(a) FROM test_arrays_zip
+
+query
+select arrays_zip(b) FROM test_arrays_zip
+
+-- real array + NULL list
+query
+SELECT arrays_zip(a, b) FROM (SELECT array(1, 2, 3) as a, null as b)
+
+query
+SELECT arrays_zip(b, a) FROM (SELECT array(1, 2, 3) as a, null as b)
+
+query
+SELECT arrays_zip(a) FROM (SELECT array(1, 2, 3) as a, null as b)
+
+query
+SELECT arrays_zip(b) FROM (SELECT array(1, 2, 3) as a, null as b)

Review Comment:
   nit: newline needed at end of file



##########
native/core/src/execution/planner.rs:
##########
@@ -682,6 +682,20 @@ impl PhysicalPlanner {
                     csv_write_options,
                 )))
             }
+            ExprStruct::ArraysZip(expr) => {
+                assert!(!expr.values.is_empty());

Review Comment:
   Better to return Err instead of asserting (which will cause a panic).
   ```
   return Err(GeneralError("arrays_zip requires at least one 
argument".to_string()))
   ```
   If you want to be extra safe, then you can also check 
   ```
   expr.values.len() == expr.names.len()
   ```



##########
native/proto/src/proto/expr.proto:
##########
@@ -489,3 +490,10 @@ message ArrayJoin {
 message Rand {
   int64 seed = 1;
 }
+
+// Spark's ArraysZip takes children: Seq[Expression] and names: Seq[Expression]
+// 
https://github.com/apache/spark/blob/branch-4.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L296
+message ArraysZip {
+  repeated Expr values = 1;
+  repeated string names = 2;
+}

Review Comment:
   nit: newline needed at end of file



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to