Fokko commented on code in PR #42:
URL: https://github.com/apache/iceberg-rust/pull/42#discussion_r1310188384


##########
crates/iceberg/src/transform/mod.rs:
##########
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Transform function used to compute partition values.
+use crate::{spec::Transform, Result};
+use arrow::array::ArrayRef;
+
+mod identity;
+mod temporal;
+mod void;
+
+/// TransformFunction is a trait that defines the interface for all transform 
functions.
+pub trait TransformFunction: Send {
+    /// transform will take an input array and transform it into a new array.
+    /// The implementation of this function will need to check and downcast 
the input to specific
+    /// type.
+    fn transform(&self, input: ArrayRef) -> Result<ArrayRef>;
+}
+
+/// BoxedTransformFunction is a boxed trait object of TransformFunction.
+pub type BoxedTransformFunction = Box<dyn TransformFunction>;
+
+/// create_transform_function creates a boxed trait object of 
TransformFunction from a Transform.
+pub fn create_transform_function(transform: &Transform) -> 
Result<BoxedTransformFunction> {
+    match transform {
+        Transform::Identity => Ok(Box::new(identity::Identity {})),
+        Transform::Void => Ok(Box::new(void::Void {})),
+        Transform::Year => Ok(Box::new(temporal::Year {})),
+        Transform::Month => Ok(Box::new(temporal::Month {})),
+        Transform::Day => Ok(Box::new(temporal::Day {})),
+        Transform::Hour => Ok(Box::new(temporal::Hour {})),
+        _ => Err(crate::error::Error::new(
+            crate::ErrorKind::FeatureUnsupported,
+            format!("Transform {:?} is not implemented", transform),
+        )),
+    }

Review Comment:
   `Truncate` and `Bucket` are missing



##########
crates/iceberg/Cargo.toml:
##########
@@ -28,6 +28,7 @@ keywords = ["iceberg"]
 
 [dependencies]
 apache-avro = "0.15.0"
+arrow = { version = ">=46" }

Review Comment:
   In PyIceberg we make Arrow an optional dependency because it is so big; I 
don't know whether the Rust implementation has the same problem.



##########
crates/iceberg/src/transform/temporal.rs:
##########
@@ -0,0 +1,343 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::TransformFunction;
+use crate::{Error, ErrorKind, Result};
+use arrow::array::{Array, TimestampMicrosecondArray};
+use arrow::compute::binary;
+use arrow::datatypes;
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{ArrayRef, Date32Array, Int32Array},
+    compute::{month_dyn, year_dyn},
+};
+use chrono::Datelike;
+use std::sync::Arc;
+
+/// The number of days since unix epoch.
+const DAY_SINCE_UNIX_EPOCH: i32 = 719163;
+/// Hour in one second.
+const HOUR_PER_SECOND: f64 = 1.0_f64 / 3600.0_f64;
+/// Day in one second.
+const DAY_PER_SECOND: f64 = 1.0_f64 / 24.0_f64 / 3600.0_f64;
+/// Year of unix epoch.
+const UNIX_EPOCH_YEAR: i32 = 1970;
+
+/// Extract a date or timestamp year, as years from 1970
+pub struct Year;
+
+impl TransformFunction for Year {
+    fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
+        let array =
+            year_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, 
format!("{err}")))?;
+        Ok(Arc::<Int32Array>::new(
+            array
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap()
+                .unary(|v| v - UNIX_EPOCH_YEAR),
+        ))
+    }
+}
+
+/// Extract a date or timestamp month, as months from 1970-01-01
+pub struct Month;
+
+impl TransformFunction for Month {
+    fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
+        let year_array =
+            year_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, 
format!("{err}")))?;
+        let year_array: Int32Array = year_array
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap()
+            .unary(|v| 12 * (v - UNIX_EPOCH_YEAR));
+        let month_array =
+            month_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, 
format!("{err}")))?;
+        Ok(Arc::<Int32Array>::new(
+            binary(
+                month_array.as_any().downcast_ref::<Int32Array>().unwrap(),
+                year_array.as_any().downcast_ref::<Int32Array>().unwrap(),
+                // Compute month from 1970-01-01, so minus 1 here.
+                |a, b| a + b - 1,
+            )
+            .unwrap(),
+        ))
+    }
+}
+
+/// Extract a date or timestamp day, as days from 1970-01-01
+pub struct Day;
+
+impl TransformFunction for Day {
+    fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
+        let res: Int32Array = match input.data_type() {
+            DataType::Timestamp(datatypes::TimeUnit::Microsecond, _) => input
+                .as_any()
+                .downcast_ref::<TimestampMicrosecondArray>()
+                .unwrap()
+                .unary(|v| -> i32 { (v as f64 / 1000.0 / 1000.0 * 
DAY_PER_SECOND) as i32 }),
+            DataType::Date32 => {
+                input
+                    .as_any()
+                    .downcast_ref::<Date32Array>()
+                    .unwrap()
+                    .unary(|v| -> i32 {
+                        
datatypes::Date32Type::to_naive_date(v).num_days_from_ce()
+                            - DAY_SINCE_UNIX_EPOCH
+                    })
+            }
+            _ => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    format!(
+                        "Should not call internally for unsupport data type 
{:?}",
+                        input.data_type()
+                    ),
+                ))
+            }
+        };
+        Ok(Arc::new(res))
+    }
+}
+
+/// Extract a timestamp hour, as hours from 1970-01-01 00:00:00
+pub struct Hour;
+
+impl TransformFunction for Hour {
+    fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
+        let res: Int32Array = match input.data_type() {
+            DataType::Timestamp(datatypes::TimeUnit::Microsecond, _) => input
+                .as_any()
+                .downcast_ref::<TimestampMicrosecondArray>()
+                .unwrap()
+                .unary(|v| -> i32 { (v as f64 * HOUR_PER_SECOND / 1000.0 / 
1000.0) as i32 }),
+            _ => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    format!(
+                        "Should not call internally for unsupport data type 
{:?}",
+                        input.data_type()
+                    ),
+                ))
+            }
+        };
+        Ok(Arc::new(res))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use arrow::array::{ArrayRef, Date32Array, Int32Array, 
TimestampMicrosecondArray};
+    use chrono::NaiveDate;
+    use std::sync::Arc;
+
+    use crate::transform::TransformFunction;
+
+    #[test]
+    fn test_transform_years() {
+        let year = super::Year;
+        let ori_date = vec![
+            NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(),

Review Comment:
   Can we also add `timestamp`, `timestamptz`?



##########
crates/iceberg/src/transform/temporal.rs:
##########
@@ -0,0 +1,343 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::TransformFunction;
+use crate::{Error, ErrorKind, Result};
+use arrow::array::{Array, TimestampMicrosecondArray};
+use arrow::compute::binary;
+use arrow::datatypes;
+use arrow::datatypes::DataType;
+use arrow::{
+    array::{ArrayRef, Date32Array, Int32Array},
+    compute::{month_dyn, year_dyn},
+};
+use chrono::Datelike;
+use std::sync::Arc;
+
+/// The number of days since unix epoch.
+const DAY_SINCE_UNIX_EPOCH: i32 = 719163;
+/// Hour in one second.
+const HOUR_PER_SECOND: f64 = 1.0_f64 / 3600.0_f64;
+/// Day in one second.
+const DAY_PER_SECOND: f64 = 1.0_f64 / 24.0_f64 / 3600.0_f64;
+/// Year of unix epoch.
+const UNIX_EPOCH_YEAR: i32 = 1970;
+
+/// Extract a date or timestamp year, as years from 1970
+pub struct Year;
+
+impl TransformFunction for Year {
+    fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
+        let array =
+            year_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, 
format!("{err}")))?;
+        Ok(Arc::<Int32Array>::new(
+            array
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap()
+                .unary(|v| v - UNIX_EPOCH_YEAR),
+        ))
+    }
+}
+
+/// Extract a date or timestamp month, as months from 1970-01-01
+pub struct Month;
+
+impl TransformFunction for Month {
+    fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
+        let year_array =
+            year_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, 
format!("{err}")))?;
+        let year_array: Int32Array = year_array
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap()
+            .unary(|v| 12 * (v - UNIX_EPOCH_YEAR));
+        let month_array =
+            month_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, 
format!("{err}")))?;
+        Ok(Arc::<Int32Array>::new(
+            binary(
+                month_array.as_any().downcast_ref::<Int32Array>().unwrap(),
+                year_array.as_any().downcast_ref::<Int32Array>().unwrap(),
+                // Compute month from 1970-01-01, so minus 1 here.
+                |a, b| a + b - 1,
+            )
+            .unwrap(),
+        ))
+    }
+}
+
+/// Extract a date or timestamp day, as days from 1970-01-01
+pub struct Day;
+
+impl TransformFunction for Day {
+    fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
+        let res: Int32Array = match input.data_type() {
+            DataType::Timestamp(datatypes::TimeUnit::Microsecond, _) => input
+                .as_any()
+                .downcast_ref::<TimestampMicrosecondArray>()
+                .unwrap()
+                .unary(|v| -> i32 { (v as f64 / 1000.0 / 1000.0 * 
DAY_PER_SECOND) as i32 }),
+            DataType::Date32 => {
+                input
+                    .as_any()
+                    .downcast_ref::<Date32Array>()
+                    .unwrap()
+                    .unary(|v| -> i32 {
+                        
datatypes::Date32Type::to_naive_date(v).num_days_from_ce()
+                            - DAY_SINCE_UNIX_EPOCH
+                    })
+            }
+            _ => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    format!(
+                        "Should not call internally for unsupport data type 
{:?}",
+                        input.data_type()
+                    ),
+                ))
+            }
+        };
+        Ok(Arc::new(res))
+    }
+}
+
+/// Extract a timestamp hour, as hours from 1970-01-01 00:00:00
+pub struct Hour;
+
+impl TransformFunction for Hour {
+    fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
+        let res: Int32Array = match input.data_type() {
+            DataType::Timestamp(datatypes::TimeUnit::Microsecond, _) => input
+                .as_any()
+                .downcast_ref::<TimestampMicrosecondArray>()
+                .unwrap()
+                .unary(|v| -> i32 { (v as f64 * HOUR_PER_SECOND / 1000.0 / 
1000.0) as i32 }),
+            _ => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    format!(
+                        "Should not call internally for unsupport data type 
{:?}",
+                        input.data_type()
+                    ),
+                ))
+            }
+        };
+        Ok(Arc::new(res))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use arrow::array::{ArrayRef, Date32Array, Int32Array, 
TimestampMicrosecondArray};
+    use chrono::NaiveDate;
+    use std::sync::Arc;
+
+    use crate::transform::TransformFunction;
+
+    #[test]
+    fn test_transform_years() {

Review Comment:
   These tests are great, thanks!



##########
crates/iceberg/src/transform/mod.rs:
##########
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Transform function used to compute partition values.
+use crate::{spec::Transform, Result};
+use arrow::array::ArrayRef;
+
+mod identity;
+mod temporal;
+mod void;
+
+/// TransformFunction is a trait that defines the interface for all transform 
functions.
+pub trait TransformFunction: Send {
+    /// transform will take an input array and transform it into a new array.
+    /// The implementation of this function will need to check and downcast 
the input to specific
+    /// type.
+    fn transform(&self, input: ArrayRef) -> Result<ArrayRef>;

Review Comment:
   I'm not a Rust developer, but is this passing in an array?
   
   Here is some context on how this is implemented in Python (the Java 
implementation eventually followed the same path). In PyIceberg we initialize 
a transform, but that one might be unbound. A common place to find a transform 
is in the partition spec: https://iceberg.apache.org/spec/#partition-specs
   Here we have a reference to a field by the `source-id`. This means that we 
don't yet know what type the input is. For example, you can bucket a string, 
uuid, long, int, datetime, etc. Once we bind the transform to the schema, we 
return the actual function as a callable (we pass a function): 
   
   ```python
       def transform(self, source: IcebergType, bucket: bool = True) -> 
Callable[[Optional[Any]], Optional[int]]:
           source_type = type(source)
           if source_type in {IntegerType, LongType, DateType, TimeType, 
TimestampType, TimestamptzType}:
   
               def hash_func(v: Any) -> int:
                   return mmh3.hash(struct.pack("<q", v))
   
           elif source_type == DecimalType:
   
               def hash_func(v: Any) -> int:
                   return mmh3.hash(decimal_to_bytes(v))
   
           elif source_type in {StringType, FixedType, BinaryType}:
   
               def hash_func(v: Any) -> int:
                   return mmh3.hash(v)
   
           elif source_type == UUIDType:
   
               def hash_func(v: Any) -> int:
                   if isinstance(v, UUID):
                       return mmh3.hash(v.bytes)
                   return mmh3.hash(v)
   
           else:
               raise ValueError(f"Unknown type {source}")
   
           if bucket:
               return lambda v: (hash_func(v) & IntegerType.max) % 
self._num_buckets if v else None
           return hash_func
   ```
   This way we can initialize the transform even when we don't yet know the 
source type.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to