rshkv commented on code in PR #863:
URL: https://github.com/apache/iceberg-rust/pull/863#discussion_r1901273157


##########
crates/iceberg/src/arrow/schema.rs:
##########
@@ -814,6 +814,193 @@ get_parquet_stat_as_datum!(min);
 
 get_parquet_stat_as_datum!(max);
 
+/// Utilities to deal with [arrow_array::builder] types in the Iceberg context.
+pub(crate) mod builder {
+    use arrow_array::builder::*;
+    use arrow_array::cast::AsArray;
+    use arrow_array::types::*;
+    use arrow_array::{ArrayRef, Datum as ArrowDatum};
+    use arrow_schema::{DataType, TimeUnit};
+    use ordered_float::OrderedFloat;
+
+    use crate::spec::{Literal, PrimitiveLiteral};
+    use crate::{Error, ErrorKind};
+
+    /// A helper wrapping [ArrayBuilder] for building arrays without declaring 
the inner type at
+    /// compile-time when types are determined dynamically (e.g. based on some 
column type).
+    /// A [DataType] is given at construction time which is used to later 
downcast the inner array
+    /// and provided values.
+    pub(crate) struct AnyArrayBuilder {
+        data_type: DataType,
+        inner: Box<dyn ArrayBuilder>,
+    }
+
+    impl AnyArrayBuilder {
+        pub(crate) fn new(data_type: &DataType) -> Self {
+            Self {
+                data_type: data_type.clone(),
+                inner: make_builder(data_type, 0),
+            }
+        }
+
+        pub(crate) fn finish(&mut self) -> ArrayRef {
+            self.inner.finish()
+        }
+
+        /// Append an [[arrow_array::Datum]] value.
+        pub(crate) fn append_datum(&mut self, value: &dyn ArrowDatum) -> 
crate::Result<()> {
+            let (array, is_scalar) = value.get();
+            assert!(is_scalar, "Can only append scalar datum");
+
+            match array.data_type() {

Review Comment:
   This list is exhaustive based on the `ArrowSchemaVisitor::primitive` 
function above. I.e., every type produced there is covered here. 



##########
crates/iceberg/src/scan.rs:
##########
@@ -1084,6 +1087,50 @@ pub mod tests {
                                 .record_count(1)
                                 
.partition(Struct::from_iter([Some(Literal::long(100))]))
                                 .key_metadata(None)
+                                // Note:
+                                // The bounds below need to agree with the 
test data written below
+                                // into the Parquet file. If not, tests that 
rely on filter scans
+                                // fail because of wrong bounds.
+                                .lower_bounds(HashMap::from([
+                                    (1, Datum::long(1)),
+                                    (2, Datum::long(2)),
+                                    (3, Datum::long(3)),
+                                    (4, Datum::string("Apache")),
+                                    (5, Datum::double(100)),
+                                    (6, Datum::int(100)),
+                                    (7, Datum::long(100)),
+                                    (8, Datum::bool(false)),
+                                    (9, Datum::float(100.0)),
+                                    // decimal values are not supported by 
schema::get_arrow_datum
+                                    // (10, Datum::decimal(Decimal(123, 2))),
+                                    (11, Datum::date(0)),
+                                    (12, Datum::timestamp_micros(0)),
+                                    (13, Datum::timestamptz_micros(0)),
+                                    // ns timestamps, uuid, fixed, binary are 
currently not
+                                    // supported in schema::get_arrow_datum
+                                ]))
+                                .upper_bounds(HashMap::from([
+                                    (1, Datum::long(1)),
+                                    (2, Datum::long(5)),
+                                    (3, Datum::long(4)),
+                                    (4, Datum::string("Iceberg")),
+                                    (5, Datum::double(200)),
+                                    (6, Datum::int(200)),
+                                    (7, Datum::long(200)),
+                                    (8, Datum::bool(true)),
+                                    (9, Datum::float(200.0)),
+                                    // decimal values are not supported by 
schema::get_arrow_datum
+                                    // (10, Datum::decimal(Decimal(123, 2))),
+                                    (11, Datum::date(0)),
+                                    (12, Datum::timestamp_micros(0)),
+                                    (13, Datum::timestamptz_micros(0)),
+                                    // ns timestamps, uuid, fixed, binary are 
currently not
+                                    // supported in schema::get_arrow_datum

Review Comment:
   Could add support but thought that might be for another PR.



##########
crates/iceberg/src/scan.rs:
##########
@@ -1084,6 +1087,50 @@ pub mod tests {
                                 .record_count(1)
                                 
.partition(Struct::from_iter([Some(Literal::long(100))]))
                                 .key_metadata(None)
+                                // Note:
+                                // The bounds below need to agree with the 
test data written below
+                                // into the Parquet file. If not, tests that 
rely on filter scans
+                                // fail because of wrong bounds.
+                                .lower_bounds(HashMap::from([
+                                    (1, Datum::long(1)),
+                                    (2, Datum::long(2)),
+                                    (3, Datum::long(3)),
+                                    (4, Datum::string("Apache")),
+                                    (5, Datum::double(100)),
+                                    (6, Datum::int(100)),
+                                    (7, Datum::long(100)),
+                                    (8, Datum::bool(false)),
+                                    (9, Datum::float(100.0)),
+                                    // decimal values are not supported by 
schema::get_arrow_datum
+                                    // (10, Datum::decimal(Decimal(123, 2))),
+                                    (11, Datum::date(0)),
+                                    (12, Datum::timestamp_micros(0)),
+                                    (13, Datum::timestamptz_micros(0)),
+                                    // ns timestamps, uuid, fixed, binary are 
currently not
+                                    // supported in schema::get_arrow_datum
+                                ]))
+                                .upper_bounds(HashMap::from([
+                                    (1, Datum::long(1)),
+                                    (2, Datum::long(5)),
+                                    (3, Datum::long(4)),
+                                    (4, Datum::string("Iceberg")),
+                                    (5, Datum::double(200)),
+                                    (6, Datum::int(200)),
+                                    (7, Datum::long(200)),
+                                    (8, Datum::bool(true)),
+                                    (9, Datum::float(200.0)),
+                                    // decimal values are not supported by 
schema::get_arrow_datum
+                                    // (10, Datum::decimal(Decimal(123, 2))),
+                                    (11, Datum::date(0)),
+                                    (12, Datum::timestamp_micros(0)),
+                                    (13, Datum::timestamptz_micros(0)),
+                                    // ns timestamps, uuid, fixed, binary are 
currently not
+                                    // supported in schema::get_arrow_datum
+                                ]))
+                                .column_sizes(HashMap::from([(1, 1u64), (2, 
1u64)]))
+                                .value_counts(HashMap::from([(1, 2u64), (2, 
2u64)]))
+                                .null_value_counts(HashMap::from([(1, 3u64), 
(2, 3u64)]))
+                                .nan_value_counts(HashMap::from([(1, 4u64), 
(2, 4u64)]))

Review Comment:
   This isn't based on the test data, but I wanted to have these values reflected in the tests.



##########
crates/iceberg/src/metadata_scan.rs:
##########
@@ -128,6 +141,84 @@ impl<'a> SnapshotsTable<'a> {
     }
 }
 
+/// Entries table containing the manifest file's entries.
+///
+/// The table has one row for each manifest file entry in the current 
snapshot's manifest list file.
+/// For reference, see the Java implementation of [`ManifestEntry`][1].
+///
+/// [1]: 
https://github.com/apache/iceberg/blob/apache-iceberg-1.7.1/core/src/main/java/org/apache/iceberg/ManifestEntry.java
+pub struct EntriesTable<'a> {
+    table: &'a Table,
+}
+
+impl<'a> EntriesTable<'a> {
+    /// Get the schema for the manifest entries table.
+    pub fn schema(&self) -> Schema {
+        Schema::new(vec![
+            Field::new("status", DataType::Int32, false),

Review Comment:
   This is populated with 
[ManifestStatus](https://github.com/apache/iceberg-rust/blob/09fa1fa19488ab63c042aa692ac730fe1a7dc987/crates/iceberg/src/spec/manifest.rs#L982-L993)
 enum values.
   
   In Java, the `status` column is `i32` 
[(here)](https://github.com/apache/iceberg/blob/4a432839233f2343a9eae8255532f911f06358ef/core/src/main/java/org/apache/iceberg/ManifestEntry.java#L45)
 but in Python this is 
[u8](https://github.com/apache/iceberg-python/blob/a051584a3684392d2db6556449eb299145d47d15/pyiceberg/table/inspect.py#L121).
   
   My preference would be `u8`, but I'm treating the Java implementation as 
authoritative.



##########
crates/iceberg/src/arrow/schema.rs:
##########
@@ -814,6 +814,193 @@ get_parquet_stat_as_datum!(min);
 
 get_parquet_stat_as_datum!(max);
 
+/// Utilities to deal with [arrow_array::builder] types in the Iceberg context.
+pub(crate) mod builder {
+    use arrow_array::builder::*;
+    use arrow_array::cast::AsArray;
+    use arrow_array::types::*;
+    use arrow_array::{ArrayRef, Datum as ArrowDatum};
+    use arrow_schema::{DataType, TimeUnit};
+    use ordered_float::OrderedFloat;
+
+    use crate::spec::{Literal, PrimitiveLiteral};
+    use crate::{Error, ErrorKind};
+
+    /// A helper wrapping [ArrayBuilder] for building arrays without declaring 
the inner type at
+    /// compile-time when types are determined dynamically (e.g. based on some 
column type).
+    /// A [DataType] is given at construction time which is used to later 
downcast the inner array
+    /// and provided values.
+    pub(crate) struct AnyArrayBuilder {
+        data_type: DataType,
+        inner: Box<dyn ArrayBuilder>,
+    }
+
+    impl AnyArrayBuilder {
+        pub(crate) fn new(data_type: &DataType) -> Self {
+            Self {
+                data_type: data_type.clone(),
+                inner: make_builder(data_type, 0),
+            }
+        }
+
+        pub(crate) fn finish(&mut self) -> ArrayRef {
+            self.inner.finish()
+        }
+
+        /// Append an [[arrow_array::Datum]] value.
+        pub(crate) fn append_datum(&mut self, value: &dyn ArrowDatum) -> 
crate::Result<()> {
+            let (array, is_scalar) = value.get();
+            assert!(is_scalar, "Can only append scalar datum");
+
+            match array.data_type() {
+                DataType::Boolean => self
+                    .builder::<BooleanBuilder>()?
+                    .append_value(array.as_boolean().value(0)),
+                DataType::Int32 => self
+                    .builder::<Int32Builder>()?
+                    .append_value(array.as_primitive::<Int32Type>().value(0)),
+                DataType::Int64 => self
+                    .builder::<Int64Builder>()?
+                    .append_value(array.as_primitive::<Int64Type>().value(0)),
+                DataType::Float32 => self
+                    .builder::<Float32Builder>()?
+                    
.append_value(array.as_primitive::<Float32Type>().value(0)),
+                DataType::Float64 => self
+                    .builder::<Float64Builder>()?
+                    
.append_value(array.as_primitive::<Float64Type>().value(0)),
+                DataType::Decimal128(_, _) => self
+                    .builder::<Decimal128Builder>()?
+                    
.append_value(array.as_primitive::<Decimal128Type>().value(0)),
+                DataType::Date32 => self
+                    .builder::<Date32Builder>()?
+                    .append_value(array.as_primitive::<Date32Type>().value(0)),
+                DataType::Time64(TimeUnit::Microsecond) => self
+                    .builder::<Time64MicrosecondBuilder>()?
+                    
.append_value(array.as_primitive::<Time64MicrosecondType>().value(0)),
+                DataType::Timestamp(TimeUnit::Microsecond, _) => self
+                    .builder::<TimestampMicrosecondBuilder>()?
+                    
.append_value(array.as_primitive::<TimestampMicrosecondType>().value(0)),
+                DataType::Timestamp(TimeUnit::Nanosecond, _) => self
+                    .builder::<TimestampNanosecondBuilder>()?
+                    
.append_value(array.as_primitive::<TimestampNanosecondType>().value(0)),

Review Comment:
   I understand it's correct to ignore the timezone here because that's not 
captured in the builder.



##########
crates/iceberg/src/table.rs:
##########
@@ -203,7 +203,7 @@ impl Table {
 
     /// Creates a metadata table which provides table-like APIs for inspecting 
metadata.
     /// See [`MetadataTable`] for more details.
-    pub fn metadata_table(self) -> MetadataTable {
+    pub fn metadata_table(&self) -> MetadataTable<'_> {

Review Comment:
   Addressing this comment 
https://github.com/apache/iceberg-rust/pull/822#discussion_r1899617821. I 
prefer that but don't _need_ to do it here.



##########
crates/iceberg/src/metadata_scan.rs:
##########
@@ -255,8 +345,515 @@ impl<'a> ManifestsTable<'a> {
     }
 }
 
+/// Builds the struct describing data files listed in a table manifest.
+///
+/// For reference, see the Java implementation of [`DataFile`][1].
+///
+/// [1]: 
https://github.com/apache/iceberg/blob/apache-iceberg-1.7.1/api/src/main/java/org/apache/iceberg/DataFile.java
+struct DataFileStructBuilder<'a> {
+    // Reference to table metadata to retrieve partition specs based on 
partition spec ids
+    table_metadata: &'a TableMetadata,
+    // Below are the field builders of the "data_file" struct
+    content: Int8Builder,
+    file_path: StringBuilder,
+    file_format: StringBuilder,
+    partition: PartitionValuesStructBuilder,
+    // The count types in the Java and PyIceberg implementation i64 however 
the values coming from
+    // the deserialized data files are u64. We agree with the latter to avoid 
casting.
+    record_count: Int64Builder,
+    file_size_in_bytes: Int64Builder,
+    column_sizes: MapBuilder<Int32Builder, Int64Builder>,
+    value_counts: MapBuilder<Int32Builder, Int64Builder>,
+    null_value_counts: MapBuilder<Int32Builder, Int64Builder>,
+    nan_value_counts: MapBuilder<Int32Builder, Int64Builder>,
+    lower_bounds: MapBuilder<Int32Builder, BinaryBuilder>,
+    upper_bounds: MapBuilder<Int32Builder, BinaryBuilder>,
+    key_metadata: BinaryBuilder,
+    split_offsets: ListBuilder<Int64Builder>,
+    equality_ids: ListBuilder<Int32Builder>,
+    sort_order_ids: Int32Builder,
+}
+
+impl<'a> DataFileStructBuilder<'a> {
+    fn new(table_metadata: &'a TableMetadata) -> Self {
+        Self {
+            table_metadata,
+            content: Int8Builder::new(),
+            file_path: StringBuilder::new(),
+            file_format: StringBuilder::new(),
+            partition: PartitionValuesStructBuilder::new(table_metadata),
+            record_count: Int64Builder::new(),
+            file_size_in_bytes: Int64Builder::new(),
+            column_sizes: MapBuilder::new(None, Int32Builder::new(), 
Int64Builder::new()),
+            value_counts: MapBuilder::new(None, Int32Builder::new(), 
Int64Builder::new()),
+            null_value_counts: MapBuilder::new(None, Int32Builder::new(), 
Int64Builder::new()),
+            nan_value_counts: MapBuilder::new(None, Int32Builder::new(), 
Int64Builder::new()),
+            lower_bounds: MapBuilder::new(None, Int32Builder::new(), 
BinaryBuilder::new()),
+            upper_bounds: MapBuilder::new(None, Int32Builder::new(), 
BinaryBuilder::new()),
+            key_metadata: BinaryBuilder::new(),
+            split_offsets: ListBuilder::new(Int64Builder::new()),
+            equality_ids: ListBuilder::new(Int32Builder::new()),
+            sort_order_ids: Int32Builder::new(),
+        }
+    }
+
+    fn fields(table_metadata: &TableMetadata) -> Fields {
+        vec![
+            Field::new("content", DataType::Int8, false),
+            Field::new("file_path", DataType::Utf8, false),
+            Field::new("file_format", DataType::Utf8, false),
+            Field::new(
+                "partition",
+                
DataType::Struct(PartitionValuesStructBuilder::combined_partition_fields(
+                    table_metadata,
+                )),
+                false,
+            ),
+            Field::new("record_count", DataType::Int64, false),
+            Field::new("file_size_in_bytes", DataType::Int64, false),
+            Field::new(
+                "column_sizes",
+                Self::column_id_to_value_type(DataType::Int64),
+                true,
+            ),
+            Field::new(
+                "value_counts",
+                Self::column_id_to_value_type(DataType::Int64),
+                true,
+            ),
+            Field::new(
+                "null_value_counts",
+                Self::column_id_to_value_type(DataType::Int64),
+                true,
+            ),
+            Field::new(
+                "nan_value_counts",
+                Self::column_id_to_value_type(DataType::Int64),
+                true,
+            ),
+            Field::new(
+                "lower_bounds",
+                Self::column_id_to_value_type(DataType::Binary),
+                true,
+            ),
+            Field::new(
+                "upper_bounds",
+                Self::column_id_to_value_type(DataType::Binary),
+                true,
+            ),
+            Field::new("key_metadata", DataType::Binary, true),
+            Field::new(
+                "split_offsets",
+                DataType::new_list(DataType::Int64, true),
+                true,
+            ),
+            Field::new(
+                "equality_ids",
+                DataType::new_list(DataType::Int32, true),
+                true,
+            ),
+            Field::new("sort_order_id", DataType::Int32, true),
+        ]
+        .into()
+    }
+
+    /// Construct a new struct type that maps from column ids (i32) to the 
provided value type.
+    /// Keys, values, and the whole struct are non-nullable.
+    fn column_id_to_value_type(value_type: DataType) -> DataType {
+        DataType::Map(
+            Arc::new(Field::new(
+                "entries",
+                DataType::Struct(
+                    vec![
+                        Field::new("keys", DataType::Int32, false),
+                        Field::new("values", value_type, true),
+                    ]
+                    .into(),
+                ),
+                false,
+            )),
+            false,
+        )
+    }
+
+    fn append(&mut self, manifest_file: &ManifestFile, data_file: &DataFile) 
-> Result<()> {
+        self.content.append_value(data_file.content as i8);
+        self.file_path.append_value(data_file.file_path());
+        self.file_format
+            .append_value(data_file.file_format().to_string().to_uppercase());
+        self.partition.append(
+            self.partition_spec(manifest_file)?.clone().fields(),
+            data_file.partition(),
+        )?;
+        self.record_count
+            .append_value(data_file.record_count() as i64);
+        self.file_size_in_bytes
+            .append_value(data_file.file_size_in_bytes() as i64);

Review Comment:
   The casting is slightly annoying given we're dealing with non-negative 
types. But the Python and Java implementations use `i64`.



##########
crates/iceberg/src/scan.rs:
##########
@@ -1084,6 +1087,50 @@ pub mod tests {
                                 .record_count(1)
                                 
.partition(Struct::from_iter([Some(Literal::long(100))]))
                                 .key_metadata(None)
+                                // Note:
+                                // The bounds below need to agree with the 
test data written below
+                                // into the Parquet file. If not, tests that 
rely on filter scans
+                                // fail because of wrong bounds.
+                                .lower_bounds(HashMap::from([
+                                    (1, Datum::long(1)),
+                                    (2, Datum::long(2)),
+                                    (3, Datum::long(3)),
+                                    (4, Datum::string("Apache")),
+                                    (5, Datum::double(100)),
+                                    (6, Datum::int(100)),
+                                    (7, Datum::long(100)),
+                                    (8, Datum::bool(false)),
+                                    (9, Datum::float(100.0)),
+                                    // decimal values are not supported by 
schema::get_arrow_datum
+                                    // (10, Datum::decimal(Decimal(123, 2))),
+                                    (11, Datum::date(0)),
+                                    (12, Datum::timestamp_micros(0)),
+                                    (13, Datum::timestamptz_micros(0)),
+                                    // ns timestamps, uuid, fixed, binary are 
currently not
+                                    // supported in schema::get_arrow_datum
+                                ]))
+                                .upper_bounds(HashMap::from([
+                                    (1, Datum::long(1)),
+                                    (2, Datum::long(5)),
+                                    (3, Datum::long(4)),
+                                    (4, Datum::string("Iceberg")),
+                                    (5, Datum::double(200)),
+                                    (6, Datum::int(200)),
+                                    (7, Datum::long(200)),
+                                    (8, Datum::bool(true)),
+                                    (9, Datum::float(200.0)),
+                                    // decimal values are not supported by 
schema::get_arrow_datum
+                                    // (10, Datum::decimal(Decimal(123, 2))),
+                                    (11, Datum::date(0)),
+                                    (12, Datum::timestamp_micros(0)),
+                                    (13, Datum::timestamptz_micros(0)),
+                                    // ns timestamps, uuid, fixed, binary are 
currently not
+                                    // supported in schema::get_arrow_datum
+                                ]))

Review Comment:
   Adding these so we cover those as types in the lower and upper bounds.
   
   I'm trying to limit the changes I'm making because it's already a lot. My 
preference would be to cover all types as partition columns as well.



##########
crates/iceberg/src/metadata_scan.rs:
##########
@@ -255,8 +345,515 @@ impl<'a> ManifestsTable<'a> {
     }
 }
 
+/// Builds the struct describing data files listed in a table manifest.
+///
+/// For reference, see the Java implementation of [`DataFile`][1].
+///
+/// [1]: 
https://github.com/apache/iceberg/blob/apache-iceberg-1.7.1/api/src/main/java/org/apache/iceberg/DataFile.java
+struct DataFileStructBuilder<'a> {
+    // Reference to table metadata to retrieve partition specs based on 
partition spec ids
+    table_metadata: &'a TableMetadata,
+    // Below are the field builders of the "data_file" struct
+    content: Int8Builder,
+    file_path: StringBuilder,
+    file_format: StringBuilder,
+    partition: PartitionValuesStructBuilder,
+    // The count types in the Java and PyIceberg implementation i64 however 
the values coming from
+    // the deserialized data files are u64. We agree with the latter to avoid 
casting.
+    record_count: Int64Builder,
+    file_size_in_bytes: Int64Builder,
+    column_sizes: MapBuilder<Int32Builder, Int64Builder>,
+    value_counts: MapBuilder<Int32Builder, Int64Builder>,
+    null_value_counts: MapBuilder<Int32Builder, Int64Builder>,
+    nan_value_counts: MapBuilder<Int32Builder, Int64Builder>,
+    lower_bounds: MapBuilder<Int32Builder, BinaryBuilder>,
+    upper_bounds: MapBuilder<Int32Builder, BinaryBuilder>,
+    key_metadata: BinaryBuilder,
+    split_offsets: ListBuilder<Int64Builder>,
+    equality_ids: ListBuilder<Int32Builder>,
+    sort_order_ids: Int32Builder,
+}
+
+impl<'a> DataFileStructBuilder<'a> {
+    fn new(table_metadata: &'a TableMetadata) -> Self {
+        Self {
+            table_metadata,
+            content: Int8Builder::new(),
+            file_path: StringBuilder::new(),
+            file_format: StringBuilder::new(),
+            partition: PartitionValuesStructBuilder::new(table_metadata),
+            record_count: Int64Builder::new(),

Review Comment:
   The `manifests` table merged in #861 prefers using `PrimitiveBuilder::new()` 
which works because `Int64Builder` is just `PrimitiveBuilder<Int64Type>`.
   
   I prefer saying `Int64Builder` here to be explicit about the type but happy 
to change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to