erenavsarogullari commented on code in PR #20955:
URL: https://github.com/apache/datafusion/pull/20955#discussion_r3178913522


##########
datafusion/spark/src/function/array/sequence.rs:
##########
@@ -0,0 +1,325 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::function::functions_nested_utils::make_scalar_function;
+use arrow::array::{Array, Int64Builder};
+use arrow::datatypes::{DataType, Field, FieldRef, IntervalMonthDayNano};
+use datafusion_common::cast::as_int64_array;
+use datafusion_common::internal_err;
+use datafusion_common::{DataFusionError, Result, ScalarValue, exec_err};
+use datafusion_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
+};
+use datafusion_functions_nested::range::Range;
+use std::sync::Arc;
+
+/// Spark-compatible `sequence` expression.
+/// <https://spark.apache.org/docs/latest/api/sql/index.html#sequence>
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkSequence {
+    // User-defined signature: all argument validation/coercion happens in
+    // `ScalarUDFImpl::coerce_types` rather than via a fixed type signature.
+    signature: Signature,
+}
+
+impl Default for SparkSequence {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SparkSequence {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::user_defined(Volatility::Immutable),
+        }
+    }
+}
+
+impl ScalarUDFImpl for SparkSequence {
+    fn name(&self) -> &str {
+        "sequence"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        internal_err!("return_field_from_args should be used instead")
+    }
+
+    fn return_field_from_args(
+        &self,
+        args: datafusion_expr::ReturnFieldArgs,
+    ) -> Result<FieldRef> {
+        let return_type = if args.arg_fields[0].data_type().is_null()
+            || args.arg_fields[1].data_type().is_null()
+        {
+            DataType::Null
+        } else {
+            DataType::List(Arc::new(Field::new_list_field(
+                args.arg_fields[0].data_type().clone(),
+                true,
+            )))
+        };
+
+        Ok(Arc::new(Field::new(
+            "this_field_name_is_irrelevant",
+            return_type,
+            true,
+        )))
+    }
+
+    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
+        match arg_types.len() {
+            2 => {
+                let first_data_type =
+                    check_type(arg_types[0].clone(), 
"first".to_string().as_str())?;
+                let second_data_type =
+                    check_type(arg_types[1].clone(), 
"second".to_string().as_str())?;
+
+                if !first_data_type.is_null()
+                    && !second_data_type.is_null()
+                    && (first_data_type != second_data_type)
+                {
+                    return exec_err!(
+                        "first({first_data_type}) and 
second({second_data_type}) input types should be same"
+                    );
+                }
+
+                Ok(vec![first_data_type, second_data_type])
+            }
+            3 => {
+                let first_data_type =
+                    check_type(arg_types[0].clone(), 
"first".to_string().as_str())?;
+                let second_data_type =
+                    check_type(arg_types[1].clone(), 
"second".to_string().as_str())?;
+                let third_data_type = check_interval_type(
+                    arg_types[2].clone(),
+                    "third".to_string().as_str(),
+                )?;
+
+                if !first_data_type.is_null() && !second_data_type.is_null() {
+                    if first_data_type != second_data_type {
+                        return exec_err!(
+                            "first({first_data_type}) and 
second({second_data_type}) input types should be same"
+                        );
+                    }
+
+                    if !check_interval_type_by_first_type(
+                        &first_data_type,
+                        &third_data_type,
+                    ) {
+                        return exec_err!(
+                            "interval type should be integer for integer input 
or time based"
+                        );
+                    }
+                }
+
+                Ok(vec![first_data_type, second_data_type, third_data_type])
+            }
+            _ => {
+                exec_err!("num of input parameters should be 2 or 3")
+            }
+        }
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
+        let args = &args.args;
+
+        if args.iter().any(|arg| arg.data_type().is_null()) {
+            return Ok(ColumnarValue::Scalar(ScalarValue::Null));
+        }
+
+        match args[0].data_type() {
+            DataType::Int64 => {
+                validate_int64_sequence_step(args)?;
+                let optional_new_args = add_step_argument_if_not_exists(args)?;
+                let new_args = match optional_new_args {
+                    Some(new_args) => &new_args.to_owned(),
+                    None => args,
+                };
+                make_scalar_function(|args| {
+                    Range::generate_series().gen_range_inner(args)
+                })(new_args)
+            }
+            DataType::Date32 | DataType::Date64 => {
+                let optional_new_args = 
add_interval_argument_if_not_exists(args);
+                let new_args = match optional_new_args {
+                    Some(new_args) => &new_args.to_owned(),
+                    None => args,
+                };
+                make_scalar_function(|args| 
Range::generate_series().gen_range_date(args))(
+                    new_args,
+                )
+            }
+            DataType::Timestamp(_, _) => {
+                let optional_new_args = 
add_interval_argument_if_not_exists(args);
+                let new_args = match optional_new_args {
+                    Some(new_args) => &new_args.to_owned(),
+                    None => args,
+                };
+                make_scalar_function(|args| {
+                    Range::generate_series().gen_range_timestamp(args)
+                })(new_args)
+            }
+            dt => {
+                internal_err!(
+                    "Signature failed to guard unknown input type for {}: 
{dt}",
+                    self.name()
+                )
+            }
+        }
+    }
+}
+
+/// Validates explicit `step` for 3-argument integer `sequence` (Spark 
semantics).
+fn validate_int64_sequence_step(args: &[ColumnarValue]) -> Result<()> {
+    if args.len() != 3 {
+        return Ok(());
+    }
+    let arrays = ColumnarValue::values_to_arrays(args)?;
+    let start = as_int64_array(&arrays[0])?;
+    let stop = as_int64_array(&arrays[1])?;
+    let step = as_int64_array(&arrays[2])?;
+    for i in 0..start.len() {
+        if start.is_null(i) || stop.is_null(i) || step.is_null(i) {
+            continue;
+        }
+        let s = start.value(i);
+        let e = stop.value(i);
+        let st = step.value(i);
+        if st == 0 {
+            return exec_err!("Step cannot be 0 for sequence");

Review Comment:
   Firstly, thanks for the review.
   Yes, `step` can be 0 when `start` and `stop` are equal. This case has been fixed in the 5th commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to